multiprocessing
— Процессный параллелизм¶
Исходный код: Lib/multiprocessing/
Введение¶
multiprocessing
- это пакет, который поддерживает порождение процессов с использованием
API, похожим на модуль threading
. Пакет multiprocessing
предлагает как локальный, так и удаленный параллелизм, эффективно обходя
Глобальную блокировку интерпретатора с использованием
подпроцессов вместо потоков. Также модуль multiprocessing
позволяет программисту полностью
использовать несколько процессоров на данном компьютере. Работает как на Unix, так и на Windows.
Модуль multiprocessing
также вводит API, которые не имеет аналогов в модуле
threading
. Ярким примером этого является объект Pool
, который
предлагает удобное средство параллелирования выполнения функции по множеству
входных значениц, распределения входных данных по процессам (параллелизм
данных). Следующий пример демонстрирует обычную практику определения таких
функций в модуле, чтобы дочерние процессы могли успешно импортировать этот
модуль. Этот базовый пример параллелизма данных с использованием Pool
:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
будет напечатано в стандартный вывод:
[1, 4, 9]
Класс Process
¶
В multiprocessing
процессы порождаются созданием объекта Process
, а затем вызовом
его метода start()
. Process
следует API threading.Thread
. Тривиальный
пример многопроцессорной программы:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Ниже приведен развернутый пример, показывающий соответствующие идентификаторы отдельных процессов:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Объяснение причины необходимости if __name__ == '__main__'
части см. в разделе Программирование рекомендаций.
Контексты и методы запуска¶
В зависимости от платформы, multiprocessing
поддерживает три способа запуска
процесса. Далее перечислены методы запуска.
- spawn
Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для выполнения метода
run()
объекта процесса. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с использованием этого метода довольно медленный по сравнению с использованием fork или forkserver.Доступно в Unix и Windows. Значение по умолчанию для Windows и macOS.
- fork
Родительский процесс использует
os.fork()
для форсирования Python интерпретатор. Дочерний процесс, когда он начинается, фактически идентичен родительскому процессу. Все ресурсы родителя наследуются дочерним процессом. Следует отметить, что безопасная форсировка многопоточного процесса является проблематичной.Доступно только в Unix. Значение по умолчанию в Unix.
- forkserver
Когда программа начинает и выбирает метод начала forkserver, процесс сервера, начат. С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает, чтобы он форсировал новый процесс. Процесс сервера вилки является однопотоковым, поэтому он безопасен для использования
os.fork()
. Ненужные ресурсы не наследуются.Доступный на платформах Unix, которые поддерживают мимолетный файл дескрипторы по Unix пайпы.
Изменено в версии 3.8: На macOS метод начала spawn - теперь дефолт. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбоям подпроцессы. См. bpo-33725.
Изменено в версии 3.4: spawn добавлены на всех платформах unix и forkserver добавлены для некоторых платформ unix. Дочерние процессы больше не наследуют все унаследованные родительские дескрипторы в Windows.
В Unix с помощью методов запуска spawn или forkserver также запускается
процесс resource tracker, который отслеживает несвязанные именованные системные
ресурсы (такие как именованные семафоры или SharedMemory
объекты), созданные процессами программы. После завершения всех процессов средство отслеживания
ресурсов отменяет связь с любым оставшимся отслеживаемым объектом. Обычно их не
должно быть, но если какой-либо процесс был убит сигналом, то может возникнуть
некоторая «утечка» ресурсов. (Ни просочившиеся семафоры, ни сегменты общей
памяти не будут автоматически отключены до следующей перезагрузки. Это
проблематично для обоих объектов, поскольку система допускает только
ограниченное количество именованных семафоров, а сегменты совместно используемой
памяти занимают некоторое место в основной памяти.)
Для выбора метода запуска используется set_start_method()
в if __name__ == '__main__'
клаузула
основного модуля. Например:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не должны быть используемый более одного раза в программе.
Кроме того, можно использовать get_context()
для получения объекта контекст.
Объекты контекста имеют тот же API, что и модуль многопроцессорной обработки, и
позволяют использовать несколько методов запуска в одной программе:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Обратите внимание, что объекты, связанные с одним контекст, могут быть несовместимы с процессами для другого контекст. В частности, блокировки, созданные с помощью fork контекст, не могут быть переданы процессам, запущенным с помощью методов запуска spawn или forkserver.
Библиотека, которая хочет использовать конкретный метод запуска, должна,
вероятно, использовать get_context()
, чтобы избежать вмешательства в выбор
пользователя библиотеки.
Предупреждение
Стартовые методы 'spawn'
и 'forkserver'
в настоящее время не могут быть
используемый с «замороженными» исполняемыми файлами (т.е. двоичными файлами,
создаваемыми пакетами, такими как PyInstaller and cx_Freeze) в Unix.
Стартовый метод 'fork'
работает.
Обмен объектами между процессами¶
multiprocessing
поддерживает два типа каналов связи между процессами:
Очереди
Класс
Queue
является близким клономqueue.Queue
. Например:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # распечатает "[42, None, 'hello']" p.join()Очереди поток и безопасны для обработки.
Пайпы
Pipe()
функционируют возвращает пара объектов связи, связанных пайп, который по умолчанию является (двухсторонним) дуплексом. Например:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # распечатает "[42, None, 'hello']" p.join()Два объекта соединения возвращенный
Pipe()
представляют два конца пайп. Каждый объект подключения имеет методыsend()
иrecv()
(среди прочих). Обратите внимание, что данные в пайп могут быть повреждены, если два процесса (или потоки) пытаются одновременно считать или записывать в same конец пайп. Конечно, нет риска коррупции от процессов, использующих разные концы пайп одновременно.
Синхронизация между процессами¶
multiprocessing
содержит эквиваленты всех примитивов синхронизации из threading
.
Для сущность можно использовать блокировку для обеспечения одновременной
печати только одного процесса в стандартный вывод:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без использования блокировки выходные данные различных процессов могут быть смешаны.
Совместное использование состояние между процессами¶
Как упомянуто выше, когда выполнение параллельного программирования его является обычно лучшим избегать использования разделенного состояние в максимально возможной степени. Это особенно верно при использовании нескольких процессов.
Однако, если вам действительно нужно использовать некоторые общие данные, то
multiprocessing
предоставляет пару способов сделать это.
Общая память
Данные могут храниться в карте совместно используемой памяти с помощью
Value
илиArray
. Например, следующие код:from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])напечатает:
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]Аргументы
'd'
и'i'
используемый при созданииnum
иarr
являются типкодами вида используемый модулемarray
:'d'
указывает двойную точность float и'i'
обозначает целое число со знаком. Эти общие объекты будут процессом и потокобезопасной.Для большей гибкости в использовании совместно используемой памяти можно использовать модуль
multiprocessing.sharedctypes
, поддерживающий создание объектов произвольных ctypes, выделенных из совместно используемой памяти.
Серверный процесс
Объект менеджера возвращенный
Manager()
управляет серверным процессом, который содержит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси.Менеджер возвращенный
Manager()
поддержит типыlist
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
иArray
. Например,:from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)напечатает:
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Менеджеры серверных процессов более гибки, чем использование общих объектов памяти, поскольку они могут быть созданы для поддержки произвольных типов объектов. Кроме того, один менеджер может совместно использоваться процессами на разных компьютерах по сети. Однако они работают медленнее, чем совместно используемая память.
Использование пула воркеров¶
Класс Pool
представляет пул рабочих процессов. Он содержит методы,
позволяющие выгружать задачи в рабочие процессы несколькими различными
способами.
Например:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# запуск 4 рабочих процессов
with Pool(processes=4) as pool:
# печать "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# печатать одинаковые числа в произвольном порядке
for i in pool.imap_unordered(f, range(10)):
print(i)
# вычислить "f(20)" асинхронно
res = pool.apply_async(f, (20,)) # запускается в *только* одном процессе
print(res.get(timeout=1)) # печатает "400"
# вычислить "os.getpid()" асинхронно
res = pool.apply_async(os.getpid, ()) # запускается в *только* одном процессе
print(res.get(timeout=1)) # печатает PID этого процесса
# запуск нескольких оценок асинхронно *может* использовать больше процессов
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# уснуть одному рабочему на 10 секунд
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# выход из блока with с остановкой пула
print("Now the pool is closed and no longer available")
Обратите внимание, что методы бассейна должны только когда-либо быть используемый процессом, который создал его.
Примечание
Функциональность этого пакета требует, чтобы модуль __main__
был импортирован
нижестоящими элементами. Это освещено в Программирование рекомендаций, однако здесь стоит указать.
Это означает, что некоторые примеры, такие как примеры multiprocessing.pool.Pool
не будут
работать в интерактивном интерпретатор. Например:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Если попробовать это, то на самом деле будет выведено три полных трейсбэки, перемежающихся полуслучайным образом, а затем, возможно, придется как-то остановить родительский процесс.
Ссылка¶
Пакет multiprocessing
в основном реплицирует API модуля threading
.
Process
и исключения¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Объекты процесса представляют собой операцию, выполняемую в отдельном процессе. Класс
Process
имеет эквиваленты всех методовthreading.Thread
.Конструктор всегда должен вызываться с аргументами ключевой. group всегда должны быть
None
; он существует исключительно для совместимости сthreading.Thread
. target - вызываемый объект, вызываемый методомrun()
. По умолчанию он имеет значениеNone
, то есть ничего не называется. name - имя процесса (более подробную информацию см. в разделеname
). args - кортеж аргументов для целевого вызова. kwargs - словарь ключевой аргументов для целевого вызова. Если указано, только ключевой аргумент daemon задает для флага процессаdaemon
значениеTrue
илиFalse
. Если значение равноNone
(по умолчанию), этот флаг наследуется от процесса создания.По умолчанию аргументы не передаются target.
Если подкласс отвергает конструктора, он должен удостовериться, что призывает конструктора базового класса (
Process.__init__()
) прежде, чем сделать что-либо еще к процессу.Изменено в версии 3.3: Добавлен аргумент daemon.
-
run
()¶ Метод, представляющий активность процесса.
Этот метод можно переопределить в подкласс. Стандартный метод
run()
призывает подлежащий выкупу объект, переданный конструктору объекта как целевой аргумент, если таковые имеются, с последовательными и аргументами ключевой, взятыми от args и аргументов kwargs, соответственно.
-
start
()¶ Запустите действие процесса.
Это должно вызываться не более одного раза для каждого объекта процесса. Она обеспечивает возможность вызова метода
run()
объекта в отдельном процессе.
-
join
([timeout])¶ Если необязательным аргументом timeout является
None
(по умолчанию), метод блокируется до завершения процесса, методjoin()
которого вызывается. Если timeout - положительное число, оно блокируется максимум через timeout секунд. Обратите внимание, что метод возвращаетNone
, если его процесс завершается или время ожидания метода истекло. Проверьтеexitcode
процесса, чтобы определить его завершение.Процесс может быть объединен много раз.
Процесс не может присоединиться к самому себе, поскольку это приведет к взаимоблокировке. Ошибка при попытке присоединения к процессу перед его запуском.
-
name
¶ Имя процесса. Имя - строка используемый в идентификационных целях только. У него нет семантики. Нескольким процессам может быть присвоено одно и то же имя.
Начальное имя задается конструктором. Если для конструктора не указано явное имя, создается имя формы „Process-N:sub:1: N:sub:2:…: N:sub:k“, где каждый Nk является N-м потомком своего родителя.
-
is_alive
()¶ Возвращает жив ли процесс.
Примерно, объект процесса жив с момента возвращает метода
start()
до завершения дочернего процесса.
-
daemon
¶ Флаг демона процесса, логический значение. Это должно быть установлено, прежде чем
start()
называют.Начальный значение унаследован от процесса создания.
Когда процесс завершается, он пытается завершить все свои демонические дочерние процессы.
Обратите внимание, что демоническому процессу запрещено создавать дочерние процессы. В противном случае демонический процесс оставит своих потомков сиротами, если он завершится после завершения родительского процесса. Кроме того, это демоны или службы не Unix, это обычные процессы, которые будут прекращены (и не присоединены), если не демонические процессы вышли.
В дополнение к
threading.Thread
API объектыProcess
также поддерживают следующий атрибуты и methods:-
pid
¶ Возвращает идентификатор процесса. Перед началом процесса это будет
None
.
-
exitcode
¶ Выход ребенка код. Это будет
None
, если процесс еще не завершен. Отрицательный значение -N указывает на то, что дочерний объект был завершен сигналом N.
-
authkey
¶ Ключ аутентификации процесса (байтовая строка).
Когда
multiprocessing
инициализируется, основному процессу присваивается случайный строка с помощьюos.urandom()
.При создании объекта
Process
он наследует ключ аутентификации родительского процесса, хотя его можно изменить, установив для параметраauthkey
значение другого байта строка.См. Ключи аутентификации.
-
sentinel
¶ Числовой дескриптор системного объекта, который становится «готовым» по завершении процесса.
Вы можете использовать этот значение, если вы хотите ждать на нескольких событиях, сразу используя
multiprocessing.connection.wait()
. В противном случае вызовjoin()
проще.В Windows это дескриптор ОС, используемый с семейством вызовов API
WaitForSingleObject
иWaitForMultipleObjects
. На Unix это - файл дескриптор, применимый с примитивами от модуляselect
.Добавлено в версии 3.3.
-
terminate
()¶ Завершить процесс. В Unix это выполняется с помощью сигнала
SIGTERM
; на Windows используетсяTerminateProcess()
. Следует отметить, что обработчики exit и finally и т.д. выполняться не будут.Заметим, что потомки процесса не будут прекращены - они просто осиротеют.
Предупреждение
Если этот метод является используемый, когда связанный процесс использует пайп или очередь, то пайп или очередь может быть повреждена и может стать непригодной для использования другим процессом. Аналогично, если процесс приобрел блокировку или семафор и т.д., то завершение его может привести к взаимоблокировке других процессов.
-
kill
()¶ То же, что и
terminate()
, но с использованием сигналаSIGKILL
в Unix.Добавлено в версии 3.7.
-
close
()¶ Закрыть объект
Process
, освободив все связанные с ним ресурсы.ValueError
возникает, если базовый процесс все еще выполняется. Как толькоclose()
возвращает успешно, большинство других методов и атрибуты объектаProcess
поднимутValueError
.Добавлено в версии 3.7.
Обратите внимание, что
start()
,join()
,is_alive()
,terminate()
и методыexitcode
должен только вызвать процесс, который создал объект процесса.Пример использования некоторых методов
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ Базовый класс всех исключений
multiprocessing
.
-
exception
multiprocessing.
BufferTooShort
¶ Исключение, вызванное
Connection.recv_bytes_into()
, когда предоставленный объект буфера слишком мал для чтения сообщения.Если
e
является сущностьBufferTooShort
, тоe.args[0]
передаст сообщение в виде байтовой строки.
-
exception
multiprocessing.
AuthenticationError
¶ Возникает при ошибке проверки подлинности.
-
exception
multiprocessing.
TimeoutError
¶ Вызывается методами с тайм-аутом по истечении тайм-аута.
Пайпы и очереди¶
При использовании нескольких процессов обычно используют передачу сообщений для связи между процессами и избегают необходимости использования каких-либо примитивов синхронизации, таких как блокировки.
Для передачи сообщений можно использовать Pipe()
(для соединения между
двумя процессами) или очередь (которая позволяет нескольким производителям и
потребителям).
Queue
, SimpleQueue
и типы JoinableQueue
- мультипроизводитель,
мультипотребитель очереди FIFO, смоделированные на классе queue.Queue
в
стандартной библиотеке. Они отличаются по тому Queue
, испытывает
недостаток в task_done()
и методах join()
, введенных в класс Python
2.5’s queue.Queue
.
Если вы используете JoinableQueue
, то вы должны вызвать JoinableQueue.task_done()
для
каждой задачи, удаленной из очереди, или семафор используемый для подсчета
количества незавершенных задач может в конечном итоге переполниться, вызывая
исключение.
Обратите внимание, что можно также создать общую очередь с помощью объекта менеджера - см. раздел Менеджеры.
Примечание
multiprocessing
использует обычные исключения queue.Empty
и queue.Full
для
сигнализации тайм-аута. Они недоступны в пространстве имен multiprocessing
, поэтому
их необходимо импортировать из queue
.
Примечание
Когда объект помещен на очередь, объект - pickled и фон поток более поздние потоки данные pickled к основному пайп. Это имеет некоторые последствия, которые немного удивительны, но не должны вызывать никаких практических трудностей - если они действительно беспокоят вас, то вы можете вместо этого использовать очередь, созданную с manager.
- После помещения объекта в пустую очередь может произойти бесконечная
задержка перед тем, как метод
empty()
очереди возвращаетFalse
иget_nowait()
может возвращает без повышенияqueue.Empty
. - если несколько процессов ставят в очередь объекты, возможно, чтобы объекты были приняты на другом конце вне порядка. Однако объекты, поставленные в очередь одним и тем же процессом, всегда будут находиться в ожидаемом порядке относительно друг друга.
Предупреждение
Если процесс был убит с помощью Process.terminate()
или os.kill()
во
время попытки использования Queue
, то данные в очереди, вероятно, будут
повреждены. Это может привести к тому, что любой другой процесс получит
исключение при попытке использовать очередь позже.
Предупреждение
Как упомянуто выше, если дочерний процесс поместил предметы на очередь (и у него
нет используемый JoinableQueue.cancel_join_thread
),
тогда тот процесс не закончится,
пока все буферизированные предметы не смылись (flushed) в пайп.
Это означает, что при попытке присоединения к этому процессу может возникнуть взаимоблокировка, если вы не уверены, что все элементы, помещенные в очередь, были израсходованы. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависать на выходе, когда пытается присоединиться ко всем своим недемоническим потомкам.
Обратите внимание, что очередь, созданная с помощью менеджера, не имеет этой проблемы. См. Программирование рекомендаций.
Пример использования очередей для межпроцессной связи см. в разделе Примеры.
-
multiprocessing.
Pipe
([duplex])¶ Возвращает пару
(conn1, conn2)
объектовConnection
, представляющих концы пайпа.Если duplex равно
True
(по умолчанию), то пайп является двунаправленным. Если duplex -False
тогда, пайп однонаправлен:conn1
может только быть используемый для получения сообщений, иconn2
может только быть используемый для отправки сообщений.
-
class
multiprocessing.
Queue
([maxsize])¶ Возвращает общую очередь процесса, реализованную с использованием пайп и нескольких блокировок/семафоров. Когда процесс сначала помещает элемент в очередь, запускается питатель поток, который передает объекты из буфера в пайп.
Обычные исключения
queue.Empty
иqueue.Full
из модуляqueue
стандартной библиотеки вызываются тайм-аутами сигнала.Queue
реализует все методыqueue.Queue
, кромеtask_done()
иjoin()
.-
qsize
()¶ Возвращает приблизительный размер очереди. Из-за семантики многопоточной/многопроцессорной обработки это число не является надежным.
Обратите внимание, что это может привести к появлению
NotImplementedError
на платформах Unix, таких как Mac OS X, гдеsem_getvalue()
не реализован.
-
empty
()¶ Возвращает
True
, если очередь пуста,False
иначе. Из-за семантики многопоточной/многопроцессорной обработки это ненадежно.
-
full
()¶ Возвращает
True
, если очередь полна,False
иначе. Из-за семантики многопоточной/многопроцессорной обработки это ненадежно.
-
put
(obj[, block[, timeout]])¶ Поместить объект в очередь. Если дополнительный аргумент block имеет значение
True
(по умолчанию), а timeout - значениеNone
(по умолчанию), при необходимости блокируйте до тех пор, пока свободный слот не будет доступен. Если timeout является положительным числом, он блокирует максимум timeout секунд и вызывает исключениеqueue.Full
, если свободный слот не был доступен в течение этого времени. В противном случае (block -False
) Поместить элемент в очередь, если свободный слот немедленно доступен, иначе вызовите исключениеqueue.Full
(timeout игнорируется в этом случае).Изменено в версии 3.8: Если очередь закрыта,
ValueError
поднят вместоAssertionError
.
-
put_nowait
(obj)¶ Эквивалентный
put(obj, False)
.
-
get
([block[, timeout]])¶ Удаление и возвращает элемента из очереди. Если необязательными args block является
True
(по умолчанию), а timeout -None
(по умолчанию), при необходимости блокируйте до тех пор, пока элемент не будет доступен. Если timeout является положительным числом, он блокирует максимум timeout секунд и вызывает исключениеqueue.Empty
, если за это время не было доступно ни одного элемента. В противном случае (блок -False
), возвращает элемент, если он немедленно доступен, иначе вызовите исключениеqueue.Empty
(в этом случае timeout игнорируется).Изменено в версии 3.8: Если очередь закрыта,
ValueError
поднят вместоOSError
.
-
get_nowait
()¶ Эквивалентный
get(False)
.
multiprocessing.Queue
имеет несколько дополнительных методов, не найденных вqueue.Queue
. Эти методы обычно не нужны для большинства код:-
close
()¶ Укажите, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершит работу после очистки всех буферизованных данных в пайп. Это вызывается автоматически при сборе мусора в очередь.
-
join_thread
()¶ Присоединиться к фоновому потоку. Это может только быть используемый после того, как вызван
close()
. Это блокирует до фона выходы поток, гарантируя, что все данные в буфере смылись (flushed) в пайп.По умолчанию, если процесс не является создателем очереди, при выходе он попытается присоединиться к фоновому поток очереди. Процесс может вызвать
cancel_join_thread()
, чтобы заставитьjoin_thread()
ничего не делать.
-
cancel_join_thread
()¶ Препятствуйте тому, чтобы
join_thread()
блокировал. В частности, это предотвращает автоматическое соединение фоновых поток при завершении процесса – см. разделjoin_thread()
.Лучшее имя для этого метода может быть
allow_exit_without_flush()
. Вероятно, это приведет к потере поставленных в очередь данных, и вам почти наверняка не потребуется их использовать. Это действительно только в том случае, если необходимо, чтобы текущий процесс немедленно вышел, не дожидаясь очистки поставленных в очередь данных на основной пайп, и вы не заботитесь о потерянных данных.
Примечание
Функциональность этого класса требует функциональной реализации общего семафора в операционной системе хоста. Без него функциональность этого класса будет отключена, и попытки создания экземпляра
Queue
приведут к появлениюImportError
. Дополнительные сведения см. в разделе bpo-3770. То же самое относится и к любому из перечисленных ниже типов специализированных очередей.-
-
class
multiprocessing.
SimpleQueue
¶ Это упрощенный тип
Queue
, очень близкий к заблокированномуPipe
.-
empty
()¶ Возвращает
True
, если очередь пуста,False
иначе.
-
get
()¶ Удаление и возвращает элемента из очереди.
-
put
(item)¶ Поместить item в очередь.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
,Queue
подкласс, представляет собой очередь, которая дополнительно содержит методыtask_done()
иjoin()
.-
task_done
()¶ Укажите, что ранее поставленная в очередь задача завершена. Используется потребителями очереди. Для каждого
get()
используемый, чтобы выбрать задачу, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в настоящее время заблокирует, то он возобновит, когда все предметы были обработаны (подразумевать, что требованиеtask_done()
было получено для каждого предмета, который былput()
в очередь).Вызывает
ValueError
, если вызывается больше раз, чем было элементов, помещенных в очередь.
-
join
()¶ Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается при каждом добавлении элемента в очередь. Счетчик снижается всякий раз, когда потребитель звонит
task_done()
, чтобы указать, что элемент был извлечен, и вся работа над ним завершена. Когда число незавершенных задач падает до нуля,join()
разблокирует.
-
Разное¶
-
multiprocessing.
active_children
()¶ Возвращает список всех живых потомков текущего процесса.
Вызов этого метода имеет побочный эффект «присоединения» к любым уже завершенным процессам.
-
multiprocessing.
cpu_count
()¶ Возвращает количество ЦПУ в системе.
Это число не эквивалентно числу ЦПУ, которые может использовать текущий процесс. Количество используемых ЦПУ можно получить с помощью
len(os.sched_getaffinity(0))
Может поднять
NotImplementedError
.См.также
-
multiprocessing.
current_process
()¶ Возвращает объект
Process
, соответствующий текущему процессу.Аналог
threading.current_thread()
.
-
multiprocessing.
parent_process
()¶ Возвращает объект
Process
, соответствующий родительскому процессуcurrent_process()
. Для основного процессаparent_process
будетNone
.Добавлено в версии 3.8.
-
multiprocessing.
freeze_support
()¶ Добавление поддержки в том случае, если программа, использующая
multiprocessing
, была заморожена для создания исполняемого файла Windows. (Был проверен с помощью py2exe, PyInstaller и cx_Freeze.)Эту функцию необходимо вызвать непосредственно после
if __name__ == '__main__'
линии основного модуля. Например:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Если строка
freeze_support()
опущена, то попытка запуска замороженного исполняемого файла вызоветRuntimeError
.freeze_support()
запроса не имеет никакого эффекта, когда он призван на любую операционную систему кроме Windows. Кроме того, если модулем управляет обычно Python интерпретатор на Windows (программа не была заморожена), тоfreeze_support()
не имеет никакого эффекта.
-
multiprocessing.
get_all_start_methods
()¶ Возвращает список поддерживаемых методов запуска, первым из которых является метод по умолчанию. Возможные методы запуска:
'fork'
,'spawn'
и'forkserver'
. В Windows доступен только'spawn'
. На Unix'fork'
и'spawn'
всегда поддерживаются, при этом'fork'
- дефолт.Добавлено в версии 3.4.
-
multiprocessing.
get_context
(method=None)¶ Возвращает объект контекст, имеющий ту же атрибуты, что и модуль
multiprocessing
.Если method -
None
тогда дефолт, контекст - возвращенный. В противном случае method должны быть'fork'
,'spawn'
,'forkserver'
.ValueError
возникает, если указанный метод запуска недоступен.Добавлено в версии 3.4.
-
multiprocessing.
get_start_method
(allow_none=False)¶ Возвращает имя метода запуска используемый для запуска процессов.
Если метод начала не был исправлен, и allow_none ложный, то метод начала прикреплен к дефолту, и имя - возвращенный. Если метод запуска не был фиксирован, а allow_none имеет значение true, то
None
имеет значение возвращенный.возвращает значение может быть
'fork'
,'spawn'
,'forkserver'
илиNone
.'fork'
является значением по умолчанию в Unix, а'spawn'
- значением по умолчанию в Windows.Добавлено в версии 3.4.
-
multiprocessing.
set_executable
()¶ Задает путь к Python интерпретатор, который будет использоваться при запуске дочернего процесса. (По умолчанию
sys.executable
- используемый). Встраивающие устройства, вероятно, должны будут сделать что-то вроде:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
прежде чем они смогут создавать дочерние процессы.
Изменено в версии 3.4: Теперь поддержанный на Unix, когда метод начала
'spawn'
- используемый.
-
multiprocessing.
set_start_method
(method)¶ Установите метод, который должен быть используемый, чтобы начать дочерние процессы. method может быть
'fork'
,'spawn'
или'forkserver'
.Следует отметить, что это должно вызываться не более одного раза и должно быть защищено внутри
if __name__ == '__main__'
клаузула основного модуля.Добавлено в версии 3.4.
Примечание
multiprocessing
не содержит аналогов threading.active_count()
, threading.enumerate()
, threading.settrace()
,
threading.setprofile()
, threading.Timer
или threading.local
.
Объекты связи¶
Объекты соединения позволяют отправлять и принимать выбираемые объекты или строки. Они могут рассматриваться как связанные сокеты, ориентированные на сообщение.
Объекты соединения обычно создаются с помощью Pipe
– см. также раздел
Слушатели и клиенты.
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ Отправьте объект на другой конец соединения, который следует прочитать с помощью команды
recv()
.Объект должен быть подбираемым. Очень большие огурцы (приблизительно 32 MiB +, хотя это зависит от ОС) могут вызвать исключение
ValueError
.
-
recv
()¶ Возвращает объект, отправленный с другого конца соединения с помощью команды
send()
. Блокируется, пока не будет что получить. ПоднимаетEOFError
, если ничего не осталось получить, а другой конец был закрыт.
-
fileno
()¶ Возвращает файл дескриптор или дескриптор используемый по соединению.
-
close
()¶ Закрыть связь.
Это вызывается автоматически при сборе мусора соединения.
-
poll
([timeout])¶ Возвращает, имеются ли данные, доступные для чтения.
Если timeout не указан, то он будет возвращает немедленно. Если timeout - число, это указывает максимальное время блокировки в секундах. Если timeout
None
, то бесконечное время ожидания равно используемый.Следует отметить, что несколько объектов соединения могут опрашиваться одновременно с помощью функции
multiprocessing.connection.wait()
.
-
send_bytes
(buffer[, offset[, size]])¶ Отправка байтовых данных из байтоподобного объекта в виде полного сообщения.
Если задано значение offset, то данные считываются из этой позиции в buffer. Если задано значение size, то многие байты будут считываться из буфера. Очень большие буферы (приблизительно 32 MiB +, хотя это зависит от ОС) могут вызвать исключение
ValueError
-
recv_bytes
([maxlength])¶ Возвращает полное сообщение байтовых данных, отправленное с другого конца соединения в виде строка. Блокируется, пока не будет что получить. Поднимает
EOFError
, если ничего не осталось получить, а другой конец закрылся.Если задано значение maxlength, и сообщение длиннее maxlength, то
OSError
возникает, и соединение будет недоступно для чтения.
-
recv_bytes_into
(buffer[, offset])¶ Считывание в buffer полного сообщения байтовых данных, отправленных с другого конца соединения, и возвращает количества байтов в сообщении. Блокируется, пока не будет что получить. Поднимает
EOFError
, если ничего не осталось получить, а другой конец был закрыт.buffer должен быть записываемым байтоподобным объектом. Если задано значение offset, то сообщение будет записано в буфер из этой позиции. Смещение должно быть неотрицательным целым числом, меньшим длины buffer (в байтах).
Если буфер слишком короткий, то возникает исключение
BufferTooShort
и полное сообщение доступно какe.args[0]
гдеe
- исключение сущность.
Изменено в версии 3.3: Сами объекты соединения теперь могут переноситься между процессами с помощью
Connection.send()
иConnection.recv()
.Добавлено в версии 3.3: Объекты соединения теперь поддерживают протокол управления контекст - см. раздел Типы менеджера контекста.
__enter__()
возвращает объект подключения, а__exit__()
вызываетclose()
.-
Например:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Предупреждение
Метод Connection.recv()
автоматически отключает полученные данные, что
может быть угрозой безопасности, если вы не можете доверять процессу,
отправившему сообщение.
Поэтому, если объект подключения не был создан с помощью Pipe()
, следует
использовать методы recv()
и send()
только
после выполнения той или иной аутентификации. См. Ключи аутентификации.
Предупреждение
Если процесс погибает, когда он пытается прочитать или записать в пайп, то данные в пайп, вероятно, будут повреждены, поскольку может оказаться невозможным убедиться, где лежат границы сообщения.
Примитивы синхронизации¶
Обычно примитивы синхронизации не так необходимы в многопроцессорной программе,
как в многопоточной программе. См. документацию по модулю threading
.
Обратите внимание, что можно также создать примитивы синхронизации с помощью объекта менеджера - см. раздел Менеджеры.
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ Объект барьер: клон
threading.Barrier
.Добавлено в версии 3.3.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Барьерный семафорный объект: близкий аналог
threading.BoundedSemaphore
.Существует одиночное отличие от его близкого аналога: первый аргумент его метода
acquire
называется block, как это согласуется сLock.acquire()
.Примечание
В Mac OS X это неотличимо от
Semaphore
, потому чтоsem_getvalue()
не реализована на этой платформе.
-
class
multiprocessing.
Condition
([lock])¶ Переменная условия: алиас для
threading.Condition
.Если задано значение lock, то это должен быть объект
Lock
илиRLock
изmultiprocessing
.Изменено в версии 3.3: Добавлен метод
wait_for()
.
-
class
multiprocessing.
Event
¶ Клон
threading.Event
.
-
class
multiprocessing.
Lock
¶ Нерекурсивный объект блокировки: близкий аналог
threading.Lock
. Как только процесс или поток приобрел блокировку, последующие попытки получить ее от любого процесса или поток будут блокироваться до тех пор, пока она не будет освобождена; любой процесс или поток может освободить его. Понятия и поведениеthreading.Lock
в том виде, в каком они применяются к потоки, воспроизводятся здесь вmultiprocessing.Lock
, как это применимо к процессам или потоки, за исключением отмеченных.Обратите внимание, что
Lock
- на самом деле фабричная функция, какой возвращает сущностьmultiprocessing.synchronize.Lock
инициализировал с дефолтом контекст.Lock
поддерживает протокол контекстного менеджера и таким образом может быть используемый вwith
инструкции.-
acquire
(block=True, timeout=None)¶ Получить блокировку, блокировку или неблокировку.
Если для аргумента block установлено значение
True
(значение по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не окажется в разблокированном состояние, а затем будет установлен параметр locked и возвращаетTrue
. Обратите внимание, что имя этого первого аргумента отличается от имени вthreading.Lock.acquire()
.Если для аргумента block установлено значение
False
, вызов метода не блокируется. Если блокировка в данный момент находится в заблокированном состояние, возвращаетFalse
; в противном случае установите блокировку на заблокированный состояние и возвращаетTrue
.При вызове с положительным значение с плавающей запятой для timeout блок максимум на количество секунд, указанное параметром timeout, если блокировка не может быть получена. Просьбы с отрицательным значение для timeout эквивалентны timeout ноля. Вызовы с timeout значение
None
(по умолчанию) устанавливают период тайм-аута как бесконечный. Отметим, что лечение отрицательного илиNone
значения для timeout отличается от реализованного поведения вthreading.Lock.acquire()
. Аргумент timeout не имеет практических последствий, если аргумент block имеет значениеFalse
и, таким образом, игнорируется. ВозвращаетTrue
если блокировка была приобретена илиFalse
, если истек период ожидания.
-
release
()¶ Отпустите замок. Это может быть вызвано из любого процесса или поток, а не только из процесса или поток, которые первоначально приобрели блокировку.
Поведение аналогично поведению в
threading.Lock.release()
, за исключением того, что при вызове на разблокированной блокировке возникаетValueError
.
-
-
class
multiprocessing.
RLock
¶ Объект рекурсивной блокировки: близкий аналог
threading.RLock
. Рекурсивная блокировка должна быть разблокирована процессом или поток, получившим ее. Как только процесс или поток приобрел рекурсивную блокировку, тот же самый процесс или поток может снова получить ее без блокировки; этот процесс или поток должны выпускать его один раз в каждый раз, когда он был приобретен.Обратите внимание, что
RLock
- на самом деле фабричная функция, какой возвращает сущностьmultiprocessing.synchronize.RLock
инициализировал с дефолтом контекст.RLock
поддерживает протокол контекстного менеджера и таким образом может быть используемый вwith
инструкции.-
acquire
(block=True, timeout=None)¶ Получить блокировку, блокировку или неблокировку.
При вызове с аргументом block, имеющим значение
True
, блокировать до тех пор, пока блокировка не окажется в разблокированном состояние (не принадлежащем какому-либо процессу или поток), пока блокировка уже не принадлежит текущему процессу или поток. Текущий процесс или поток затем получает право собственности на блокировку (если она еще не имеет права собственности), а уровень рекурсии внутри блокировки увеличивается на единицу, что приводит к возвращает значениеTrue
. Обратите внимание, что в поведении этого первого аргумента существует несколько различий по сравнению с реализациейthreading.RLock.acquire()
, начиная с имени самого аргумента.При вызове с аргументом block, имеющим значение
False
, не блокируйте. Если замок был уже приобретен (и таким образом принадлежит) другим процессом или поток, текущий процесс или поток не берут собственность, и уровень рекурсии в замке не изменен, приведя к возвращает значениеFalse
. Если блокировка находится в разблокированном состояние, текущий процесс или поток становится владельцем и уровень рекурсии увеличивается, что приводит к возвращает значениеTrue
.Использование и поведение аргумента timeout такие же, как и в
Lock.acquire()
. Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных поведений вthreading.RLock.acquire()
.
-
release
()¶ Разблокируйте блокировку, уменьшая уровень рекурсии. Если после уменьшения уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую какому-либо процессу или поток) и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите выполнение только одного из них. Если после уменьшения уровень рекурсии остается ненулевым, блокировка остается заблокированной и принадлежит вызывающему процессу или поток.
Только назовите этот метод, когда процесс запроса или поток будут владеть замком.
AssertionError
поднят, если этот метод называют процесс или поток кроме владельца или если замок находится в незапертом (ненаходящемся в собственности) состояние. Обратите внимание, что тип исключения, вызванного в этой ситуации, отличается от реализованного поведения вthreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Семафорный объект: близкий аналог
threading.Semaphore
.Существует одиночное отличие от его близкого аналога: первый аргумент его метода
acquire
называется block, как это согласуется сLock.acquire()
.
Примечание
В Mac OS X sem_timedwait
не поддерживается, поэтому вызов acquire()
с тайм-аутом
будет эмулировать поведение этой функции с помощью спящего цикла.
Примечание
Если сигнал SIGINT, сгенерированный Ctrl-C, прибудет, в то время как
главный поток заблокирован требованием к BoundedSemaphore.acquire()
, Lock.acquire()
,
RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
или Condition.wait()
тогда, требование будет
немедленно прервано, и KeyboardInterrupt
будет поднят.
Это отличается от поведения threading
, где SIGINT будет игнорироваться во
время выполнения эквивалентных блокирующих вызовов.
Примечание
Некоторые функциональные возможности этого пакета требуют функциональной
реализации общего семафора в операционной системе хоста. Без него модуль
multiprocessing.synchronize
будет отключен, и попытки его импорта приведут к появлению
ImportError
. Дополнительные сведения см. в разделе bpo-3770.
Разделенные объекты ctypes
¶
С помощью общей памяти можно создавать общие объекты, которые могут наследоваться дочерними процессами.
-
multiprocessing.
Value
(typecode_or_type, *args, lock=True)¶ Возвращает объект
ctypes
, выделенный из общей памяти. По умолчанию возвращает значение фактически является синхронизированной оболочкой для объекта. К самому объекту можно получить доступ через value атрибутValue
.typecode_or_type определяет тип объекта возвращенный: это - или тип ctypes или один символ typecode доброго используемый модулем
array
. *args передан конструктору для типа.Если lock равно
True
(по умолчанию), создается новый объект рекурсивной блокировки для синхронизации доступа к значение. Если lock является объектомLock
илиRLock
, то это будет используемый для синхронизации доступа к значение. Если lock будетFalse
тогда, то доступ к объекту возвращенный не будет автоматически защищен замком, таким образом, это не обязательно будет «безопасно от процесса».Операции как
+=
, которые включают прочитанный и пишут, не атомные. Так что, если для сущность, вы хотите атомарно увеличить общий значение это недостаточно, чтобы просто сделать:counter.value += 1
Предполагая, что связанная блокировка является рекурсивной (что по умолчанию), можно сделать:
with counter.get_lock(): counter.value += 1
Обратите внимание, что lock является только ключевым аргументом.
-
multiprocessing.
Array
(typecode_or_type, size_or_initializer, *, lock=True)¶ Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращает значение фактически является синхронизированной обёрткой для массива.
typecode_or_type определяет тип элементов множества возвращенный: это - или тип ctypes или один символ typecode доброго используемый модулем
array
. Если size_or_initializer является целым числом, то он определяет длину массива, и массив будет первоначально обнулен. Иначе size_or_initializer - последовательность, которая является используемый, чтобы инициализировать множество и чья длина определяет длину множества.Если lock равно
True
(по умолчанию), создается новый объект блокировки для синхронизации доступа к значение. Если lock является объектомLock
илиRLock
, то это будет используемый для синхронизации доступа к значение. Если lock будетFalse
тогда, то доступ к объекту возвращенный не будет автоматически защищен замком, таким образом, это не обязательно будет «безопасно от процесса».Обратите внимание, что lock - ключевой только аргумент.
Обратите внимание, что у множества
ctypes.c_char
есть value и raw атрибуты, которые позволяют использовать его, чтобы сохранить и восстановить строки.
Менеджеры¶
Менеджеры предоставляют способ создания данных, которые могут совместно использоваться различными процессами, включая совместное использование по сети между процессами, работающими на разных машинах. Объект-менеджер управляет серверным процессом, который управляет shared objects. Другие процессы могут обращаться к общим объектам с помощью прокси.
Возвращает начатый объект
SyncManager
, который может быть используемый для разделения объектов между процессами. Объект менеджера возвращенный соответствует порожденному дочернему процессу и имеет методы, которые создадут общие объекты и соответствующие возвращаемое прокси.
Процессы менеджера будут остановлены сразу после сбора мусора или завершения
родительского процесса. Классы менеджера определяются в модуле multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Создайте объект BaseManager.
После создания необходимо вызвать
start()
илиget_server().serve_forever()
, чтобы гарантировать, что объект менеджера ссылается на запущенный процесс менеджера.address - адрес, по которому процесс менеджера прослушивает новые подключения. Если address
None
, то выбирается произвольная.authkey - ключ идентификации, который будет используемый, чтобы проверить законность поступающих связей с процессом сервера. Если authkey -
None
тогда,current_process().authkey
- используемый. В противном случае authkey является используемый и должен быть байтом строка.-
start
([initializer[, initargs]])¶ Начните подпроцессы, чтобы начать менеджера. Если initializer не является
None
, то подпроцессы вызоветinitializer(*initargs)
при запуске.
-
get_server
()¶ Возвращает объект
Server
, представляющий фактический сервер под управлением менеджера. ОбъектServer
поддерживает методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
дополнительно содержитaddress
атрибут.
-
connect
()¶ Подключить объект локального менеджера к процессу удаленного менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
shutdown
()¶ Остановить процесс используемый менеджером. Это только доступно, если
start()
был используемый, чтобы начать процесс сервера.Это можно вызвать несколько раз.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ classmethod, который может быть используемый для регистрации типа или подлежащий выкупу с классом менеджера.
typeid - «идентификатор типа», который является используемый, чтобы определить особый тип общего объекта. Это должно быть строка.
callable является вызываемым используемый для создания объектов для этого идентификатора типа. Если диспетчер сущность будет подключен к серверу с помощью метода
connect()
или если аргумент create_method имеет значениеFalse
, его можно оставить какNone
.proxytype является подкласс
BaseProxy
, который используемый для создания прокси для общих объектов с этим typeid. ЕслиNone
, то класс прокси создается автоматически.exposed - используемый, чтобы определить последовательность вызова метода, которые прокси для этого typeid нужно разрешить доступу, используя
BaseProxy._callmethod()
. (Если exposed -None
тогда,proxytype._exposed_
- используемый вместо этого, если он существует.) в случае, если не указан список разоблаченных объектов, будут доступны все «публичные методы» общего объекта. (Здесь «метод публичный» означает любой атрибут, у которого есть метод__call__()
и чьё имя не начинается с'_'
.)method_to_typeid - отображение используемый, чтобы определить тип возвращает тех выставленных методов, которые должны возвращает прокси. Это наносит на карту названия метода к typeid строки. (Если method_to_typeid -
None
тогда,proxytype._method_to_typeid_
- используемый вместо этого, если он существует.) Если вызов метода не будет ключом этого отображения или если отображение будетNone
тогда объект, то возвращенный методом будет скопирован значение.create_method определяет, должен ли метод быть создан с именем typeid, который может быть используемый, чтобы сказать процессу сервера создавать новый общий объект и возвращает прокси для него. По умолчанию это значение равно
True
.
BaseManager
сущности также имеют одно свойство только для чтения:-
address
¶ Адрес, используемый менеджером.
Изменено в версии 3.3: Объекты менеджера поддерживают протокол управления контекст - см. раздел Типы менеджера контекста.
__enter__()
запускает процесс сервера (если он еще не запущен), а затем возвращает объект диспетчера.__exit__()
вызовыshutdown()
.В предыдущих версиях
__enter__()
не запускал серверный процесс диспетчера, если он еще не был запущен.-
-
class
multiprocessing.managers.
SyncManager
¶ Подкласс
BaseManager
, который может быть используемый для синхронизации процессов. Объекты этого типа возвращенныйmultiprocessing.Manager()
.Его методы обычно создают и возвращает Прокси объекты для многих типы данных используемый, которые будут синхронизированы через процессы. Это, в частности, включает общие списки и словари.
-
Barrier
(parties[, action[, timeout]])¶ Создание общего объекта
threading.Barrier
и возвращает прокси для него.Добавлено в версии 3.3.
-
BoundedSemaphore
([value])¶ Создание общего объекта
threading.BoundedSemaphore
и возвращает прокси для него.
-
Condition
([lock])¶ Создание общего объекта
threading.Condition
и возвращает прокси для него.Если задано значение lock, то оно должно быть прокси для объекта
threading.Lock
илиthreading.RLock
.Изменено в версии 3.3: Добавлен метод
wait_for()
.
-
Event
()¶ Создание общего объекта
threading.Event
и возвращает прокси для него.
-
Lock
()¶ Создание общего объекта
threading.Lock
и возвращает прокси для него.
-
Queue
([maxsize])¶ Создание общего объекта
queue.Queue
и возвращает прокси для него.
-
RLock
()¶ Создание общего объекта
threading.RLock
и возвращает прокси для него.
-
Semaphore
([value])¶ Создание общего объекта
threading.Semaphore
и возвращает прокси для него.
-
Array
(typecode, sequence)¶ Создайте массив и возвращает прокси-сервер для него.
-
Value
(typecode, value)¶ Создайте объект с возможностью записи
value
атрибут и возвращает прокси для него.
Изменено в версии 3.6: Общие объекты могут быть вложены. Например, объект общего контейнера, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться
SyncManager
.-
-
class
multiprocessing.managers.
Namespace
¶ Тип, который может регистрироваться в
SyncManager
.Объект пространства имен не имеет публичных методов, но содержит записываемые атрибуты. Его представление показывает значения его атрибутов.
Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с
'_'
, будет атрибут прокси, а не атрибут ссылки:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # это атрибут прокси >>> print(Global) Namespace(x=10, y='hello')
Настраиваемые менеджеры¶
Для создания собственного менеджера создается подкласс BaseManager
и используется
классметод register()
для регистрации новых типов или вызываемых объектов в
классе менеджера. Например:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Использование удаленного менеджера¶
Можно запускать сервер менеджера на одной машине и использовать его клиентами с других машин (при условии, что соответствующие брандмауэры разрешают это).
Выполнение следующих команд создает сервер для одной общей очереди, к которой могут иметь доступ удаленные клиенты:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Его также может использовать другой клиент:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальные процессы также могут обращаться к этой очереди, используя код сверху на клиенте для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Прокси объекты¶
Прокси - объект, какой относится к общему объекту, который живет (по- видимому), в другом процессе. Говорят, что общий объект является ссылкой прокси. Несколько прокси-объектов могут иметь один и тот же ссылочный объект.
Прокси-объект содержат методы, которые вызывают соответствующие методы его ссылки (хотя не каждый метод ссылки обязательно будет доступен через прокси). Таким образом, прокси может быть используемый так же, как его референт:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Заметьте, что применение str()
прокси будет возвращает представление
референта, тогда как применение repr()
будет возвращает представление
прокси.
Важной особенностью прокси-объектов является то, что они могут быть переданы между процессами. Таким образом, референт может содержать Прокси объекты. Это разрешает вложение этих управляемых списков, словари и другого Прокси объекты:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # ссылка a теперь содержит ссылку b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Аналогично, прокси словарь и списка могут быть вложены друг в друга:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Если стандартные (не прокси) list
или dict
объекты содержатся в
ссылке, изменения этих изменяемых значения не будут распространяться через
менеджер, поскольку прокси не может знать, когда изменятся содержащиеся в них
значения. Однако хранение значение в контейнерном прокси (который
вызывает __setitem__
на объекте по доверенности) действительно размножается
через менеджера и так эффективно изменить такой предмет, можно было повторно
назначить измененный значение контейнерному прокси:
# создать список прокси и добавить изменяемый объект (словарь)
lproxy = manager.list()
lproxy.append({})
# теперь изменяемый словарь
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# на этом этапе изменения d еще не синхронизируются, но при обновлении словаря
# прокси уведомляется об изменении
lproxy[0] = d
Этот подход, возможно, менее удобен, чем использование вложенных Прокси объекты в большинстве случаев использования, но также демонстрирует уровень управления синхронизацией.
Примечание
Прокси-типы в multiprocessing
не поддерживают сравнения по значение. Итак, для
сущность у нас есть:
>>> manager.list([1,2,3]) == [1,2,3]
False
Вместо этого следует использовать копию ссылки при проведении сравнений.
-
class
multiprocessing.managers.
BaseProxy
¶ Объекты по доверенности - сущности подклассы
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Вызов и возвращает результата метода ссылки прокси.
Если
proxy
- прокси, ссылочным элементом которого являетсяobj
, то выражение:proxy._callmethod(methodname, args, kwds)
вычислит выражение:
getattr(obj, methodname)(*args, **kwds)
в процессе менеджере.
возвращенный значение будет копией результата вызова или прокси для нового общего объекта - см. документацию для аргумента method_to_typeid
BaseManager.register()
.Если в результате вызова возникает исключение, то вызов повторно инициируется функцией
_callmethod()
. Если в процессе менеджера возникает другое исключение, оно преобразуется в исключениеRemoteError
и инициируется_callmethod()
.Обратите внимание, в частности, на то, что исключение будет создано, если methodname не было exposed.
Пример использования
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Возвращает копию референта.
Если референт не может быть выбран, это вызовет исключение.
-
__repr__
()¶ Возвращает представление прокси-объекта.
-
__str__
()¶ Возвращает представление референта.
-
Очистка¶
Объект по доверенности использует weakref колбэк так, чтобы, когда это собрало мусор, он вычеркнул из списка себя от менеджера, который владеет его референтом.
Общий объект удаляется из процесса диспетчера, если на него больше не ссылается ни один прокси.
Процессные пулы¶
Можно создать пул процессов, который будет выполнять задачи, переданные ему с
классом Pool
.
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ Объект пула процессов, который управляет пулом рабочих процессов, в который могут отправляться задания. Он поддерживает асинхронные результаты с тайм-аутами и колбэки и имеет параллельную реализацию карты.
processes - количество используемых рабочих процессов. Если processes
None
, то число возвращенный поos.cpu_count()
равно используемый.Если initializer не является
None
, то каждый рабочий процесс будет вызыватьinitializer(*initargs)
при запуске.maxtasksperchild - количество задач, которые рабочий процесс может выполнить до выхода и замены новым рабочим процессом для освобождения неиспользуемых ресурсов. Дефолт maxtasksperchild -
None
, что означает процессы рабочего, будет жить пока бассейн.context может быть используемый, чтобы определить контекст используемый для старта процессов рабочего. Обычно пул создается с помощью функции
multiprocessing.Pool()
или методаPool()
объекта контекст. В обоих случаях context устанавливается соответствующим образом.Следует отметить, что методы объекта пула должны вызываться только процессом, создавшим пул.
Предупреждение
multiprocessing.pool
объекты имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любой другой ресурс), используя пул в качестве менеджера контекст или вызываяclose()
иterminate()
вручную. Неспособность сделать это может привести к зависанию процесса при завершении.Обратите внимание, что это не верно, чтобы полагаться на сборщик мусора, чтобы уничтожить пул, так как CPython не гарантирует, что финализатор пула будет вызван (см.
object.__del__()
для получения дополнительной информации).Добавлено в версии 3.2: maxtasksperchild
Добавлено в версии 3.4: context
Примечание
Рабочие процессы внутри
Pool
обычно работают в течение всей продолжительности рабочей очереди пула. Частый шаблон, встречающийся в других системах (таких как Apache, mod_wsgi и т.д.) для освобождения ресурсов, находящихся в распоряжении работников, заключается в том, чтобы позволить работнику в пуле выполнить только определенный объем работы перед выходом, очисткой и новым процессом, который должен заменить старый. Аргумент maxtasksperchildPool
предоставляет эту возможность конечному пользователю.-
apply
(func[, args[, kwds]])¶ Вызовите func с аргументами args и ключевой аргументы kwds. Он блокируется до тех пор, пока результат не будет готов. Учитывая эти блоки,
apply_async()
лучше подходит для выполнения работы параллельно. Кроме того, func выполняется только в одном из работников пула.
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ Вариант метода
apply()
, который возвращает объектAsyncResult
.Если задан параметр callback, то он должен быть вызываемым и принимать один аргумент. То, когда результат становится готовым callback, применено к нему, это - то, если неудавшееся требование, в этом случае error_callback не применен вместо этого.
Если задан параметр error_callback, то он должен быть вызываемым и принимать один аргумент. Если целевая функция терпит неудачу, то error_callback называют за исключением сущность.
Обратные вызовы должны быть завершены немедленно, поскольку в противном случае поток, обрабатывающая результаты, будет заблокирована.
-
map
(func, iterable[, chunksize])¶ Параллельный эквивалент встроенной функции
map()
(однако он поддерживает только один аргумент iterable для нескольких итерабелей см.starmap()
). Он блокируется до тех пор, пока результат не будет готов.Этот метод разбивает итабль на несколько чанки, которые он передает в пул процессов как отдельные задачи. (Приблизительный) размер этих чанки можно задать, установив для параметра chunksize положительное целое число.
Обратите внимание, что это может привести к интенсивному использованию памяти для очень длинных итераблей. Рассмотрите возможность использования
imap()
илиimap_unordered()
с явным параметром chunksize для повышения эффективности.
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Вариант метода
map()
, который возвращает объектAsyncResult
.Если задан параметр callback, то он должен быть вызываемым и принимать один аргумент. То, когда результат становится готовым callback, применено к нему, это - то, если неудавшееся требование, в этом случае error_callback не применен вместо этого.
Если задан параметр error_callback, то он должен быть вызываемым и принимать один аргумент. Если целевая функция терпит неудачу, то error_callback называют за исключением сущность.
Обратные вызовы должны быть завершены немедленно, поскольку в противном случае поток, обрабатывающая результаты, будет заблокирована.
-
imap
(func, iterable[, chunksize])¶ Более ленивый вариант
map()
.Аргумент chunksize совпадает с аргументом используемый методом
map()
. Очень долго iterables использование большого значение для chunksize может заставить работу закончить гораздо быстрее, чем использование дефолта значение1
.Также если chunksize является
1
, то методnext()
итератора возвращенный методомimap()
имеет необязательный параметр timeout:next(timeout)
подниметmultiprocessing.TimeoutError
, если результат не может быть возвращенный в течение timeout секунд.
-
imap_unordered
(func, iterable[, chunksize])¶ То же как
imap()
за исключением того, что заказ результатов возвращенный iterator нужно считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантирован как «правильный».
-
starmap
(func, iterable[, chunksize])¶ Как
map()
за исключением того, что элементы iterable, как ожидают, будут iterables, которые распакованы как аргументы.Следовательно iterable
[(1,2), (3, 4)]
приводит к[func(1,2), func(3,4)]
.Добавлено в версии 3.3.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Комбинация
starmap()
иmap_async()
, которая итерирует по iterable итераблей и вызывает func с итераблями распакованными. Возвращает объект результата.Добавлено в версии 3.3.
-
close
()¶ Запрещает отправку дополнительных задач в пул. После завершения всех задач рабочие процессы завершаются.
-
terminate
()¶ Немедленная остановка рабочих процессов без завершения незавершенной работы. То, когда объект бассейна - мусор, собралось,
terminate()
немедленно назовут.
-
join
()¶ Дождитесь завершения рабочих процессов. Перед использованием
close()
необходимо вызватьterminate()
илиjoin()
.
Добавлено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекст - см. раздел Типы менеджера контекста.
__enter__()
возвращает объект пула,__exit__()
вызываетterminate()
.-
-
class
multiprocessing.pool.
AsyncResult
¶ Класс результата возвращенный по
Pool.apply_async()
иPool.map_async()
.-
get
([timeout])¶ Возвращает результат при его поступлении. Если timeout не является
None
и результат не поступает в течение timeout секунд, тоmultiprocessing.TimeoutError
поднимается. Если удаленное требование подняло исключение тогда, что исключение будет повторно поднятоget()
.
-
wait
([timeout])¶ Подождите, пока результат будет доступен, или до истечения timeout секунд.
-
ready
()¶ Возвращает, завершен ли вызов.
-
successful
()¶ Возвращает, выполнен ли вызов без возникновения исключения. Поднимет
ValueError
, если результат не готов.Изменено в версии 3.7: Если результат не готов,
ValueError
поднимается вместоAssertionError
.
-
В следующем примере показано использование пула:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # запустить 4 процесса воркера
result = pool.apply_async(f, (10,)) # асинхронно вычислить "f(10)" в одном процессе
print(result.get(timeout=1)) # печатает "100", если только ваш компьютер не работает *очень* медленно
print(pool.map(f, range(10))) # печать "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # печать "0"
print(next(it)) # печать "1"
print(it.next(timeout=1)) # печать "4" если ваш компьютер не *очень* медленный
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # поднять multiprocessing.TimeoutError
Слушатели и клиенты¶
Обычно сообщение, проходящее между процессами, сделано, используя очереди, или
при помощи Connection
возражает возвращенный Pipe()
.
Однако модуль multiprocessing.connection
обеспечивает некоторую гибкость. Он в основном дает
высокоуровневый ориентированный на сообщения API для работы с сокеты или
Windows с именем пайпы. У этого также есть поддержка digest authentication, используя
модуль hmac
, и опроса многочисленных связей одновременно.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Отправка случайно сгенерированного сообщения на другой конец соединения и ожидание ответа.
Если ответ соответствует дайджесту сообщения с использованием клавиши authkey, то приветственное сообщение отправляется на другой конец соединения. В противном случае поднимается
AuthenticationError
.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Получение сообщения, вычисление дайджеста сообщения с использованием authkey в качестве ключа, а затем отправка дайджеста обратно.
Если приветственное сообщение не получено, возникает сообщение
AuthenticationError
.
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ Попытка установить соединение с прослушивателем, который использует адрес address, возвращая
Connection
.Тип соединения определяется аргументом family, но обычно его можно опустить, поскольку его обычно можно вывести из формата address. (См. Форматы адреса)
Если authkey задан, а не None, он должен быть байтом строка и будет используемый в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey является None.
AuthenticationError
возникает в случае сбоя аутентификации. См. Ключи аутентификации.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ Обертка для связанной сокет или Windows с именем пайп, которая «слушает» подключения.
address - адрес, который должен быть используемый связанным сокет или именованным пайп объекта прослушивателя.
Примечание
Если адрес „0.0.0.0“ будет используемый, то адрес не будет соединяемой конечной точкой на Windows. Если требуется соединяемая конечная точка, следует использовать „127.0.0.1“.
family - тип используемого сокет (или именованного пайп). Это может быть один из строки
'AF_INET'
(для TCP сокет),'AF_UNIX'
(для домена Unix сокет) или'AF_PIPE'
(для Windows с именем пайп). Из них только первый гарантированно доступен. Если familyNone
, то семейство выводится из формата address. Если address также являетсяNone
, то выбирается значение по умолчанию. Это семейство по умолчанию считается самым быстрым из доступных. См. Форматы адреса. Обратите внимание, что если family является'AF_UNIX'
и адресNone
, то сокет будет создан в частном временном каталоге, созданном с помощьюtempfile.mkstemp()
.Если объект прослушивателя использует сокет, то backlog (по умолчанию 1) передается методу
listen()
сокет после его привязки.Если authkey задан, а не None, он должен быть байтом строка и будет используемый в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey является None.
AuthenticationError
возникает в случае сбоя аутентификации. См. Ключи аутентификации.-
accept
()¶ Принять подключение на связанном сокете или именованный пайп объекта слушателя и возвращает объект
Connection
. Если идентификация предпринята и терпит неудачу, то поднимаетсяAuthenticationError
.
-
close
()¶ Закрыть связанный сокет или именованный пайп объекта слушателя. Это вызывается автоматически при сборе мусора прослушивателя. Однако желательно называть его явно.
Объекты прослушивателя имеют следующие свойства только для чтения:
-
address
¶ Адрес, используемый объектом Listener.
-
last_accepted
¶ Адрес, с которого пришло последнее принятое соединение. Если это недоступно, то это
None
.
Добавлено в версии 3.3: Объекты прослушивателя теперь поддерживают протокол управления контекст - см. раздел Типы менеджера контекста.
__enter__()
возвращает объект прослушивателя, а__exit__()
вызываетclose()
.-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ Подождите, пока объект в object_list будет готов. Возвращает список тех объектов в object_list, которые готовы. Если timeout является float, то вызов блокируется максимум на столько секунд. Если timeout будет
None
тогда, то он заблокирует в течение неограниченного периода. Отрицательное время ожидания эквивалентно нулевому времени ожидания.Как для Unix, так и для Windows объект может отображаться в object_list, если это так
- читаемый объект
Connection
; - подключенный и читаемый объект
socket.socket
; или sentinel
атрибут объектаProcess
.
Соединение или объект сокет готов, когда имеются данные, доступные для чтения из него, или другой конец закрыт.
Unix:
wait(object_list, timeout)
почти эквивалентныйselect.select(object_list, [], [], timeout)
. Разница в том, что, еслиselect.select()
прерывается сигналом, он может поднятьOSError
с числом ошибокEINTR
, тогда какwait()
не будет.Windows: Элемент в object_list должен или быть целочисленным обработчиком, который ожидает (waitable) (согласно определению используемый документацией функции Win32
WaitForMultipleObjects()
), или это может быть объект с методомfileno()
, с каким возвращает ручка сокет или пайп обращаются. (Обратите внимание, что дескрипторы пайпа и сокет не являются дескрипторами ожидания.Добавлено в версии 3.3.
- читаемый объект
Примеры
Следующий код сервера создает слушателя, который использует 'secret password'
в
качестве ключа идентификации. Затем он ожидает подключения и передает некоторые
данные клиенту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # семейство выведено как 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Следующая код подключается к серверу и получает некоторые данные от сервера:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Следующая код использует wait()
для ожидания сообщений от
нескольких процессов одновременно:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# Мы закрываем записываемый конец пайп, чтобы убедиться, что p является
# единственным процессом, который владеет дескриптором для него. Это гарантирует,
# что когда p закроет свой дескриптор для записываемого конца, wait()
# быстро сообщит о читаемом конце как о прочитанном.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Форматы адреса¶
- Адрес
'AF_INET'
- кортеж формы(hostname, port)
, где hostname - строка, а port - целое число. - Адрес
'AF_UNIX'
- это строка, представляющий имя файла в файловой системе. - Адрес
'AF_PIPE'
представляет собой строку видаr'\\.\pipe{PipeName}'
. Чтобы использоватьClient()
для подключения к именованному каналу на удаленном компьютере с именем ServerName, вместо этого следует использовать адрес формыr'\ServerName\pipe{PipeName}'
.
Обратите внимание, что любая строка, начинающаяся с двух обратных косых
черт, по умолчанию принимается как 'AF_PIPE'
адрес, а не 'AF_UNIX'
адрес.
Ключи аутентификации¶
Когда каждый использует Connection.recv
, полученными данными является автоматически
unpickled. К сожалению, отмена извлечения данных из ненадежного источника
является угрозой безопасности. Поэтому Listener
и Client()
используйте
модуль hmac
для обеспечения дайджест-аутентификации.
Ключ аутентификации - это байт строка, который можно рассматривать как пароль: после установления соединения оба конца потребуют подтверждения того, что другой знает ключ аутентификации. (Демонстрирующий то, что оба конца используют тот же ключ, делает не, включают отправку ключа по связи.)
Если идентификацию запрошенный, но никакой ключ идентификации не определен
тогда, возвращает значение current_process().authkey
- используемый (см. Process
). Этот
значение будет автоматически унаследован любым объектом Process
, что
текущий процесс создает. Это означает, что (по умолчанию) все процессы
многопроцессорной программы будут совместно использовать один ключ
аутентификации, который может быть используемый при установке соединений между
собой.
Подходящие ключи аутентификации также могут быть сгенерированы с использованием
os.urandom()
.
Логирование¶
Доступна некоторая поддержка логирование. Однако следует отметить, что пакет
logging
не использует общие блокировки процессов, что позволяет (в
зависимости от типа обработчик) смешивать сообщения из различных процессов.
-
multiprocessing.
get_logger
()¶ Возвращает логгер используемый по
multiprocessing
. При необходимости будет создан новый.При первом создании логгер имеет уровень
logging.NOTSET
и не имеет обработчик по умолчанию. Сообщения, отправляемые этому логгер, по умолчанию не распространяются на корневой логгер.Обратите внимание, что в Windows дочерние процессы наследуют только уровень логгер родительского процесса - любая другая настройка логгер не наследуется.
-
multiprocessing.
log_to_stderr
()¶ Эта функция выполняет вызов
get_logger()
, но в дополнение к возвращению логгер, созданного get_logger, она добавляет обработчик, который отправляет вывод вsys.stderr
, используя формат'[%(levelname)s/%(processName)s] %(message)s'
.
Ниже приведен пример сеанса с включенной функцией логирование:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Полную таблицу уровней логирование см. в модуле logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
копирует API multiprocessing
, но является не
более чем оболочкой для модуля threading
.
В частности, функция Pool
, предоставляемая multiprocessing.dummy
,
возвращает экземпляр ThreadPool
, который является подклассом
Pool
, который поддерживает все те же вызовы методов, но использует пул
рабочих потоков, а не рабочих процессов.
-
class
multiprocessing.pool.
ThreadPool
([processes[, initializer[, initargs]]])¶ Объект пула потоков, который управляет пулом рабочих потоков, которым могут быть отправлены задания. Экземпляры
ThreadPool
полностью совместимы по интерфейсу с экземплярамиPool
, и их ресурсами также необходимо правильно управлять, используя пул в качестве диспетчера контекста или вызываяclose()
иterminate()
вручную.processes - количество используемых рабочих потоков. Если processes -
None
, то используется номер, возвращенныйos.cpu_count()
.Если initializer не
None
, то каждый рабочий процесс при запуске будет вызыватьinitializer(*initargs)
.В отличие от
Pool
, maxtasksperchild и context не могут быть предоставлены.Примечание
ThreadPool
имеет тот же интерфейс, что иPool
, который разработан на основе пула процессов и предшествует введению модуляconcurrent.futures
. Таким образом, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и имеет свой собственный тип для представления состояния асинхронных заданий,AsyncResult
, который не понимается никакими другими библиотеками.Пользователям обычно следует использовать
concurrent.futures.ThreadPoolExecutor
, который имеет более простой интерфейс, изначально спроектированный для потоков, и который возвращает экземплярыconcurrent.futures.Future
, совместимые со многими другими библиотеками, включаяasyncio
.
Программирование рекомендаций¶
Существуют определенные руководящие принципы и идиомы, которых следует
придерживаться при использовании multiprocessing
.
Все методы начала¶
Следующее относится ко всем методам запуска.
Избегайте разделенного состояние
Насколько это возможно, следует стараться избегать переноса больших объемов данных между процессами.
Вероятно, лучше всего использовать очереди или пайпы для обмена данными между процессами, чем использовать примитивы синхронизации более низкого уровня.
Picklability
Убедитесь в том, что аргументы для методов прокси-серверов доступны.
Безопасность потоков прокси
Не используйте прокси-объект из нескольких поток, если вы не защитите его с помощью блокировки.
(Никогда нет проблемы с различными процессами, используя прокси same.)
Присоединение к процессам зомби
В Unix, когда процесс заканчивается, но не был присоединен, он становится зомби. Никогда не должно быть очень многих, потому что каждый раз новый процесс начинается (илиactive_children()
называют), все законченные процессы, к которым еще не присоединились, будет присоединен. Кроме того, вызовProcess.is_alive
завершенного процесса присоединится к процессу. Тем не менее, это, вероятно, хорошая практика, чтобы явно присоединиться ко всем процессам, которые вы начинаете.
Лучше наследовать, чем pickle/unpickle
При использовании методов запуска spawn или forkserver многие типы изmultiprocessing
должны быть подбираемыми, чтобы их могли использовать дочерние процессы. Однако обычно следует избегать отправки общих объектов другим процессам с использованием пайпов или очередей. Вместо этого следует упорядочить программу так, чтобы процесс, которому необходим доступ к общему ресурсу, созданному в другом месте, мог наследовать его от предшествующего процесса.
Постарайтесь не заканчивать процессы
Используя метод
Process.terminate
, чтобы остановить процесс склонно заставить любые общие ресурсы (такие как замки, семафоры, пайпы и очереди) в настоящее время являющийся используемый процессом становиться сломанными или недоступными к другим процессам.Поэтому, вероятно, лучше только рассмотреть использование
Process.terminate
на процессах, которые никогда не используют общих ресурсов.
Объединение процессов, использующих очереди
Следует иметь в виду, что процесс, поставивший элементы в очередь, будет ждать завершения до тех пор, пока все буферизированные элементы не будут поданы «питателем» поток в нижележащий пайп. (Дочерний процесс может вызвать метод
Queue.cancel_join_thread
очереди, чтобы избежать такого поведения.Это означает, что при каждом использовании очереди необходимо убедиться, что все элементы, помещенные в очередь, будут удалены до присоединения процесса. В противном случае вы не можете быть уверены, что процессы, поставившие элементы в очередь, завершатся. Также следует помнить, что недемонические процессы будут присоединяться автоматически.
Примером взаимоблокировки является следующее:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # это мертвая блокировка obj = queue.get()Фиксация здесь должна была бы обменять последние две линии (или просто удалить линию
p.join()
).
Явная передача ресурсов дочерним процессам
На Unix, используя метод начала fork, дочерний процесс может использовать общий ресурс, созданный в родительском процессе, используя глобальный ресурс. Однако лучше передать объект в качестве аргумента конструктору для дочернего процесса.
Кроме обеспечения совместимости код (потенциально) с Windows и другими методами запуска, это также гарантирует, что до тех пор, пока дочерний процесс еще жив, объект не будет мусором собран в родительском процессе. Это может быть важно, если какой-либо ресурс освобождается при сборе мусора объекта в родительском процессе.
Таким образом для сущность:
from multiprocessing import Process, Lock def f(): ... сделать что-то используя "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()следует переписать как:
from multiprocessing import Process, Lock def f(l): ... сделать что-нибудь, используя "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерегайтесь замены sys.stdin
на «файл, как объект»
multiprocessing
изначально безоговорочно вызвается:os.close(sys.stdin.fileno())в методе
multiprocessing.Process._bootstrap()
— это привело к проблемам с процессами в процессах. Это было изменено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Который решает основную проблему процессов, сталкивающихся друг с другом приводящим к плохой ошибке файла дескриптор, но вводит потенциальную опасность для заявлений, которые заменяют
sys.stdin()
«подобным файлу объектом» с буферизующей продукцией. Эта опасность заключается в том, что если несколько процессов вызываютclose()
на этом файловом объекте, это может привести к тому, что одни и те же данные будут очищены на объекте несколько раз, что приведет к повреждению.Если вы пишете подобный файлу объект и осуществляете ваш собственный кэширование, вы можете сделать его безопасным от вилки, храня pid каждый раз, когда вы прилагаете к кэш и отказу от кэш, когда pid изменяется. Например:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДополнительные сведения см. в разделах bpo-5155, bpo-5313 и bpo-5331
Методы запуска spawn и forkserver¶
Есть некоторые дополнительное ограничение, которые не относятся к методу начала fork.
Больше picklability
Убедитесь, что все аргументы дляProcess.__init__()
доступны. Кроме того, если вы, подклассProcess
тогда удостоверяется, что сущности будет picklable, когда методProcess.start
назовут.
Глобальные переменные
Принять во внимание, что, если пробег код в дочернем процессе пытается получить доступ к глобальной переменной, то значение он видит (если таковые имеются) может не совпасть с значение в родительском процессе в то время, когда вызван
Process.start
.Однако глобальные переменные, являющиеся только константами уровня модуля, не вызывают проблем.
Безопасный импорт основного модуля
Удостоверьтесь, что главный модуль может быть безопасно импортирован новым Python интерпретатор, не вызывая непреднамеренные побочные эффекты (такой старт нового процесса).
Например, использование метода начала spawn или forkserver, управляющего следующим модулем, потерпело бы неудачу с
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Вместо этого следует защитить «точку входа» программы, используя
if __name__ == '__main__':
следующим образом:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Строка
freeze_support()
может быть опущена, если программа будет запущена нормально, а не заморожена.Это позволяет недавно порожденному Python интерпретатор безопасно импортировать модуль и затем управлять функцией
foo()
модуля.Аналогичные ограничения применяются в случае создания пула или менеджера в главном модуле.
Примеры¶
Демонстрация создания и использования настраиваемых менеджеров и прокси:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Использование Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Пример использования очередей для подачи задач в коллекцию рабочих процессов и сбора результатов: