multiprocessing — Процессный параллелизм

Исходный код: Lib/multiprocessing/


Введение

multiprocessing - это пакет, который поддерживает порождение процессов с использованием API, похожим на модуль threading. Пакет multiprocessing предлагает как локальный, так и удаленный параллелизм, эффективно обходя Глобальную блокировку интерпретатора с использованием подпроцессов вместо потоков. Также модуль multiprocessing позволяет программисту полностью использовать несколько процессоров на данном компьютере. Работает как на Unix, так и на Windows.

Модуль multiprocessing также вводит API, которые не имеет аналогов в модуле threading. Ярким примером этого является объект Pool, который предлагает удобное средство параллелирования выполнения функции по множеству входных значениц, распределения входных данных по процессам (параллелизм данных). Следующий пример демонстрирует обычную практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием Pool:

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

будет напечатано в стандартный вывод:

[1, 4, 9]

Класс Process

В multiprocessing процессы порождаются созданием объекта Process, а затем вызовом его метода start(). Process следует API threading.Thread. Тривиальный пример многопроцессорной программы:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Ниже приведен развернутый пример, показывающий соответствующие идентификаторы отдельных процессов:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Объяснение причины необходимости if __name__ == '__main__' части см. в разделе Программирование рекомендаций.

Контексты и методы запуска

В зависимости от платформы, multiprocessing поддерживает три способа запуска процесса. Далее перечислены методы запуска.

spawn

Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для выполнения метода run() объекта процесса. В частности, ненужные файловые дескрипторы и дескрипторы родительского процесса не будут унаследованы. Запуск процесса с использованием этого метода довольно медленный по сравнению с использованием fork или forkserver.

Доступно в Unix и Windows. Значение по умолчанию для Windows и macOS.

fork

Родительский процесс использует os.fork() для форсирования Python интерпретатор. Дочерний процесс, когда он начинается, фактически идентичен родительскому процессу. Все ресурсы родителя наследуются дочерним процессом. Следует отметить, что безопасная форсировка многопоточного процесса является проблематичной.

Доступно только в Unix. Значение по умолчанию в Unix.

forkserver

Когда программа начинает и выбирает метод начала forkserver, процесс сервера, начат. С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает, чтобы он форсировал новый процесс. Процесс сервера вилки является однопотоковым, поэтому он безопасен для использования os.fork(). Ненужные ресурсы не наследуются.

Доступный на платформах Unix, которые поддерживают мимолетный файл дескрипторы по Unix пайпы.

Изменено в версии 3.8: На macOS метод начала spawn - теперь дефолт. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбоям подпроцессы. См. bpo-33725.

Изменено в версии 3.4: spawn добавлены на всех платформах unix и forkserver добавлены для некоторых платформ unix. Дочерние процессы больше не наследуют все унаследованные родительские дескрипторы в Windows.

В Unix с помощью методов запуска spawn или forkserver также запускается процесс resource tracker, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или SharedMemory объекты), созданные процессами программы. После завершения всех процессов средство отслеживания ресурсов отменяет связь с любым оставшимся отслеживаемым объектом. Обычно их не должно быть, но если какой-либо процесс был убит сигналом, то может возникнуть некоторая «утечка» ресурсов. (Ни просочившиеся семафоры, ни сегменты общей памяти не будут автоматически отключены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты совместно используемой памяти занимают некоторое место в основной памяти.)

Для выбора метода запуска используется set_start_method() в if __name__ == '__main__' клаузула основного модуля. Например:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() не должны быть используемый более одного раза в программе.

Кроме того, можно использовать get_context() для получения объекта контекст. Объекты контекста имеют тот же API, что и модуль многопроцессорной обработки, и позволяют использовать несколько методов запуска в одной программе:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Обратите внимание, что объекты, связанные с одним контекст, могут быть несовместимы с процессами для другого контекст. В частности, блокировки, созданные с помощью fork контекст, не могут быть переданы процессам, запущенным с помощью методов запуска spawn или forkserver.

Библиотека, которая хочет использовать конкретный метод запуска, должна, вероятно, использовать get_context(), чтобы избежать вмешательства в выбор пользователя библиотеки.

Предупреждение

Стартовые методы 'spawn' и 'forkserver' в настоящее время не могут быть используемый с «замороженными» исполняемыми файлами (т.е. двоичными файлами, создаваемыми пакетами, такими как PyInstaller and cx_Freeze) в Unix. Стартовый метод 'fork' работает.

Обмен объектами между процессами

multiprocessing поддерживает два типа каналов связи между процессами:

Очереди

Класс Queue является близким клоном queue.Queue. Например:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # распечатает "[42, None, 'hello']"
    p.join()

Очереди поток и безопасны для обработки.

Пайпы

Pipe() функционируют возвращает пара объектов связи, связанных пайп, который по умолчанию является (двухсторонним) дуплексом. Например:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # распечатает "[42, None, 'hello']"
    p.join()

Два объекта соединения возвращенный Pipe() представляют два конца пайп. Каждый объект подключения имеет методы send() и recv() (среди прочих). Обратите внимание, что данные в пайп могут быть повреждены, если два процесса (или потоки) пытаются одновременно считать или записывать в same конец пайп. Конечно, нет риска коррупции от процессов, использующих разные концы пайп одновременно.

Синхронизация между процессами

multiprocessing содержит эквиваленты всех примитивов синхронизации из threading. Для сущность можно использовать блокировку для обеспечения одновременной печати только одного процесса в стандартный вывод:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Без использования блокировки выходные данные различных процессов могут быть смешаны.

Совместное использование состояние между процессами

Как упомянуто выше, когда выполнение параллельного программирования его является обычно лучшим избегать использования разделенного состояние в максимально возможной степени. Это особенно верно при использовании нескольких процессов.

Однако, если вам действительно нужно использовать некоторые общие данные, то multiprocessing предоставляет пару способов сделать это.

Общая память

Данные могут храниться в карте совместно используемой памяти с помощью Value или Array. Например, следующие код:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

напечатает:

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументы 'd' и 'i' используемый при создании num и arr являются типкодами вида используемый модулем array: 'd' указывает двойную точность float и 'i' обозначает целое число со знаком. Эти общие объекты будут процессом и потокобезопасной.

Для большей гибкости в использовании совместно используемой памяти можно использовать модуль multiprocessing.sharedctypes, поддерживающий создание объектов произвольных ctypes, выделенных из совместно используемой памяти.

Серверный процесс

Объект менеджера возвращенный Manager() управляет серверным процессом, который содержит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси.

Менеджер возвращенный Manager() поддержит типы list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value и Array. Например,:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

напечатает:

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Менеджеры серверных процессов более гибки, чем использование общих объектов памяти, поскольку они могут быть созданы для поддержки произвольных типов объектов. Кроме того, один менеджер может совместно использоваться процессами на разных компьютерах по сети. Однако они работают медленнее, чем совместно используемая память.

Использование пула воркеров

Класс Pool представляет пул рабочих процессов. Он содержит методы, позволяющие выгружать задачи в рабочие процессы несколькими различными способами.

Например:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # запуск 4 рабочих процессов
    with Pool(processes=4) as pool:

        # печать "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # печатать одинаковые числа в произвольном порядке
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # вычислить "f(20)" асинхронно
        res = pool.apply_async(f, (20,))      # запускается в *только* одном процессе
        print(res.get(timeout=1))             # печатает "400"

        # вычислить "os.getpid()" асинхронно
        res = pool.apply_async(os.getpid, ()) # запускается в *только* одном процессе
        print(res.get(timeout=1))             # печатает PID этого процесса

        # запуск нескольких оценок асинхронно *может* использовать больше процессов
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # уснуть одному рабочему на 10 секунд
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # выход из блока with с остановкой пула
    print("Now the pool is closed and no longer available")

Обратите внимание, что методы бассейна должны только когда-либо быть используемый процессом, который создал его.

Примечание

Функциональность этого пакета требует, чтобы модуль __main__ был импортирован нижестоящими элементами. Это освещено в Программирование рекомендаций, однако здесь стоит указать. Это означает, что некоторые примеры, такие как примеры multiprocessing.pool.Pool не будут работать в интерактивном интерпретатор. Например:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Если попробовать это, то на самом деле будет выведено три полных трейсбэки, перемежающихся полуслучайным образом, а затем, возможно, придется как-то остановить родительский процесс.

Ссылка

Пакет multiprocessing в основном реплицирует API модуля threading.

Process и исключения

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Объекты процесса представляют собой операцию, выполняемую в отдельном процессе. Класс Process имеет эквиваленты всех методов threading.Thread.

Конструктор всегда должен вызываться с аргументами ключевой. group всегда должны быть None; он существует исключительно для совместимости с threading.Thread. target - вызываемый объект, вызываемый методом run(). По умолчанию он имеет значение None, то есть ничего не называется. name - имя процесса (более подробную информацию см. в разделе name). args - кортеж аргументов для целевого вызова. kwargs - словарь ключевой аргументов для целевого вызова. Если указано, только ключевой аргумент daemon задает для флага процесса daemon значение True или False. Если значение равно None (по умолчанию), этот флаг наследуется от процесса создания.

По умолчанию аргументы не передаются target.

Если подкласс отвергает конструктора, он должен удостовериться, что призывает конструктора базового класса (Process.__init__()) прежде, чем сделать что-либо еще к процессу.

Изменено в версии 3.3: Добавлен аргумент daemon.

run()

Метод, представляющий активность процесса.

Этот метод можно переопределить в подкласс. Стандартный метод run() призывает подлежащий выкупу объект, переданный конструктору объекта как целевой аргумент, если таковые имеются, с последовательными и аргументами ключевой, взятыми от args и аргументов kwargs, соответственно.

start()

Запустите действие процесса.

Это должно вызываться не более одного раза для каждого объекта процесса. Она обеспечивает возможность вызова метода run() объекта в отдельном процессе.

join([timeout])

Если необязательным аргументом timeout является None (по умолчанию), метод блокируется до завершения процесса, метод join() которого вызывается. Если timeout - положительное число, оно блокируется максимум через timeout секунд. Обратите внимание, что метод возвращает None, если его процесс завершается или время ожидания метода истекло. Проверьте exitcode процесса, чтобы определить его завершение.

Процесс может быть объединен много раз.

Процесс не может присоединиться к самому себе, поскольку это приведет к взаимоблокировке. Ошибка при попытке присоединения к процессу перед его запуском.

name

Имя процесса. Имя - строка используемый в идентификационных целях только. У него нет семантики. Нескольким процессам может быть присвоено одно и то же имя.

Начальное имя задается конструктором. Если для конструктора не указано явное имя, создается имя формы „Process-N:sub:1: N:sub:2:…: N:sub:k“, где каждый Nk является N-м потомком своего родителя.

is_alive()

Возвращает жив ли процесс.

Примерно, объект процесса жив с момента возвращает метода start() до завершения дочернего процесса.

daemon

Флаг демона процесса, логический значение. Это должно быть установлено, прежде чем start() называют.

Начальный значение унаследован от процесса создания.

Когда процесс завершается, он пытается завершить все свои демонические дочерние процессы.

Обратите внимание, что демоническому процессу запрещено создавать дочерние процессы. В противном случае демонический процесс оставит своих потомков сиротами, если он завершится после завершения родительского процесса. Кроме того, это демоны или службы не Unix, это обычные процессы, которые будут прекращены (и не присоединены), если не демонические процессы вышли.

В дополнение к threading.Thread API объекты Process также поддерживают следующий атрибуты и methods:

pid

Возвращает идентификатор процесса. Перед началом процесса это будет None.

exitcode

Выход ребенка код. Это будет None, если процесс еще не завершен. Отрицательный значение -N указывает на то, что дочерний объект был завершен сигналом N.

authkey

Ключ аутентификации процесса (байтовая строка).

Когда multiprocessing инициализируется, основному процессу присваивается случайный строка с помощью os.urandom().

При создании объекта Process он наследует ключ аутентификации родительского процесса, хотя его можно изменить, установив для параметра authkey значение другого байта строка.

См. Ключи аутентификации.

sentinel

Числовой дескриптор системного объекта, который становится «готовым» по завершении процесса.

Вы можете использовать этот значение, если вы хотите ждать на нескольких событиях, сразу используя multiprocessing.connection.wait(). В противном случае вызов join() проще.

В Windows это дескриптор ОС, используемый с семейством вызовов API WaitForSingleObject и WaitForMultipleObjects. На Unix это - файл дескриптор, применимый с примитивами от модуля select.

Добавлено в версии 3.3.

terminate()

Завершить процесс. В Unix это выполняется с помощью сигнала SIGTERM; на Windows используется TerminateProcess(). Следует отметить, что обработчики exit и finally и т.д. выполняться не будут.

Заметим, что потомки процесса не будут прекращены - они просто осиротеют.

Предупреждение

Если этот метод является используемый, когда связанный процесс использует пайп или очередь, то пайп или очередь может быть повреждена и может стать непригодной для использования другим процессом. Аналогично, если процесс приобрел блокировку или семафор и т.д., то завершение его может привести к взаимоблокировке других процессов.

kill()

То же, что и terminate(), но с использованием сигнала SIGKILL в Unix.

Добавлено в версии 3.7.

close()

Закрыть объект Process, освободив все связанные с ним ресурсы. ValueError возникает, если базовый процесс все еще выполняется. Как только close() возвращает успешно, большинство других методов и атрибуты объекта Process поднимут ValueError.

Добавлено в версии 3.7.

Обратите внимание, что start(), join(), is_alive(), terminate() и методы exitcode должен только вызвать процесс, который создал объект процесса.

Пример использования некоторых методов Process:

 >>> import multiprocessing, time, signal
 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
 >>> print(p, p.is_alive())
 <Process ... initial> False
 >>> p.start()
 >>> print(p, p.is_alive())
 <Process ... started> True
 >>> p.terminate()
 >>> time.sleep(0.1)
 >>> print(p, p.is_alive())
 <Process ... stopped exitcode=-SIGTERM> False
 >>> p.exitcode == -signal.SIGTERM
 True
exception multiprocessing.ProcessError

Базовый класс всех исключений multiprocessing.

exception multiprocessing.BufferTooShort

Исключение, вызванное Connection.recv_bytes_into(), когда предоставленный объект буфера слишком мал для чтения сообщения.

Если e является сущность BufferTooShort, то e.args[0] передаст сообщение в виде байтовой строки.

exception multiprocessing.AuthenticationError

Возникает при ошибке проверки подлинности.

exception multiprocessing.TimeoutError

Вызывается методами с тайм-аутом по истечении тайм-аута.

Пайпы и очереди

При использовании нескольких процессов обычно используют передачу сообщений для связи между процессами и избегают необходимости использования каких-либо примитивов синхронизации, таких как блокировки.

Для передачи сообщений можно использовать Pipe() (для соединения между двумя процессами) или очередь (которая позволяет нескольким производителям и потребителям).

Queue, SimpleQueue и типы JoinableQueue - мультипроизводитель, мультипотребитель очереди FIFO, смоделированные на классе queue.Queue в стандартной библиотеке. Они отличаются по тому Queue, испытывает недостаток в task_done() и методах join(), введенных в класс Python 2.5’s queue.Queue.

Если вы используете JoinableQueue, то вы должны вызвать JoinableQueue.task_done() для каждой задачи, удаленной из очереди, или семафор используемый для подсчета количества незавершенных задач может в конечном итоге переполниться, вызывая исключение.

Обратите внимание, что можно также создать общую очередь с помощью объекта менеджера - см. раздел Менеджеры.

Примечание

multiprocessing использует обычные исключения queue.Empty и queue.Full для сигнализации тайм-аута. Они недоступны в пространстве имен multiprocessing, поэтому их необходимо импортировать из queue.

Примечание

Когда объект помещен на очередь, объект - pickled и фон поток более поздние потоки данные pickled к основному пайп. Это имеет некоторые последствия, которые немного удивительны, но не должны вызывать никаких практических трудностей - если они действительно беспокоят вас, то вы можете вместо этого использовать очередь, созданную с manager.

  1. После помещения объекта в пустую очередь может произойти бесконечная задержка перед тем, как метод empty() очереди возвращает False и get_nowait() может возвращает без повышения queue.Empty.
  2. если несколько процессов ставят в очередь объекты, возможно, чтобы объекты были приняты на другом конце вне порядка. Однако объекты, поставленные в очередь одним и тем же процессом, всегда будут находиться в ожидаемом порядке относительно друг друга.

Предупреждение

Если процесс был убит с помощью Process.terminate() или os.kill() во время попытки использования Queue, то данные в очереди, вероятно, будут повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь позже.

Предупреждение

Как упомянуто выше, если дочерний процесс поместил предметы на очередь (и у него нет используемый JoinableQueue.cancel_join_thread), тогда тот процесс не закончится, пока все буферизированные предметы не смылись (flushed) в пайп.

Это означает, что при попытке присоединения к этому процессу может возникнуть взаимоблокировка, если вы не уверены, что все элементы, помещенные в очередь, были израсходованы. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависать на выходе, когда пытается присоединиться ко всем своим недемоническим потомкам.

Обратите внимание, что очередь, созданная с помощью менеджера, не имеет этой проблемы. См. Программирование рекомендаций.

Пример использования очередей для межпроцессной связи см. в разделе Примеры.

multiprocessing.Pipe([duplex])

Возвращает пару (conn1, conn2) объектов Connection, представляющих концы пайпа.

Если duplex равно True (по умолчанию), то пайп является двунаправленным. Если duplex - False тогда, пайп однонаправлен: conn1 может только быть используемый для получения сообщений, и conn2 может только быть используемый для отправки сообщений.

class multiprocessing.Queue([maxsize])

Возвращает общую очередь процесса, реализованную с использованием пайп и нескольких блокировок/семафоров. Когда процесс сначала помещает элемент в очередь, запускается питатель поток, который передает объекты из буфера в пайп.

Обычные исключения queue.Empty и queue.Full из модуля queue стандартной библиотеки вызываются тайм-аутами сигнала.

Queue реализует все методы queue.Queue, кроме task_done() и join().

qsize()

Возвращает приблизительный размер очереди. Из-за семантики многопоточной/многопроцессорной обработки это число не является надежным.

Обратите внимание, что это может привести к появлению NotImplementedError на платформах Unix, таких как Mac OS X, где sem_getvalue() не реализован.

empty()

Возвращает True, если очередь пуста, False иначе. Из-за семантики многопоточной/многопроцессорной обработки это ненадежно.

full()

Возвращает True, если очередь полна, False иначе. Из-за семантики многопоточной/многопроцессорной обработки это ненадежно.

put(obj[, block[, timeout]])

Поместить объект в очередь. Если дополнительный аргумент block имеет значение True (по умолчанию), а timeout - значение None (по умолчанию), при необходимости блокируйте до тех пор, пока свободный слот не будет доступен. Если timeout является положительным числом, он блокирует максимум timeout секунд и вызывает исключение queue.Full, если свободный слот не был доступен в течение этого времени. В противном случае (block - False) Поместить элемент в очередь, если свободный слот немедленно доступен, иначе вызовите исключение queue.Full (timeout игнорируется в этом случае).

Изменено в версии 3.8: Если очередь закрыта, ValueError поднят вместо AssertionError.

put_nowait(obj)

Эквивалентный put(obj, False).

get([block[, timeout]])

Удаление и возвращает элемента из очереди. Если необязательными args block является True (по умолчанию), а timeout - None (по умолчанию), при необходимости блокируйте до тех пор, пока элемент не будет доступен. Если timeout является положительным числом, он блокирует максимум timeout секунд и вызывает исключение queue.Empty, если за это время не было доступно ни одного элемента. В противном случае (блок - False), возвращает элемент, если он немедленно доступен, иначе вызовите исключение queue.Empty (в этом случае timeout игнорируется).

Изменено в версии 3.8: Если очередь закрыта, ValueError поднят вместо OSError.

get_nowait()

Эквивалентный get(False).

multiprocessing.Queue имеет несколько дополнительных методов, не найденных в queue.Queue. Эти методы обычно не нужны для большинства код:

close()

Укажите, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершит работу после очистки всех буферизованных данных в пайп. Это вызывается автоматически при сборе мусора в очередь.

join_thread()

Присоединиться к фоновому потоку. Это может только быть используемый после того, как вызван close(). Это блокирует до фона выходы поток, гарантируя, что все данные в буфере смылись (flushed) в пайп.

По умолчанию, если процесс не является создателем очереди, при выходе он попытается присоединиться к фоновому поток очереди. Процесс может вызвать cancel_join_thread(), чтобы заставить join_thread() ничего не делать.

cancel_join_thread()

Препятствуйте тому, чтобы join_thread() блокировал. В частности, это предотвращает автоматическое соединение фоновых поток при завершении процесса – см. раздел join_thread().

Лучшее имя для этого метода может быть allow_exit_without_flush(). Вероятно, это приведет к потере поставленных в очередь данных, и вам почти наверняка не потребуется их использовать. Это действительно только в том случае, если необходимо, чтобы текущий процесс немедленно вышел, не дожидаясь очистки поставленных в очередь данных на основной пайп, и вы не заботитесь о потерянных данных.

Примечание

Функциональность этого класса требует функциональной реализации общего семафора в операционной системе хоста. Без него функциональность этого класса будет отключена, и попытки создания экземпляра Queue приведут к появлению ImportError. Дополнительные сведения см. в разделе bpo-3770. То же самое относится и к любому из перечисленных ниже типов специализированных очередей.

class multiprocessing.SimpleQueue

Это упрощенный тип Queue, очень близкий к заблокированному Pipe.

empty()

Возвращает True, если очередь пуста, False иначе.

get()

Удаление и возвращает элемента из очереди.

put(item)

Поместить item в очередь.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, Queue подкласс, представляет собой очередь, которая дополнительно содержит методы task_done() и join().

task_done()

Укажите, что ранее поставленная в очередь задача завершена. Используется потребителями очереди. Для каждого get() используемый, чтобы выбрать задачу, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в настоящее время заблокирует, то он возобновит, когда все предметы были обработаны (подразумевать, что требование task_done() было получено для каждого предмета, который был put() в очередь).

Вызывает ValueError, если вызывается больше раз, чем было элементов, помещенных в очередь.

join()

Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается при каждом добавлении элемента в очередь. Счетчик снижается всякий раз, когда потребитель звонит task_done(), чтобы указать, что элемент был извлечен, и вся работа над ним завершена. Когда число незавершенных задач падает до нуля, join() разблокирует.

Разное

multiprocessing.active_children()

Возвращает список всех живых потомков текущего процесса.

Вызов этого метода имеет побочный эффект «присоединения» к любым уже завершенным процессам.

multiprocessing.cpu_count()

Возвращает количество ЦПУ в системе.

Это число не эквивалентно числу ЦПУ, которые может использовать текущий процесс. Количество используемых ЦПУ можно получить с помощью len(os.sched_getaffinity(0))

Может поднять NotImplementedError.

См.также

os.cpu_count()

multiprocessing.current_process()

Возвращает объект Process, соответствующий текущему процессу.

Аналог threading.current_thread().

multiprocessing.parent_process()

Возвращает объект Process, соответствующий родительскому процессу current_process(). Для основного процесса parent_process будет None.

Добавлено в версии 3.8.

multiprocessing.freeze_support()

Добавление поддержки в том случае, если программа, использующая multiprocessing, была заморожена для создания исполняемого файла Windows. (Был проверен с помощью py2exe, PyInstaller и cx_Freeze.)

Эту функцию необходимо вызвать непосредственно после if __name__ == '__main__' линии основного модуля. Например:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Если строка freeze_support() опущена, то попытка запуска замороженного исполняемого файла вызовет RuntimeError.

freeze_support() запроса не имеет никакого эффекта, когда он призван на любую операционную систему кроме Windows. Кроме того, если модулем управляет обычно Python интерпретатор на Windows (программа не была заморожена), то freeze_support() не имеет никакого эффекта.

multiprocessing.get_all_start_methods()

Возвращает список поддерживаемых методов запуска, первым из которых является метод по умолчанию. Возможные методы запуска: 'fork', 'spawn' и 'forkserver'. В Windows доступен только 'spawn'. На Unix 'fork' и 'spawn' всегда поддерживаются, при этом 'fork' - дефолт.

Добавлено в версии 3.4.

multiprocessing.get_context(method=None)

Возвращает объект контекст, имеющий ту же атрибуты, что и модуль multiprocessing.

Если method - None тогда дефолт, контекст - возвращенный. В противном случае method должны быть 'fork', 'spawn', 'forkserver'. ValueError возникает, если указанный метод запуска недоступен.

Добавлено в версии 3.4.

multiprocessing.get_start_method(allow_none=False)

Возвращает имя метода запуска используемый для запуска процессов.

Если метод начала не был исправлен, и allow_none ложный, то метод начала прикреплен к дефолту, и имя - возвращенный. Если метод запуска не был фиксирован, а allow_none имеет значение true, то None имеет значение возвращенный.

возвращает значение может быть 'fork', 'spawn', 'forkserver' или None. 'fork' является значением по умолчанию в Unix, а 'spawn' - значением по умолчанию в Windows.

Добавлено в версии 3.4.

multiprocessing.set_executable()

Задает путь к Python интерпретатор, который будет использоваться при запуске дочернего процесса. (По умолчанию sys.executable - используемый). Встраивающие устройства, вероятно, должны будут сделать что-то вроде:

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

прежде чем они смогут создавать дочерние процессы.

Изменено в версии 3.4: Теперь поддержанный на Unix, когда метод начала 'spawn' - используемый.

multiprocessing.set_start_method(method)

Установите метод, который должен быть используемый, чтобы начать дочерние процессы. method может быть 'fork', 'spawn' или 'forkserver'.

Следует отметить, что это должно вызываться не более одного раза и должно быть защищено внутри if __name__ == '__main__' клаузула основного модуля.

Добавлено в версии 3.4.

Объекты связи

Объекты соединения позволяют отправлять и принимать выбираемые объекты или строки. Они могут рассматриваться как связанные сокеты, ориентированные на сообщение.

Объекты соединения обычно создаются с помощью Pipe – см. также раздел Слушатели и клиенты.

class multiprocessing.connection.Connection
send(obj)

Отправьте объект на другой конец соединения, который следует прочитать с помощью команды recv().

Объект должен быть подбираемым. Очень большие огурцы (приблизительно 32 MiB +, хотя это зависит от ОС) могут вызвать исключение ValueError.

recv()

Возвращает объект, отправленный с другого конца соединения с помощью команды send(). Блокируется, пока не будет что получить. Поднимает EOFError, если ничего не осталось получить, а другой конец был закрыт.

fileno()

Возвращает файл дескриптор или дескриптор используемый по соединению.

close()

Закрыть связь.

Это вызывается автоматически при сборе мусора соединения.

poll([timeout])

Возвращает, имеются ли данные, доступные для чтения.

Если timeout не указан, то он будет возвращает немедленно. Если timeout - число, это указывает максимальное время блокировки в секундах. Если timeout None, то бесконечное время ожидания равно используемый.

Следует отметить, что несколько объектов соединения могут опрашиваться одновременно с помощью функции multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])

Отправка байтовых данных из байтоподобного объекта в виде полного сообщения.

Если задано значение offset, то данные считываются из этой позиции в buffer. Если задано значение size, то многие байты будут считываться из буфера. Очень большие буферы (приблизительно 32 MiB +, хотя это зависит от ОС) могут вызвать исключение ValueError

recv_bytes([maxlength])

Возвращает полное сообщение байтовых данных, отправленное с другого конца соединения в виде строка. Блокируется, пока не будет что получить. Поднимает EOFError, если ничего не осталось получить, а другой конец закрылся.

Если задано значение maxlength, и сообщение длиннее maxlength, то OSError возникает, и соединение будет недоступно для чтения.

Изменено в версии 3.3: Используемая функция поднимает IOError, которое теперь является алиасом OSError.

recv_bytes_into(buffer[, offset])

Считывание в buffer полного сообщения байтовых данных, отправленных с другого конца соединения, и возвращает количества байтов в сообщении. Блокируется, пока не будет что получить. Поднимает EOFError, если ничего не осталось получить, а другой конец был закрыт.

buffer должен быть записываемым байтоподобным объектом. Если задано значение offset, то сообщение будет записано в буфер из этой позиции. Смещение должно быть неотрицательным целым числом, меньшим длины buffer (в байтах).

Если буфер слишком короткий, то возникает исключение BufferTooShort и полное сообщение доступно как e.args[0] где e - исключение сущность.

Изменено в версии 3.3: Сами объекты соединения теперь могут переноситься между процессами с помощью Connection.send() и Connection.recv().

Добавлено в версии 3.3: Объекты соединения теперь поддерживают протокол управления контекст - см. раздел Типы менеджера контекста. __enter__() возвращает объект подключения, а __exit__() вызывает close().

Например:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Предупреждение

Метод Connection.recv() автоматически отключает полученные данные, что может быть угрозой безопасности, если вы не можете доверять процессу, отправившему сообщение.

Поэтому, если объект подключения не был создан с помощью Pipe(), следует использовать методы recv() и send() только после выполнения той или иной аутентификации. См. Ключи аутентификации.

Предупреждение

Если процесс погибает, когда он пытается прочитать или записать в пайп, то данные в пайп, вероятно, будут повреждены, поскольку может оказаться невозможным убедиться, где лежат границы сообщения.

Примитивы синхронизации

Обычно примитивы синхронизации не так необходимы в многопроцессорной программе, как в многопоточной программе. См. документацию по модулю threading.

Обратите внимание, что можно также создать примитивы синхронизации с помощью объекта менеджера - см. раздел Менеджеры.

class multiprocessing.Barrier(parties[, action[, timeout]])

Объект барьер: клон threading.Barrier.

Добавлено в версии 3.3.

class multiprocessing.BoundedSemaphore([value])

Барьерный семафорный объект: близкий аналог threading.BoundedSemaphore.

Существует одиночное отличие от его близкого аналога: первый аргумент его метода acquire называется block, как это согласуется с Lock.acquire().

Примечание

В Mac OS X это неотличимо от Semaphore, потому что sem_getvalue() не реализована на этой платформе.

class multiprocessing.Condition([lock])

Переменная условия: алиас для threading.Condition.

Если задано значение lock, то это должен быть объект Lock или RLock из multiprocessing.

Изменено в версии 3.3: Добавлен метод wait_for().

class multiprocessing.Event

Клон threading.Event.

class multiprocessing.Lock

Нерекурсивный объект блокировки: близкий аналог threading.Lock. Как только процесс или поток приобрел блокировку, последующие попытки получить ее от любого процесса или поток будут блокироваться до тех пор, пока она не будет освобождена; любой процесс или поток может освободить его. Понятия и поведение threading.Lock в том виде, в каком они применяются к потоки, воспроизводятся здесь в multiprocessing.Lock, как это применимо к процессам или потоки, за исключением отмеченных.

Обратите внимание, что Lock - на самом деле фабричная функция, какой возвращает сущность multiprocessing.synchronize.Lock инициализировал с дефолтом контекст.

Lock поддерживает протокол контекстного менеджера и таким образом может быть используемый в with инструкции.

acquire(block=True, timeout=None)

Получить блокировку, блокировку или неблокировку.

Если для аргумента block установлено значение True (значение по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не окажется в разблокированном состояние, а затем будет установлен параметр locked и возвращает True. Обратите внимание, что имя этого первого аргумента отличается от имени в threading.Lock.acquire().

Если для аргумента block установлено значение False, вызов метода не блокируется. Если блокировка в данный момент находится в заблокированном состояние, возвращает False; в противном случае установите блокировку на заблокированный состояние и возвращает True.

При вызове с положительным значение с плавающей запятой для timeout блок максимум на количество секунд, указанное параметром timeout, если блокировка не может быть получена. Просьбы с отрицательным значение для timeout эквивалентны timeout ноля. Вызовы с timeout значение None (по умолчанию) устанавливают период тайм-аута как бесконечный. Отметим, что лечение отрицательного или None значения для timeout отличается от реализованного поведения в threading.Lock.acquire(). Аргумент timeout не имеет практических последствий, если аргумент block имеет значение False и, таким образом, игнорируется. Возвращает True если блокировка была приобретена или False, если истек период ожидания.

release()

Отпустите замок. Это может быть вызвано из любого процесса или поток, а не только из процесса или поток, которые первоначально приобрели блокировку.

Поведение аналогично поведению в threading.Lock.release(), за исключением того, что при вызове на разблокированной блокировке возникает ValueError.

class multiprocessing.RLock

Объект рекурсивной блокировки: близкий аналог threading.RLock. Рекурсивная блокировка должна быть разблокирована процессом или поток, получившим ее. Как только процесс или поток приобрел рекурсивную блокировку, тот же самый процесс или поток может снова получить ее без блокировки; этот процесс или поток должны выпускать его один раз в каждый раз, когда он был приобретен.

Обратите внимание, что RLock - на самом деле фабричная функция, какой возвращает сущность multiprocessing.synchronize.RLock инициализировал с дефолтом контекст.

RLock поддерживает протокол контекстного менеджера и таким образом может быть используемый в with инструкции.

acquire(block=True, timeout=None)

Получить блокировку, блокировку или неблокировку.

При вызове с аргументом block, имеющим значение True, блокировать до тех пор, пока блокировка не окажется в разблокированном состояние (не принадлежащем какому-либо процессу или поток), пока блокировка уже не принадлежит текущему процессу или поток. Текущий процесс или поток затем получает право собственности на блокировку (если она еще не имеет права собственности), а уровень рекурсии внутри блокировки увеличивается на единицу, что приводит к возвращает значение True. Обратите внимание, что в поведении этого первого аргумента существует несколько различий по сравнению с реализацией threading.RLock.acquire(), начиная с имени самого аргумента.

При вызове с аргументом block, имеющим значение False, не блокируйте. Если замок был уже приобретен (и таким образом принадлежит) другим процессом или поток, текущий процесс или поток не берут собственность, и уровень рекурсии в замке не изменен, приведя к возвращает значение False. Если блокировка находится в разблокированном состояние, текущий процесс или поток становится владельцем и уровень рекурсии увеличивается, что приводит к возвращает значение True.

Использование и поведение аргумента timeout такие же, как и в Lock.acquire(). Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных поведений в threading.RLock.acquire().

release()

Разблокируйте блокировку, уменьшая уровень рекурсии. Если после уменьшения уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую какому-либо процессу или поток) и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите выполнение только одного из них. Если после уменьшения уровень рекурсии остается ненулевым, блокировка остается заблокированной и принадлежит вызывающему процессу или поток.

Только назовите этот метод, когда процесс запроса или поток будут владеть замком. AssertionError поднят, если этот метод называют процесс или поток кроме владельца или если замок находится в незапертом (ненаходящемся в собственности) состояние. Обратите внимание, что тип исключения, вызванного в этой ситуации, отличается от реализованного поведения в threading.RLock.release().

class multiprocessing.Semaphore([value])

Семафорный объект: близкий аналог threading.Semaphore.

Существует одиночное отличие от его близкого аналога: первый аргумент его метода acquire называется block, как это согласуется с Lock.acquire().

Примечание

В Mac OS X sem_timedwait не поддерживается, поэтому вызов acquire() с тайм-аутом будет эмулировать поведение этой функции с помощью спящего цикла.

Примечание

Если сигнал SIGINT, сгенерированный Ctrl-C, прибудет, в то время как главный поток заблокирован требованием к BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() или Condition.wait() тогда, требование будет немедленно прервано, и KeyboardInterrupt будет поднят.

Это отличается от поведения threading, где SIGINT будет игнорироваться во время выполнения эквивалентных блокирующих вызовов.

Примечание

Некоторые функциональные возможности этого пакета требуют функциональной реализации общего семафора в операционной системе хоста. Без него модуль multiprocessing.synchronize будет отключен, и попытки его импорта приведут к появлению ImportError. Дополнительные сведения см. в разделе bpo-3770.

Разделенные объекты ctypes

С помощью общей памяти можно создавать общие объекты, которые могут наследоваться дочерними процессами.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Возвращает объект ctypes, выделенный из общей памяти. По умолчанию возвращает значение фактически является синхронизированной оболочкой для объекта. К самому объекту можно получить доступ через value атрибут Value.

typecode_or_type определяет тип объекта возвращенный: это - или тип ctypes или один символ typecode доброго используемый модулем array. *args передан конструктору для типа.

Если lock равно True (по умолчанию), создается новый объект рекурсивной блокировки для синхронизации доступа к значение. Если lock является объектом Lock или RLock, то это будет используемый для синхронизации доступа к значение. Если lock будет False тогда, то доступ к объекту возвращенный не будет автоматически защищен замком, таким образом, это не обязательно будет «безопасно от процесса».

Операции как +=, которые включают прочитанный и пишут, не атомные. Так что, если для сущность, вы хотите атомарно увеличить общий значение это недостаточно, чтобы просто сделать:

counter.value += 1

Предполагая, что связанная блокировка является рекурсивной (что по умолчанию), можно сделать:

with counter.get_lock():
    counter.value += 1

Обратите внимание, что lock является только ключевым аргументом.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращает значение фактически является синхронизированной обёрткой для массива.

typecode_or_type определяет тип элементов множества возвращенный: это - или тип ctypes или один символ typecode доброго используемый модулем array. Если size_or_initializer является целым числом, то он определяет длину массива, и массив будет первоначально обнулен. Иначе size_or_initializer - последовательность, которая является используемый, чтобы инициализировать множество и чья длина определяет длину множества.

Если lock равно True (по умолчанию), создается новый объект блокировки для синхронизации доступа к значение. Если lock является объектом Lock или RLock, то это будет используемый для синхронизации доступа к значение. Если lock будет False тогда, то доступ к объекту возвращенный не будет автоматически защищен замком, таким образом, это не обязательно будет «безопасно от процесса».

Обратите внимание, что lock - ключевой только аргумент.

Обратите внимание, что у множества ctypes.c_char есть value и raw атрибуты, которые позволяют использовать его, чтобы сохранить и восстановить строки.

Модуль multiprocessing.sharedctypes

Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из общей памяти, которые могут быть унаследованы дочерними процессами.

Примечание

Хотя можно сохранить указатель в общей памяти, помните, что это относится к расположению в адресном пространстве конкретного процесса. Однако указатель, вероятно, недействителен в контекст второго процесса, и попытка отсоединить указатель от второго процесса может вызвать сбой.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Возвращает массив ctypes, выделенный из общей памяти.

typecode_or_type определяет тип элементов множества возвращенный: это - или тип ctypes или один символ typecode доброго используемый модулем array. Если size_or_initializer является целым числом, то он определяет длину массива, и массив будет первоначально обнулен. Иначе size_or_initializer - последовательность, которая является используемый, чтобы инициализировать множество и чья длина определяет длину множества.

Обратите внимание, что настройка и получение элемента потенциально неатомны - вместо этого используйте команду Array(), чтобы убедиться, что доступ автоматически синхронизирован с помощью блокировки.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Возвращает объект ctypes, выделенный из общей памяти.

typecode_or_type определяет тип объекта возвращенный: это - или тип ctypes или один символ typecode доброго используемый модулем array. *args передан конструктору для типа.

Обратите внимание, что настройка и получение значение потенциально неатомны - вместо этого используйте Value(), чтобы убедиться, что доступ автоматически синхронизирован с помощью блокировки.

Обратите внимание, что массив ctypes.c_char имеет value и raw атрибуты, которые позволяют использовать его для хранения и извлечения строки - см. документацию для ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

То же как RawArray() за исключением того, что в зависимости от значение lock безопасная от процесса обертка синхронизации может быть возвращенный вместо сырья ctypes множество.

Если lock равно True (по умолчанию), создается новый объект блокировки для синхронизации доступа к значение. Если lock является объектом Lock или RLock, то это будет используемый для синхронизации доступа к значение. Если lock будет False тогда, то доступ к объекту возвращенный не будет автоматически защищен замком, таким образом, это не обязательно будет «безопасно от процесса».

Обратите внимание, что lock является толко ключевым аргументом.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

То же как RawValue() за исключением того, что в зависимости от значение lock безопасная от процесса обертка синхронизации может быть возвращенный вместо сырья ctypes объект.

Если lock равно True (по умолчанию), создается новый объект блокировки для синхронизации доступа к значение. Если lock является объектом Lock или RLock, то это будет используемый для синхронизации доступа к значение. Если lock будет False тогда, то доступ к объекту возвращенный не будет автоматически защищен замком, таким образом, это не обязательно будет «безопасно от процесса».

Обратите внимание, что lock является только ключевым аргументом.

multiprocessing.sharedctypes.copy(obj)

Возвращает объект ctypes, выделенный из общей памяти, который является копией объекта ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Возвращает безопасный для процесса объект-обертка для объекта ctypes, который использует lock для синхронизации доступа. Если lock имеет значение None (по умолчанию), то объект multiprocessing.RLock создается автоматически.

Синхронизированная обертка будет иметь два метода в дополнение к методам обертываемого объекта: get_obj() возвращает обертываемый объект и get_lock() возвращает блокируемый объект используемый для синхронизации.

Обратите внимание, что доступ к объекту ctypes через оболочку может быть намного медленнее, чем доступ к объекту raw ctypes.

Изменено в версии 3.5: Синхронизированные объекты поддерживают протокол менеджера контекста.

В приведенной ниже таблице сравнивается синтаксис создания общих объектов ctypes из общей памяти с синтаксисом обычных ctypes. (В таблице MyStruct представлена некоторая подкласс ctypes.Structure.

ctypes sharedctypes используя тип sharedctypes используя typecode
c_double(2.4) RawValue(c_double, 2.4) RawValue(„d“, 2.4)
MyStruct(4, 6) RawValue(MyStruct, 4, 6)  
(c_short * 7)() RawArray(c_short, 7) RawArray(„h“, 7)
(c_int * 3)(9, 2, 8) RawArray(c_int, (9, 2, 8)) RawArray(„i“, (9, 2, 8))

Ниже приведен пример, когда ряд объектов ctypes изменяется дочерним процессом:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Результаты напечатаны:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Менеджеры

Менеджеры предоставляют способ создания данных, которые могут совместно использоваться различными процессами, включая совместное использование по сети между процессами, работающими на разных машинах. Объект-менеджер управляет серверным процессом, который управляет shared objects. Другие процессы могут обращаться к общим объектам с помощью прокси.

multiprocessing.Manager()

Возвращает начатый объект SyncManager, который может быть используемый для разделения объектов между процессами. Объект менеджера возвращенный соответствует порожденному дочернему процессу и имеет методы, которые создадут общие объекты и соответствующие возвращаемое прокси.

Процессы менеджера будут остановлены сразу после сбора мусора или завершения родительского процесса. Классы менеджера определяются в модуле multiprocessing.managers:

class multiprocessing.managers.BaseManager([address[, authkey]])

Создайте объект BaseManager.

После создания необходимо вызвать start() или get_server().serve_forever(), чтобы гарантировать, что объект менеджера ссылается на запущенный процесс менеджера.

address - адрес, по которому процесс менеджера прослушивает новые подключения. Если address None, то выбирается произвольная.

authkey - ключ идентификации, который будет используемый, чтобы проверить законность поступающих связей с процессом сервера. Если authkey - None тогда, current_process().authkey - используемый. В противном случае authkey является используемый и должен быть байтом строка.

start([initializer[, initargs]])

Начните подпроцессы, чтобы начать менеджера. Если initializer не является None, то подпроцессы вызовет initializer(*initargs) при запуске.

get_server()

Возвращает объект Server, представляющий фактический сервер под управлением менеджера. Объект Server поддерживает метод serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server дополнительно содержит address атрибут.

connect()

Подключить объект локального менеджера к процессу удаленного менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Остановить процесс используемый менеджером. Это только доступно, если start() был используемый, чтобы начать процесс сервера.

Это можно вызвать несколько раз.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

classmethod, который может быть используемый для регистрации типа или подлежащий выкупу с классом менеджера.

typeid - «идентификатор типа», который является используемый, чтобы определить особый тип общего объекта. Это должно быть строка.

callable является вызываемым используемый для создания объектов для этого идентификатора типа. Если диспетчер сущность будет подключен к серверу с помощью метода connect() или если аргумент create_method имеет значение False, его можно оставить как None.

proxytype является подкласс BaseProxy, который используемый для создания прокси для общих объектов с этим typeid. Если None, то класс прокси создается автоматически.

exposed - используемый, чтобы определить последовательность вызова метода, которые прокси для этого typeid нужно разрешить доступу, используя BaseProxy._callmethod(). (Если exposed - None тогда, proxytype._exposed_ - используемый вместо этого, если он существует.) в случае, если не указан список разоблаченных объектов, будут доступны все «публичные методы» общего объекта. (Здесь «метод публичный» означает любой атрибут, у которого есть метод __call__() и чьё имя не начинается с '_'.)

method_to_typeid - отображение используемый, чтобы определить тип возвращает тех выставленных методов, которые должны возвращает прокси. Это наносит на карту названия метода к typeid строки. (Если method_to_typeid - None тогда, proxytype._method_to_typeid_ - используемый вместо этого, если он существует.) Если вызов метода не будет ключом этого отображения или если отображение будет None тогда объект, то возвращенный методом будет скопирован значение.

create_method определяет, должен ли метод быть создан с именем typeid, который может быть используемый, чтобы сказать процессу сервера создавать новый общий объект и возвращает прокси для него. По умолчанию это значение равно True.

BaseManager сущности также имеют одно свойство только для чтения:

address

Адрес, используемый менеджером.

Изменено в версии 3.3: Объекты менеджера поддерживают протокол управления контекст - см. раздел Типы менеджера контекста. __enter__() запускает процесс сервера (если он еще не запущен), а затем возвращает объект диспетчера. __exit__() вызовы shutdown().

В предыдущих версиях __enter__() не запускал серверный процесс диспетчера, если он еще не был запущен.

class multiprocessing.managers.SyncManager

Подкласс BaseManager, который может быть используемый для синхронизации процессов. Объекты этого типа возвращенный multiprocessing.Manager().

Его методы обычно создают и возвращает Прокси объекты для многих типы данных используемый, которые будут синхронизированы через процессы. Это, в частности, включает общие списки и словари.

Barrier(parties[, action[, timeout]])

Создание общего объекта threading.Barrier и возвращает прокси для него.

Добавлено в версии 3.3.

BoundedSemaphore([value])

Создание общего объекта threading.BoundedSemaphore и возвращает прокси для него.

Condition([lock])

Создание общего объекта threading.Condition и возвращает прокси для него.

Если задано значение lock, то оно должно быть прокси для объекта threading.Lock или threading.RLock.

Изменено в версии 3.3: Добавлен метод wait_for().

Event()

Создание общего объекта threading.Event и возвращает прокси для него.

Lock()

Создание общего объекта threading.Lock и возвращает прокси для него.

Namespace()

Создание общего объекта Namespace и возвращает прокси для него.

Queue([maxsize])

Создание общего объекта queue.Queue и возвращает прокси для него.

RLock()

Создание общего объекта threading.RLock и возвращает прокси для него.

Semaphore([value])

Создание общего объекта threading.Semaphore и возвращает прокси для него.

Array(typecode, sequence)

Создайте массив и возвращает прокси-сервер для него.

Value(typecode, value)

Создайте объект с возможностью записи value атрибут и возвращает прокси для него.

dict()
dict(mapping)
dict(sequence)

Создание общего объекта dict и возвращает прокси для него.

list()
list(sequence)

Создание общего объекта list и возвращает прокси для него.

Изменено в версии 3.6: Общие объекты могут быть вложены. Например, объект общего контейнера, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться SyncManager.

class multiprocessing.managers.Namespace

Тип, который может регистрироваться в SyncManager.

Объект пространства имен не имеет публичных методов, но содержит записываемые атрибуты. Его представление показывает значения его атрибутов.

Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с '_', будет атрибут прокси, а не атрибут ссылки:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # это атрибут прокси
>>> print(Global)
Namespace(x=10, y='hello')

Настраиваемые менеджеры

Для создания собственного менеджера создается подкласс BaseManager и используется классметод register() для регистрации новых типов или вызываемых объектов в классе менеджера. Например:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Использование удаленного менеджера

Можно запускать сервер менеджера на одной машине и использовать его клиентами с других машин (при условии, что соответствующие брандмауэры разрешают это).

Выполнение следующих команд создает сервер для одной общей очереди, к которой могут иметь доступ удаленные клиенты:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Его также может использовать другой клиент:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Локальные процессы также могут обращаться к этой очереди, используя код сверху на клиенте для удаленного доступа к ней:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Прокси объекты

Прокси - объект, какой относится к общему объекту, который живет (по- видимому), в другом процессе. Говорят, что общий объект является ссылкой прокси. Несколько прокси-объектов могут иметь один и тот же ссылочный объект.

Прокси-объект содержат методы, которые вызывают соответствующие методы его ссылки (хотя не каждый метод ссылки обязательно будет доступен через прокси). Таким образом, прокси может быть используемый так же, как его референт:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Заметьте, что применение str() прокси будет возвращает представление референта, тогда как применение repr() будет возвращает представление прокси.

Важной особенностью прокси-объектов является то, что они могут быть переданы между процессами. Таким образом, референт может содержать Прокси объекты. Это разрешает вложение этих управляемых списков, словари и другого Прокси объекты:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # ссылка a теперь содержит ссылку b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Аналогично, прокси словарь и списка могут быть вложены друг в друга:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Если стандартные (не прокси) list или dict объекты содержатся в ссылке, изменения этих изменяемых значения не будут распространяться через менеджер, поскольку прокси не может знать, когда изменятся содержащиеся в них значения. Однако хранение значение в контейнерном прокси (который вызывает __setitem__ на объекте по доверенности) действительно размножается через менеджера и так эффективно изменить такой предмет, можно было повторно назначить измененный значение контейнерному прокси:

# создать список прокси и добавить изменяемый объект (словарь)
lproxy = manager.list()
lproxy.append({})
# теперь изменяемый словарь
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# на этом этапе изменения d еще не синхронизируются, но при обновлении словаря
# прокси уведомляется об изменении
lproxy[0] = d

Этот подход, возможно, менее удобен, чем использование вложенных Прокси объекты в большинстве случаев использования, но также демонстрирует уровень управления синхронизацией.

Примечание

Прокси-типы в multiprocessing не поддерживают сравнения по значение. Итак, для сущность у нас есть:

>>> manager.list([1,2,3]) == [1,2,3]
False

Вместо этого следует использовать копию ссылки при проведении сравнений.

class multiprocessing.managers.BaseProxy

Объекты по доверенности - сущности подклассы BaseProxy.

_callmethod(methodname[, args[, kwds]])

Вызов и возвращает результата метода ссылки прокси.

Если proxy - прокси, ссылочным элементом которого является obj, то выражение:

proxy._callmethod(methodname, args, kwds)

вычислит выражение:

getattr(obj, methodname)(*args, **kwds)

в процессе менеджере.

возвращенный значение будет копией результата вызова или прокси для нового общего объекта - см. документацию для аргумента method_to_typeid BaseManager.register().

Если в результате вызова возникает исключение, то вызов повторно инициируется функцией _callmethod(). Если в процессе менеджера возникает другое исключение, оно преобразуется в исключение RemoteError и инициируется _callmethod().

Обратите внимание, в частности, на то, что исключение будет создано, если methodname не было exposed.

Пример использования _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Возвращает копию референта.

Если референт не может быть выбран, это вызовет исключение.

__repr__()

Возвращает представление прокси-объекта.

__str__()

Возвращает представление референта.

Очистка

Объект по доверенности использует weakref колбэк так, чтобы, когда это собрало мусор, он вычеркнул из списка себя от менеджера, который владеет его референтом.

Общий объект удаляется из процесса диспетчера, если на него больше не ссылается ни один прокси.

Процессные пулы

Можно создать пул процессов, который будет выполнять задачи, переданные ему с классом Pool.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Объект пула процессов, который управляет пулом рабочих процессов, в который могут отправляться задания. Он поддерживает асинхронные результаты с тайм-аутами и колбэки и имеет параллельную реализацию карты.

processes - количество используемых рабочих процессов. Если processes None, то число возвращенный по os.cpu_count() равно используемый.

Если initializer не является None, то каждый рабочий процесс будет вызывать initializer(*initargs) при запуске.

maxtasksperchild - количество задач, которые рабочий процесс может выполнить до выхода и замены новым рабочим процессом для освобождения неиспользуемых ресурсов. Дефолт maxtasksperchild - None, что означает процессы рабочего, будет жить пока бассейн.

context может быть используемый, чтобы определить контекст используемый для старта процессов рабочего. Обычно пул создается с помощью функции multiprocessing.Pool() или метода Pool() объекта контекст. В обоих случаях context устанавливается соответствующим образом.

Следует отметить, что методы объекта пула должны вызываться только процессом, создавшим пул.

Предупреждение

multiprocessing.pool объекты имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любой другой ресурс), используя пул в качестве менеджера контекст или вызывая close() и terminate() вручную. Неспособность сделать это может привести к зависанию процесса при завершении.

Обратите внимание, что это не верно, чтобы полагаться на сборщик мусора, чтобы уничтожить пул, так как CPython не гарантирует, что финализатор пула будет вызван (см. object.__del__() для получения дополнительной информации).

Добавлено в версии 3.2: maxtasksperchild

Добавлено в версии 3.4: context

Примечание

Рабочие процессы внутри Pool обычно работают в течение всей продолжительности рабочей очереди пула. Частый шаблон, встречающийся в других системах (таких как Apache, mod_wsgi и т.д.) для освобождения ресурсов, находящихся в распоряжении работников, заключается в том, чтобы позволить работнику в пуле выполнить только определенный объем работы перед выходом, очисткой и новым процессом, который должен заменить старый. Аргумент maxtasksperchild Pool предоставляет эту возможность конечному пользователю.

apply(func[, args[, kwds]])

Вызовите func с аргументами args и ключевой аргументы kwds. Он блокируется до тех пор, пока результат не будет готов. Учитывая эти блоки, apply_async() лучше подходит для выполнения работы параллельно. Кроме того, func выполняется только в одном из работников пула.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Вариант метода apply(), который возвращает объект AsyncResult.

Если задан параметр callback, то он должен быть вызываемым и принимать один аргумент. То, когда результат становится готовым callback, применено к нему, это - то, если неудавшееся требование, в этом случае error_callback не применен вместо этого.

Если задан параметр error_callback, то он должен быть вызываемым и принимать один аргумент. Если целевая функция терпит неудачу, то error_callback называют за исключением сущность.

Обратные вызовы должны быть завершены немедленно, поскольку в противном случае поток, обрабатывающая результаты, будет заблокирована.

map(func, iterable[, chunksize])

Параллельный эквивалент встроенной функции map() (однако он поддерживает только один аргумент iterable для нескольких итерабелей см. starmap()). Он блокируется до тех пор, пока результат не будет готов.

Этот метод разбивает итабль на несколько чанки, которые он передает в пул процессов как отдельные задачи. (Приблизительный) размер этих чанки можно задать, установив для параметра chunksize положительное целое число.

Обратите внимание, что это может привести к интенсивному использованию памяти для очень длинных итераблей. Рассмотрите возможность использования imap() или imap_unordered() с явным параметром chunksize для повышения эффективности.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Вариант метода map(), который возвращает объект AsyncResult.

Если задан параметр callback, то он должен быть вызываемым и принимать один аргумент. То, когда результат становится готовым callback, применено к нему, это - то, если неудавшееся требование, в этом случае error_callback не применен вместо этого.

Если задан параметр error_callback, то он должен быть вызываемым и принимать один аргумент. Если целевая функция терпит неудачу, то error_callback называют за исключением сущность.

Обратные вызовы должны быть завершены немедленно, поскольку в противном случае поток, обрабатывающая результаты, будет заблокирована.

imap(func, iterable[, chunksize])

Более ленивый вариант map().

Аргумент chunksize совпадает с аргументом используемый методом map(). Очень долго iterables использование большого значение для chunksize может заставить работу закончить гораздо быстрее, чем использование дефолта значение 1.

Также если chunksize является 1, то метод next() итератора возвращенный методом imap() имеет необязательный параметр timeout: next(timeout) поднимет multiprocessing.TimeoutError, если результат не может быть возвращенный в течение timeout секунд.

imap_unordered(func, iterable[, chunksize])

То же как imap() за исключением того, что заказ результатов возвращенный iterator нужно считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантирован как «правильный».

starmap(func, iterable[, chunksize])

Как map() за исключением того, что элементы iterable, как ожидают, будут iterables, которые распакованы как аргументы.

Следовательно iterable [(1,2), (3, 4)] приводит к [func(1,2), func(3,4)].

Добавлено в версии 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Комбинация starmap() и map_async(), которая итерирует по iterable итераблей и вызывает func с итераблями распакованными. Возвращает объект результата.

Добавлено в версии 3.3.

close()

Запрещает отправку дополнительных задач в пул. После завершения всех задач рабочие процессы завершаются.

terminate()

Немедленная остановка рабочих процессов без завершения незавершенной работы. То, когда объект бассейна - мусор, собралось, terminate() немедленно назовут.

join()

Дождитесь завершения рабочих процессов. Перед использованием close() необходимо вызвать terminate() или join().

Добавлено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекст - см. раздел Типы менеджера контекста. __enter__() возвращает объект пула, __exit__() вызывает terminate().

class multiprocessing.pool.AsyncResult

Класс результата возвращенный по Pool.apply_async() и Pool.map_async().

get([timeout])

Возвращает результат при его поступлении. Если timeout не является None и результат не поступает в течение timeout секунд, то multiprocessing.TimeoutError поднимается. Если удаленное требование подняло исключение тогда, что исключение будет повторно поднято get().

wait([timeout])

Подождите, пока результат будет доступен, или до истечения timeout секунд.

ready()

Возвращает, завершен ли вызов.

successful()

Возвращает, выполнен ли вызов без возникновения исключения. Поднимет ValueError, если результат не готов.

Изменено в версии 3.7: Если результат не готов, ValueError поднимается вместо AssertionError.

В следующем примере показано использование пула:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # запустить 4 процесса воркера
        result = pool.apply_async(f, (10,)) # асинхронно вычислить "f(10)" в одном процессе
        print(result.get(timeout=1))        # печатает "100", если только ваш компьютер не работает *очень* медленно

        print(pool.map(f, range(10)))       # печать "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # печать "0"
        print(next(it))                     # печать "1"
        print(it.next(timeout=1))           # печать "4" если ваш компьютер не *очень* медленный

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # поднять multiprocessing.TimeoutError

Слушатели и клиенты

Обычно сообщение, проходящее между процессами, сделано, используя очереди, или при помощи Connection возражает возвращенный Pipe().

Однако модуль multiprocessing.connection обеспечивает некоторую гибкость. Он в основном дает высокоуровневый ориентированный на сообщения API для работы с сокеты или Windows с именем пайпы. У этого также есть поддержка digest authentication, используя модуль hmac, и опроса многочисленных связей одновременно.

multiprocessing.connection.deliver_challenge(connection, authkey)

Отправка случайно сгенерированного сообщения на другой конец соединения и ожидание ответа.

Если ответ соответствует дайджесту сообщения с использованием клавиши authkey, то приветственное сообщение отправляется на другой конец соединения. В противном случае поднимается AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Получение сообщения, вычисление дайджеста сообщения с использованием authkey в качестве ключа, а затем отправка дайджеста обратно.

Если приветственное сообщение не получено, возникает сообщение AuthenticationError.

multiprocessing.connection.Client(address[, family[, authkey]])

Попытка установить соединение с прослушивателем, который использует адрес address, возвращая Connection.

Тип соединения определяется аргументом family, но обычно его можно опустить, поскольку его обычно можно вывести из формата address. (См. Форматы адреса)

Если authkey задан, а не None, он должен быть байтом строка и будет используемый в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey является None. AuthenticationError возникает в случае сбоя аутентификации. См. Ключи аутентификации.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Обертка для связанной сокет или Windows с именем пайп, которая «слушает» подключения.

address - адрес, который должен быть используемый связанным сокет или именованным пайп объекта прослушивателя.

Примечание

Если адрес „0.0.0.0“ будет используемый, то адрес не будет соединяемой конечной точкой на Windows. Если требуется соединяемая конечная точка, следует использовать „127.0.0.1“.

family - тип используемого сокет (или именованного пайп). Это может быть один из строки 'AF_INET' (для TCP сокет), 'AF_UNIX' (для домена Unix сокет) или 'AF_PIPE' (для Windows с именем пайп). Из них только первый гарантированно доступен. Если family None, то семейство выводится из формата address. Если address также является None, то выбирается значение по умолчанию. Это семейство по умолчанию считается самым быстрым из доступных. См. Форматы адреса. Обратите внимание, что если family является 'AF_UNIX' и адрес None, то сокет будет создан в частном временном каталоге, созданном с помощью tempfile.mkstemp().

Если объект прослушивателя использует сокет, то backlog (по умолчанию 1) передается методу listen() сокет после его привязки.

Если authkey задан, а не None, он должен быть байтом строка и будет используемый в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey является None. AuthenticationError возникает в случае сбоя аутентификации. См. Ключи аутентификации.

accept()

Принять подключение на связанном сокете или именованный пайп объекта слушателя и возвращает объект Connection. Если идентификация предпринята и терпит неудачу, то поднимается AuthenticationError.

close()

Закрыть связанный сокет или именованный пайп объекта слушателя. Это вызывается автоматически при сборе мусора прослушивателя. Однако желательно называть его явно.

Объекты прослушивателя имеют следующие свойства только для чтения:

address

Адрес, используемый объектом Listener.

last_accepted

Адрес, с которого пришло последнее принятое соединение. Если это недоступно, то это None.

Добавлено в версии 3.3: Объекты прослушивателя теперь поддерживают протокол управления контекст - см. раздел Типы менеджера контекста. __enter__() возвращает объект прослушивателя, а __exit__() вызывает close().

multiprocessing.connection.wait(object_list, timeout=None)

Подождите, пока объект в object_list будет готов. Возвращает список тех объектов в object_list, которые готовы. Если timeout является float, то вызов блокируется максимум на столько секунд. Если timeout будет None тогда, то он заблокирует в течение неограниченного периода. Отрицательное время ожидания эквивалентно нулевому времени ожидания.

Как для Unix, так и для Windows объект может отображаться в object_list, если это так

Соединение или объект сокет готов, когда имеются данные, доступные для чтения из него, или другой конец закрыт.

Unix: wait(object_list, timeout) почти эквивалентный select.select(object_list, [], [], timeout). Разница в том, что, если select.select() прерывается сигналом, он может поднять OSError с числом ошибок EINTR, тогда как wait() не будет.

Windows: Элемент в object_list должен или быть целочисленным обработчиком, который ожидает (waitable) (согласно определению используемый документацией функции Win32 WaitForMultipleObjects()), или это может быть объект с методом fileno(), с каким возвращает ручка сокет или пайп обращаются. (Обратите внимание, что дескрипторы пайпа и сокет не являются дескрипторами ожидания.

Добавлено в версии 3.3.

Примеры

Следующий код сервера создает слушателя, который использует 'secret password' в качестве ключа идентификации. Затем он ожидает подключения и передает некоторые данные клиенту:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # семейство выведено как 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Следующая код подключается к серверу и получает некоторые данные от сервера:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Следующая код использует wait() для ожидания сообщений от нескольких процессов одновременно:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # Мы закрываем записываемый конец пайп, чтобы убедиться, что p является
        # единственным процессом, который владеет дескриптором для него. Это гарантирует,
        # что когда p закроет свой дескриптор для записываемого конца, wait()
        # быстро сообщит о читаемом конце как о прочитанном.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Форматы адреса

  • Адрес 'AF_INET' - кортеж формы (hostname, port), где hostname - строка, а port - целое число.
  • Адрес 'AF_UNIX' - это строка, представляющий имя файла в файловой системе.
  • Адрес 'AF_PIPE' представляет собой строку вида r'\\.\pipe{PipeName}'. Чтобы использовать Client() для подключения к именованному каналу на удаленном компьютере с именем ServerName, вместо этого следует использовать адрес формы r'\ServerName\pipe{PipeName}'.

Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт, по умолчанию принимается как 'AF_PIPE' адрес, а не 'AF_UNIX' адрес.

Ключи аутентификации

Когда каждый использует Connection.recv, полученными данными является автоматически unpickled. К сожалению, отмена извлечения данных из ненадежного источника является угрозой безопасности. Поэтому Listener и Client() используйте модуль hmac для обеспечения дайджест-аутентификации.

Ключ аутентификации - это байт строка, который можно рассматривать как пароль: после установления соединения оба конца потребуют подтверждения того, что другой знает ключ аутентификации. (Демонстрирующий то, что оба конца используют тот же ключ, делает не, включают отправку ключа по связи.)

Если идентификацию запрошенный, но никакой ключ идентификации не определен тогда, возвращает значение current_process().authkey - используемый (см. Process). Этот значение будет автоматически унаследован любым объектом Process, что текущий процесс создает. Это означает, что (по умолчанию) все процессы многопроцессорной программы будут совместно использовать один ключ аутентификации, который может быть используемый при установке соединений между собой.

Подходящие ключи аутентификации также могут быть сгенерированы с использованием os.urandom().

Логирование

Доступна некоторая поддержка логирование. Однако следует отметить, что пакет logging не использует общие блокировки процессов, что позволяет (в зависимости от типа обработчик) смешивать сообщения из различных процессов.

multiprocessing.get_logger()

Возвращает логгер используемый по multiprocessing. При необходимости будет создан новый.

При первом создании логгер имеет уровень logging.NOTSET и не имеет обработчик по умолчанию. Сообщения, отправляемые этому логгер, по умолчанию не распространяются на корневой логгер.

Обратите внимание, что в Windows дочерние процессы наследуют только уровень логгер родительского процесса - любая другая настройка логгер не наследуется.

multiprocessing.log_to_stderr()

Эта функция выполняет вызов get_logger(), но в дополнение к возвращению логгер, созданного get_logger, она добавляет обработчик, который отправляет вывод в sys.stderr, используя формат '[%(levelname)s/%(processName)s] %(message)s'.

Ниже приведен пример сеанса с включенной функцией логирование:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Полную таблицу уровней логирование см. в модуле logging.

Модуль multiprocessing.dummy

multiprocessing.dummy копирует API multiprocessing, но является не более чем оболочкой для модуля threading.

В частности, функция Pool, предоставляемая multiprocessing.dummy, возвращает экземпляр ThreadPool, который является подклассом Pool, который поддерживает все те же вызовы методов, но использует пул рабочих потоков, а не рабочих процессов.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Объект пула потоков, который управляет пулом рабочих потоков, которым могут быть отправлены задания. Экземпляры ThreadPool полностью совместимы по интерфейсу с экземплярами Pool, и их ресурсами также необходимо правильно управлять, используя пул в качестве диспетчера контекста или вызывая close() и terminate() вручную.

processes - количество используемых рабочих потоков. Если processes - None, то используется номер, возвращенный os.cpu_count().

Если initializer не None, то каждый рабочий процесс при запуске будет вызывать initializer(*initargs).

В отличие от Pool, maxtasksperchild и context не могут быть предоставлены.

Примечание

ThreadPool имеет тот же интерфейс, что и Pool, который разработан на основе пула процессов и предшествует введению модуля concurrent.futures. Таким образом, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и имеет свой собственный тип для представления состояния асинхронных заданий, AsyncResult, который не понимается никакими другими библиотеками.

Пользователям обычно следует использовать concurrent.futures.ThreadPoolExecutor, который имеет более простой интерфейс, изначально спроектированный для потоков, и который возвращает экземпляры concurrent.futures.Future, совместимые со многими другими библиотеками, включая asyncio.

Программирование рекомендаций

Существуют определенные руководящие принципы и идиомы, которых следует придерживаться при использовании multiprocessing.

Все методы начала

Следующее относится ко всем методам запуска.

Избегайте разделенного состояние

Насколько это возможно, следует стараться избегать переноса больших объемов данных между процессами.

Вероятно, лучше всего использовать очереди или пайпы для обмена данными между процессами, чем использовать примитивы синхронизации более низкого уровня.

Picklability

Убедитесь в том, что аргументы для методов прокси-серверов доступны.

Безопасность потоков прокси

Не используйте прокси-объект из нескольких поток, если вы не защитите его с помощью блокировки.

(Никогда нет проблемы с различными процессами, используя прокси same.)

Присоединение к процессам зомби

В Unix, когда процесс заканчивается, но не был присоединен, он становится зомби. Никогда не должно быть очень многих, потому что каждый раз новый процесс начинается (или active_children() называют), все законченные процессы, к которым еще не присоединились, будет присоединен. Кроме того, вызов Process.is_alive завершенного процесса присоединится к процессу. Тем не менее, это, вероятно, хорошая практика, чтобы явно присоединиться ко всем процессам, которые вы начинаете.

Лучше наследовать, чем pickle/unpickle

При использовании методов запуска spawn или forkserver многие типы из multiprocessing должны быть подбираемыми, чтобы их могли использовать дочерние процессы. Однако обычно следует избегать отправки общих объектов другим процессам с использованием пайпов или очередей. Вместо этого следует упорядочить программу так, чтобы процесс, которому необходим доступ к общему ресурсу, созданному в другом месте, мог наследовать его от предшествующего процесса.

Постарайтесь не заканчивать процессы

Используя метод Process.terminate, чтобы остановить процесс склонно заставить любые общие ресурсы (такие как замки, семафоры, пайпы и очереди) в настоящее время являющийся используемый процессом становиться сломанными или недоступными к другим процессам.

Поэтому, вероятно, лучше только рассмотреть использование Process.terminate на процессах, которые никогда не используют общих ресурсов.

Объединение процессов, использующих очереди

Следует иметь в виду, что процесс, поставивший элементы в очередь, будет ждать завершения до тех пор, пока все буферизированные элементы не будут поданы «питателем» поток в нижележащий пайп. (Дочерний процесс может вызвать метод Queue.cancel_join_thread очереди, чтобы избежать такого поведения.

Это означает, что при каждом использовании очереди необходимо убедиться, что все элементы, помещенные в очередь, будут удалены до присоединения процесса. В противном случае вы не можете быть уверены, что процессы, поставившие элементы в очередь, завершатся. Также следует помнить, что недемонические процессы будут присоединяться автоматически.

Примером взаимоблокировки является следующее:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # это мертвая блокировка
    obj = queue.get()

Фиксация здесь должна была бы обменять последние две линии (или просто удалить линию p.join()).

Явная передача ресурсов дочерним процессам

На Unix, используя метод начала fork, дочерний процесс может использовать общий ресурс, созданный в родительском процессе, используя глобальный ресурс. Однако лучше передать объект в качестве аргумента конструктору для дочернего процесса.

Кроме обеспечения совместимости код (потенциально) с Windows и другими методами запуска, это также гарантирует, что до тех пор, пока дочерний процесс еще жив, объект не будет мусором собран в родительском процессе. Это может быть важно, если какой-либо ресурс освобождается при сборе мусора объекта в родительском процессе.

Таким образом для сущность:

from multiprocessing import Process, Lock

def f():
    ... сделать что-то используя "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

следует переписать как:

from multiprocessing import Process, Lock

def f(l):
    ... сделать что-нибудь, используя "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Остерегайтесь замены sys.stdin на «файл, как объект»

multiprocessing изначально безоговорочно вызвается:

os.close(sys.stdin.fileno())

в методе multiprocessing.Process._bootstrap() — это привело к проблемам с процессами в процессах. Это было изменено на:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Который решает основную проблему процессов, сталкивающихся друг с другом приводящим к плохой ошибке файла дескриптор, но вводит потенциальную опасность для заявлений, которые заменяют sys.stdin() «подобным файлу объектом» с буферизующей продукцией. Эта опасность заключается в том, что если несколько процессов вызывают close() на этом файловом объекте, это может привести к тому, что одни и те же данные будут очищены на объекте несколько раз, что приведет к повреждению.

Если вы пишете подобный файлу объект и осуществляете ваш собственный кэширование, вы можете сделать его безопасным от вилки, храня pid каждый раз, когда вы прилагаете к кэш и отказу от кэш, когда pid изменяется. Например:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Дополнительные сведения см. в разделах bpo-5155, bpo-5313 и bpo-5331

Методы запуска spawn и forkserver

Есть некоторые дополнительное ограничение, которые не относятся к методу начала fork.

Больше picklability

Убедитесь, что все аргументы для Process.__init__() доступны. Кроме того, если вы, подкласс Process тогда удостоверяется, что сущности будет picklable, когда метод Process.start назовут.

Глобальные переменные

Принять во внимание, что, если пробег код в дочернем процессе пытается получить доступ к глобальной переменной, то значение он видит (если таковые имеются) может не совпасть с значение в родительском процессе в то время, когда вызван Process.start.

Однако глобальные переменные, являющиеся только константами уровня модуля, не вызывают проблем.

Безопасный импорт основного модуля

Удостоверьтесь, что главный модуль может быть безопасно импортирован новым Python интерпретатор, не вызывая непреднамеренные побочные эффекты (такой старт нового процесса).

Например, использование метода начала spawn или forkserver, управляющего следующим модулем, потерпело бы неудачу с RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Вместо этого следует защитить «точку входа» программы, используя if __name__ == '__main__': следующим образом:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Строка freeze_support() может быть опущена, если программа будет запущена нормально, а не заморожена.

Это позволяет недавно порожденному Python интерпретатор безопасно импортировать модуль и затем управлять функцией foo() модуля.

Аналогичные ограничения применяются в случае создания пула или менеджера в главном модуле.

Примеры

Демонстрация создания и использования настраиваемых менеджеров и прокси:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Использование Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Пример использования очередей для подачи задач в коллекцию рабочих процессов и сбора результатов: