ВУЗ: Не указан
Категория: Не указан
Дисциплина: Не указана
Добавлен: 12.12.2023
Просмотров: 95
Скачиваний: 3
ВНИМАНИЕ! Если данный файл нарушает Ваши авторские права, то обязательно сообщите нам.
Глава 4
Многопоточное программирование
214
С этого момента возможность захватить блокировку и войти в критический участок получает любой из оставшихся ожидающих потоков. Заслуживает внимания то, что отсутствует какое-либо упорядочение потоков, работа которых организована с помо- щью блокировок (т.е. применяется принцип простой очереди — “первым пришел, первым обслуживается”); процесс выбора потока-победителя не детерминирован и может зависеть даже от применяемой реализации Python.
Рассмотрим, с чем связана необходимость применения блокировок. Сценарий mtsleepF.py представляет собой приложение, в котором происходит порождение случайным образом выбранного количества потоков, в каждом из которых осущест- вляется выход после завершения работы. Рассмотрим следующий базовый фрагмент исходного кода (для версии Python 2):
from atexit import register
from random import randrange
from threading import Thread, currentThread
from time import sleep, ctime
class CleanOutputSet(set):
def __str__(self):
return ', '.join(x for x in self) loops = (randrange(2,5) for x in xrange(randrange(3,7))) remaining = CleanOutputSet()
def loop(nsec): myname = currentThread().name remaining.add(myname)
print '[%s] Started %s' % (ctime(), myname) sleep(nsec) remaining.remove(myname)
print '[%s] Completed %s (%d secs)' % ( ctime(), myname, nsec)
print ' (remaining: %s)' % (remaining or 'NONE')
def _main():
for pause in loops:
Thread(target=loop, args=(pause,)).start()
@register
def _atexit():
print 'all DONE at:', ctime()
Более подробное построчное описание кода мы приведем вслед за окончательным вариантом сценария, в котором применяются блокировки, но вкратце можно отме- тить, что сценарий mtsleepF.py по сути лишь дополняет приведенные ранее приме- ры. Как и в примере сценария bookrank.py, немного упростим код. Для этого отло- жим на время применение средств объектно-ориентированного программирования, исключим список объектов потока и операции join() с потоками и снова введем в действие метод atexit.register() (по тем же причинам, как и в коде bookrank.py).
Проведем еще одно небольшое изменение по отношению к приведенным ранее примерам mtsleepX.py. Вместо жесткого задания пары операций приостановки ци- клов/потоков на 4 и 2 секунды соответственно, внесем некую неопределенность, созда- вая случайным образом от 3 до 6 потоков, каждый из которых может приостанавли- ваться на какой-то промежуток времени от 2 до 4 секунд.
06_ch04.indd 214 22.01.2015 22:00:46
215 4.7. Практическое применение многопоточной обработки
В этом сценарии применяются также некоторые новые средства, причем наиболее заметным среди них является использование множества для хранения имен остав- шихся потоков, которые все еще функционируют. Причина, по которой создается подкласс объекта множества вместо непосредственного использования самого класса, состоит в том, что это позволяет продемонстрировать еще один вариант использова- ния множества, в котором изменяется применяемое по умолчанию для вывода стро- ковое представление множества.
При использовании операции вывода содержимого множества формируются примерно такие результаты: set([X, Y, Z,...]). Однако это не очень удобно, по- скольку потенциальные пользователи нашего приложения не знают (и не должны знать) о том, что такое множества и для чего они используются в программе. Таким образом, необходимо вместо этого вывести данные, которые выглядят примерно как
X, Y, Z, .... Именно по этой причине мы создали подкласс класса set и реализова- ли его метод __str__().
После этого изменения, при условии, что вся остальная часть сценария будет ра- ботать правильно, должен сформироваться аккуратный вывод, который будет иметь подходящее выравнивание:
$ python mtsleepF.py
[Sat Apr 2 11:37:26 2011] Started Thread-1
[Sat Apr 2 11:37:26 2011] Started Thread-2
[Sat Apr 2 11:37:26 2011] Started Thread-3
[Sat Apr 2 11:37:29 2011] Completed Thread-2 (3 secs)
(remaining: Thread-3, Thread-1)
[Sat Apr 2 11:37:30 2011] Completed Thread-1 (4 secs)
(remaining: Thread-3)
[Sat Apr 2 11:37:30 2011] Completed Thread-3 (4 secs)
(remaining: NONE) all DONE at: Sat Apr 2 11:37:30 2011
However, if you're unlucky, you might get strange output such as this pair of example executions:
$ python mtsleepF.py
[Sat Apr 2 11:37:09 2011] Started Thread-1
[Sat Apr 2 11:37:09 2011] Started Thread-2
[Sat Apr 2 11:37:09 2011] Started Thread-3
[Sat Apr 2 11:37:12 2011] Completed Thread-1 (3 secs)
[Sat Apr 2 11:37:12 2011] Completed Thread-2 (3 secs)
(remaining: Thread-3)
(remaining: Thread-3)
[Sat Apr 2 11:37:12 2011] Completed Thread-3 (3 secs)
(remaining: NONE) all DONE at: Sat Apr 2 11:37:12 2011
$ python mtsleepF.py
[Sat Apr 2 11:37:56 2011] Started Thread-1
[Sat Apr 2 11:37:56 2011] Started Thread-2
[Sat Apr 2 11:37:56 2011] Started Thread-3
[Sat Apr 2 11:37:56 2011] Started Thread-4
[Sat Apr 2 11:37:58 2011] Completed Thread-2 (2 secs)
[Sat Apr 2 11:37:58 2011] Completed Thread-4 (2 secs)
(remaining: Thread-3, Thread-1)
(remaining: Thread-3, Thread-1)
[Sat Apr 2 11:38:00 2011] Completed Thread-1 (4 secs)
06_ch04.indd 215 22.01.2015 22:00:46
Глава 4
Многопоточное программирование
216
(remaining: Thread-3)
[Sat Apr 2 11:38:00 2011] Completed Thread-3 (4 secs)
(remaining: NONE) all DONE at: Sat Apr 2 11:38:00 2011
Что же произошло? Во-первых, очевидно, что результаты далеко не однородны
(поскольку возможность выполнять операции ввода-вывода параллельно предостав- лена сразу нескольким потокам). Подобное чередование результирующих данных можно было также наблюдать и в некоторых примерах приведенного ранее кода.
Во-вторых, обнаруживается такая проблема, что два потока изменяют значение од- ной и той же переменной (множества, содержащего имена оставшихся потоков).
Операции ввода-вывода и операции доступа к одной и той же структуре данных входят в состав критических разделов кода, поэтому необходимы блокировки, ко- торые смогли бы воспрепятствовать одновременному вхождению в эти разделы не- скольких потоков. Чтобы ввести в действие блокировки, необходимо добавить стро- ку кода для импорта объекта Lock (или RLock), создать объект блокировки и внести дополнения или изменения в код, позволяющие применять блокировки в нужных местах:
from threading import Thread, Lock, currentThread lock = Lock()
Теперь необходимо обеспечить установку и снятие блокировки. В следующем коде показаны вызовы acquire() и release(), которые должны быть введены в функцию loop():
def loop(nsec): myname = currentThread().name
lock.acquire() remaining.add(myname)
print '[%s] Started %s' % (ctime(), myname)
lock.release() sleep(nsec)
lock.acquire() remaining.remove(myname)
print '[%s] Completed %s (%d secs)' % ( ctime(), myname, nsec)
print ' (remaining: %s)' % (remaining or 'NONE')
lock.release()
После внесения изменений полученный вывод уже не должен содержать прежних искажений:
$ python mtsleepF.py
[Sun Apr 3 23:16:59 2011] Started Thread-1
[Sun Apr 3 23:16:59 2011] Started Thread-2
[Sun Apr 3 23:16:59 2011] Started Thread-3
[Sun Apr 3 23:16:59 2011] Started Thread-4
[Sun Apr 3 23:17:01 2011] Completed Thread-3 (2 secs)
(remaining: Thread-4, Thread-2, Thread-1)
[Sun Apr 3 23:17:01 2011] Completed Thread-4 (2 secs)
(remaining: Thread-2, Thread-1)
06_ch04.indd 216 22.01.2015 22:00:46
217 4.7. Практическое применение многопоточной обработки
[Sun Apr 3 23:17:02 2011] Completed Thread-1 (3 secs)
(remaining: Thread-2)
[Sun Apr 3 23:17:03 2011] Completed Thread-2 (4 secs)
(remaining: NONE) all DONE at: Sun Apr 3 23:17:03 2011
Исправленная (и окончательная) версия mtsleepF.py показана в примере 4.10.
1 2 3 4 5 6 7 8
Пример 4.10. Сценарий, в котором предусмотрены блокировки и введены дополнительные
средства случайного выбора (
mtsleepF.py)
В этом примере демонстрируется использование блокировок и других инструмен- тов обеспечения многопоточного функционирования.
1 #!/usr/bin/env python
2 3 from atexit import register
4 from random import randrange
5 from threading import Thread, Lock, currentThread
6 from time import sleep, ctime
7 8 class CleanOutputSet(set):
9 def __str__(self):
10 return ', '.join(x for x in self)
11 12 lock = Lock()
13 loops = (randrange(2,5) for x in xrange(randrange(3,7)))
14 remaining = CleanOutputSet()
15 16 def loop(nsec):
17 myname = currentThread().name
18 lock.acquire()
19 remaining.add(myname)
20 print '[%s] Started %s' % (ctime(), myname)
21 lock.release()
22 sleep(nsec)
23 lock.acquire()
24 remaining.remove(myname)
25 print '[%s] Completed %s (%d secs)' % (
26 ctime(), myname, nsec)
27 print ' (remaining: %s)' % (remaining or 'NONE')
28 lock.release()
29 30 def _main():
31 for pause in loops:
32 Thread(target=loop, args=(pause,)).start()
33 34 @register
35 def _atexit():
36 print 'all DONE at:', ctime()
37 38 if __name__ == '__main__':
39 main()
06_ch04.indd 217 22.01.2015 22:00:47
Глава 4
Многопоточное программирование
218
Построчное объяснение
Строки 1–6
Это обычные строки запуска и импорта. Следует учитывать, что начиная с версии 2.6 метод threading.currentThread() переименован в threading.
current_thread(), но первый компонент имени метода остался неизмен- ным в целях обеспечения обратной совместимости.
Строки 8–10
Это подкласс множества, о котором речь шла выше. Он содержит реализацию ме- тода __str__(), который позволяет вместо формата вывода, применяемого по умол- чанию, перейти к формату, представляющему собой строку элементов, разделенную запятыми.
Строки 12–14
В состав применяемых глобальных переменных входят блокировка, экземпляр по- заимствованного из предыдущего кода откорректированного множества и случайно выбранного числа потоков (от трех до шести), каждый из которых останавливается
(вернее, приостанавливается) на время от двух до четырех секунд.
Строки 16–28
Функция loop() сохраняет имя текущего потока, в котором она выполняется, за- тем захватывает блокировку, что позволяет сделать неразрывным (атомарным) сле- дующий ряд операций: добавление имени к множеству remaining и формирование вывода с указанием на начало потока (с этого момента больше никакой другой поток не сможет войти в критический участок кода). После освобождения блокировки этот поток приостанавливается на время в секундах, выбранное случайным образом, затем повторно захватывает блокировку, чтобы сформировать свой окончательный вывод перед ее освобождением.
Строки 30–39
Функция _main() выполняется, только если этот сценарий не был импортирован для использования в другой программе. Назначение этой функции состоит в том, что она порождает и запускает каждый из потоков. Как было указано выше, применяется метод atexit.register() для регистрации функции _atexit(), которую интерпре- татор может выполнить перед выходом из программы.
В качестве альтернативного по отношению к варианту, в котором поддерживается собственное множество выполняющихся в настоящее время потоков, можно приме- нить вариант с использованием метода threading.enumerate(), который возвращает список всех потоков, работающих в настоящее время (этот список включает потоки, действующие в качестве демона, но, безусловно, в него не входят потоки, которые еще не запущены). В рассматриваемом примере этот вариант не используется, поскольку его применение приводит к созданию двух дополнительных потоков, которые прихо- дится удалять для сокращения объема вывода, в том числе текущего потока (посколь- ку он еще не завершен) и основного потока (который так или иначе не должен быть показан).
06_ch04.indd 218 22.01.2015 22:00:47
219 4.7. Практическое применение многопоточной обработки
Не следует также забывать, что вместо оператора форматирования строки можно использовать метод str.format(), при условии, что для работы применяется вер- сия Python 2.6 или более новая (включая версии 3.x). Иными словами, следующая инструкция print:
print '[%s] Started %s' % (ctime(), myname)
может быть заменена в версии 2.6 и последующей таким вызовом:
print '[{0}] Started {1}'.format(ctime(), myname)
или (в версии 3.x) таким вызовом print():
print('[{0}] Started {1}'.format(ctime(), myname))
Если задача состоит лишь в том, чтобы подсчитать число потоков, выполняю- щихся в настоящее время, то вместо этого можно использовать метод threading.
activeCount() (переименованный в active_count(), начиная с версии 2.6).
Применение управления контекстом
Программисты, работающие в версии Python 2.5 и более новой, могут вос- пользоваться еще одним вариантом, который вообще не требует вызова ме- тодов acquire() и release() применительно к блокировке, что способству- ет еще большему упрощению кода. С помощью инструкции with можно вводить в действие для любого объекта диспетчер контекста, который обе- спечит вызов метода acquire() перед входом в критический участок и мето- да release(), когда этот блок завершит выполнение.
Диспетчеры контекста предусмотрены для всех объектов модуля threading, та- ких как Lock, RLock, Condition, Semaphore и BoundedSemaphore, а это означает, что для работы с этими объектами может применяться инструкция with. С помощью инструкции with можно еще больше упростить код функции loop() следующим образом:
from __future__ import with_statement # только в версии 2.5
def loop(nsec): myname = currentThread().name
with lock: remaining.add(myname)
print '[%s] Started %s' % (ctime(), myname) sleep(nsec)
with lock: remaining.remove(myname)
print '[%s] Completed %s (%d secs)' % ( ctime(), myname, nsec)
print ' (remaining: %s)' % ( remaining or 'NONE',)
06_ch04.indd 219 22.01.2015 22:00:47
Глава 4
Многопоточное программирование
220
Перенос приложения в версию Python 3
Теперь можно сравнительно легко перенести приведенный выше сценарий в версию Python 3.x, применив к нему инструмент 2to3 (следующий вывод приведен в сокращенном виде, поскольку полные результаты применения diff уже были показаны ранее):
$ 2to3 -w mtsleepF.py
RefactoringTool: Skipping implicit fixer: buffer
RefactoringTool: Skipping implicit fixer: idioms
RefactoringTool: Skipping implicit fixer: set_literal
RefactoringTool: Skipping implicit fixer: ws_comma
:
RefactoringTool: Files that were modified:
RefactoringTool: mtsleepF.py
После переименования mtsleepF.py в mtsleepF3.py и mtsleep.py.bak в mtsleepF.py обнаруживается, что этот сценарий относится к той замечательной ка- тегории программ, перенос которых в новую версию интерпретатора происходит идеально, без малейших проблем:
$ python3 mtsleepF3.py
[Sun Apr 3 23:29:39 2011] Started Thread-1
[Sun Apr 3 23:29:39 2011] Started Thread-2
[Sun Apr 3 23:29:39 2011] Started Thread-3
[Sun Apr 3 23:29:41 2011] Completed Thread-3 (2 secs)
(remaining: Thread-2, Thread-1)
[Sun Apr 3 23:29:42 2011] Completed Thread-2 (3 secs)
(remaining: Thread-1)
[Sun Apr 3 23:29:43 2011] Completed Thread-1 (4 secs)
(remaining: NONE) all DONE at: Sun Apr 3 23:29:43 2011
Теперь, после ознакомления с блокировками, перейдем к изучению семафоров и рассмотрим пример, в котором используется и то и другое.
4.7.4. Пример семафора
Как уже было сказано, блокировки являются довольно простыми для понимания и могут быть легко реализованы. Кроме того, можно довольно легко определить, в каком случае они действительно необходимы. Однако ситуация может оказаться сложнее, и тогда вместо блокировки потребуется более мощный примитив синхро- низации. Если в приложении приходится иметь дело с ограниченными ресурсами, то семафоры могут оказаться более приемлемыми.
Семафоры относятся к числу примитивов синхронизации, которые были введе- ны в действие раньше других. Семафор, по существу, представляет собой счетчик, значение которого уменьшается после захвата ресурса (и снова увеличивается после освобождения ресурса). Семафоры, представляющие закрепленные за ними ресурсы, можно рассматривать как доступные или недоступные. Действие по захвату ресурса и уменьшению значения счетчика принято обозначать как P() (от голландского слова probeer/proberen), но для обозначения этого действия применяются также термины
“переход в состояние ожидания”, “осуществление попытки”, “захват”, “приоста- новка” или “получение”. И наоборот, после завершения работы потока с ресурсом
06_ch04.indd 220 22.01.2015 22:00:47
221 4.7. Практическое применение многопоточной обработки должен быть произведен возврат ресурса в пул ресурсов. Для этого применяется действие, по традиции обозначаемое V() (от голландского слова verhogen/verhoog).
Это действие обозначается также как “сигнализация”, “наращивание”, “отпускание”,
“отправка”, “освобождение”. В языке Python вместо всех этих вариантов именования применяются упрощенные обозначения, согласно которым функции и (или) методы обозначаются как те, что служат для работы с блокировками: acquire и release. Се- мафоры являются более гибкими, чем блокировки, поскольку обеспечивают работу с несколькими потоками, в каждом из которых используется один из экземпляров конечного ресурса.
В качестве следующего примера рассмотрим предельно упрощенную модель ав- томата для торговли конфетами. В данном конкретном автомате имеются в наличии только пять карманов, заполняемых запасом товара (шоколадными батончиками).
Если заполнены все карманы, то в автомат больше нельзя заправить ни одной кон- феты и, аналогично, при отсутствии в автомате шоколадных батончиков какого-то конкретного типа покупатели, желающие приобрести именно их, будут вынуждены возвратиться с пустыми руками. В данном случае ресурсы (карманы для конфет) яв- ляются конечными, и для отслеживания их состояния можно использовать семафор.
Исходный код сценария (candy.py) приведен в примере 4.11.
Пример 4.11. Автомат для торговли конфетами, управляемый
с помощью семафоров (
candy.py)
В этом сценарии блокировки и семафоры используются для моделирования авто- мата для торговли конфетами.
1 #!/usr/bin/env python
2 3 from atexit import register
4 from random import randrange
5 from threading import BoundedSemaphore, Lock, Thread
6 from time import sleep, ctime
7 8 lock = Lock()
9 MAX = 5 10 candytray = BoundedSemaphore(MAX)
11 12 def refill():
13 lock.acquire()
14 print 'Refilling candy...',
15 try:
16 candytray.release()
17 except ValueError:
18 print 'full, skipping'
19: else:
20: print 'OK'
21: lock.release()
22:
23: def buy():
24: lock.acquire()
25: print 'Buying candy...',
26: if candytray.acquire(False):
27: print 'OK'
28: else:
29: print 'empty, skipping'
06_ch04.indd 221 22.01.2015 22:00:48
Глава 4
Многопоточное программирование
222 30: lock.release()
31:
32: def producer(loops):
33: for i in xrange(loops):
34: refill()
35: sleep(randrange(3))
36:
37: def consumer(loops):
38: for i in xrange(loops):
39: buy()
40: sleep(randrange(3))
41:
42: def _main():
43: print 'starting at:', ctime()
44: nloops = randrange(2, 6)
45: print 'THE CANDY MACHINE (full with %d bars)!' % MAX
46: Thread(target=consumer, args=(randrange(
47: nloops, nloops+MAX+2),)).start() # покупатель
48: Thread(target=producer, args=(nloops,)).start() #продавец
49:
50: @register
51: def _atexit():
52: print 'all DONE at:', ctime()
53:
54: if __name__ == '__main__':
55: _main()
Построчное объяснение
Строки 1–6
Строки запуска и импорта практически полностью совпадают с теми, которые были приведены в предыдущих примерах этой главы. Добавлено лишь объявление семафора. В модуле threading предусмотрены два класса семафоров, Semaphore и
BoundedSemaphore. Как уже было сказано, в действительности семафоры — просто счетчики; при запуске семафора задается некоторое постоянное число, которое опре- деляет конечное количество единиц ресурса.
Значение этого счетчика уменьшается на 1 после потребления одной единицы из конечного количества единиц ресурса, а после возврата этой единицы в пул значение счетчика увеличивается. Объект BoundedSemaphore предоставляет дополнительную возможность, которая состоит в том, что значение счетчика не может быть увеличено свыше установленного для него начального значения; иными словами, позволяет пре- дотвратить возникновение такой абсурдной ситуации, при которой количество опе- раций освобождения семафора превышает количество операций его захвата.
Строки 8–10
Глобальные переменные в этом сценарии определяют блокировку, константу, представляющую максимальное количество единиц ресурса, которые могут составить потребляемый запас, а также лоток с конфетами.
06_ch04.indd 222 22.01.2015 22:00:48