concurrent.futures
— Запуск параллельных задач¶
Добавлено в версии 3.2.
Исходный код: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py
Модуль concurrent.futures
предоставляет высокоуровневый интерфейс для асинхронного
выполнения вызовов.
Асинхронное выполнение можно выполняться потоками, используя ThreadPoolExecutor
или
отдельными процессами, используя ProcessPoolExecutor
. Оба реализуют один и тот же
интерфейс, который определяется абстрактным классом Executor
.
Объекты Executor¶
-
class
concurrent.futures.
Executor
¶ Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует используемый не напрямую, а через его конкретные подклассы.
-
submit
(fn, *args, **kwargs)¶ Планирование выполнения вызываемого fn как
fn(*args **kwargs)
и возвращает объектFuture
, представляющего выполнение вызываемого:with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
-
map
(func, *iterables, timeout=None, chunksize=1)¶ Аналогично
map(func, *iterables)
кроме:- iterables собираются немедленно, а не лениво;
- func выполняется асинхронно, и несколько вызовов func могут выполняться одновременно.
Возвращенный итератор вызывает
concurrent.futures.TimeoutError
, если__next__()
вызывается и результат недоступен через timeout секунд после исходного вызоваExecutor.map()
. timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.Если func вызов вызывает исключение, то это исключение возникает при получении его значения из итератора.
При использовании
ProcessPoolExecutor
, это метод перемалывает iterables в чанки, которые он передает в пул как отдельные задачи. (Приблизительный) размер этих чанков можно задать, установив для параметра chunksize положительное целое число. Для очень длинных итераторов использование большого значения для chunksize может значительно повысить производительность по сравнению с размером по умолчанию 1. СThreadPoolExecutor
chunksize не имеет эффекта.Изменено в версии 3.5: Добавлен аргумент chunksize.
-
shutdown
(wait=True)¶ Сообщить исполнителю, что он должен освободить ресурсы, которые он использует при выполнении текущей ожидающих футуры. Вызовы
Executor.submit()
иExecutor.map()
, сделанные после завершения работы, вызовутRuntimeError
.Если wait будет
True
тогда, то этот метод не возвращает пока все ожидающие футуры не выполнятся и ресурсы, связанные с исполнителем, не будут освобождены. Если wait будетFalse
тогда, то этот метод немедленно возвращает и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие футуры завершат выполнение. Независимо от значения wait, вся программа Python не завершит работу, пока не будут выполнены все ожидающие футуры.Можно избежать необходимости явного вызова этого метода, если используется инструкция
with
, которая отключаетExecutor
(ожидание, как если быExecutor.shutdown()
вызывались с wait, установленным вTrue
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
-
ThreadPoolExecutor¶
ThreadPoolExecutor
- это подкласс Executor
, использующий пул потоков
для асинхронного выполнения вызовов. Взаимоблокировки могут возникать, когда
вызываемый объект, связанный с Future
, ожидает результатов другого
Future
. Например:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b никогда не завершится, потому что ждет a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a никогда не завершится, потому что ждет b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
И:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# Это никогда не завершится, потому что есть только один рабочий поток и
# он выполняет эту функцию.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
-
class
concurrent.futures.
ThreadPoolExecutor
(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶ Подкласс
Executor
, использующий пул из не более max_workers потоков для асинхронного выполнения вызовов.initializer является необязательным вызываемым элементом, вызываемым в начале каждого рабочего потока; initargs является кортежем аргументов, переданных инициализатору. Если initializer создаёт исключение, все текущие ожидающие задания будут создавать
BrokenThreadPool
, а также любые попытки отправить больше заданий в пул.Изменено в версии 3.5: Если max_workers значение
None
или не указано, то по умолчанию будет установлено количество процессоров на машине, умноженное на5
, при условии, чтоThreadPoolExecutor
часто используемый перекрываz I/O вместо работы CPU и количество воркеров должно быть выше числа воркеров дляProcessPoolExecutor
.Добавлено в версии 3.6: Добавлен аргумент thread_name_prefix, позволяющий пользователям управлять именами
threading.Thread
для рабочих потоков, созданных пулом для упрощения отладки.Изменено в версии 3.7: Добавлены аргументы initializer и initargs.
Изменено в версии 3.8: Значение по умолчанию max_workers изменяется на
min(32, os.cpu_count() + 4)
. Это значение по умолчанию сохраняет не менее 5 воркеров для I/O связанных задач. Он использует не более 32 ядер CPU для задач, связанных с CPU, которые освобождают GIL. И это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.ThreadPoolExecutor теперь повторно использует неактивные рабочие потоки перед запуском max_workers рабочих потоков.
Пример ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Получить одну страницу и сообщить URL-адрес и содержание
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# Мы можем использовать оператор with, чтобы обеспечить быструю очистку потоков.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Запустить операцию загрузки и отметить каждую футуру своим URL-адресом
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
Класс ProcessPoolExecutor
- это подкласс Executor
, использующий пул процессов
для асинхронного выполнения вызовов. ProcessPoolExecutor
использует модуль multiprocessing
,
который позволяет ему выполнять обходные шаги глобальной блокировки интерпретатора,
но также означает, что могут быть выполнены и возвращены только picklable объекты.
Модуль __main__
должен быть импортирован подпроцессами воркера. Это значит,
что ProcessPoolExecutor
не будет работать в интерактивном интерпретатор.
Вызов Executor
или Future
метода из вызываемого, переданного
ProcessPoolExecutor
, приведет к взаимоблокировке.
-
class
concurrent.futures.
ProcessPoolExecutor
(max_workers=None, mp_context=None, initializer=None, initargs=())¶ Подкласс
Executor
, который выполняет вызовы асинхронно, используя пул не более max_workers процессов. Если max_workers —None
или не задано, по умолчанию будет использоваться количество процессоров на машине. Если max_workers меньше или равно0
, то будет поднятValueError
. В Windows max_workers должен быть меньше или равен61
. Если это не так, то будет поднятValueError
. Если max_workers - этоNone
, то по умолчанию будет выбрано самое большее61
, даже если доступно больше процессоров. mp_context может быть контекстом многопроцессорности или None. Он будет использоваться для запуска рабочих. Если mp_context равенNone
или не задан, используется контекст многопроцессорной обработки по умолчанию.initializer — необязательный вызываемый объект, который вызывается в начале каждого рабочего процесса; initargs — кортеж аргументов, переданных инициализатору. Если initializer вызовет исключение, все текущие ожидающие задания вызовут
BrokenProcessPool
, а также любые попытки отправить больше заданий в пул.Изменено в версии 3.3: При резком завершении одного из рабочих процессов возникает ошибка
BrokenProcessPool
. Ранее поведение не определялось, но операции с исполнителем или его футуры часто замораживалась или дедлочилась.Изменено в версии 3.7: Добавлен аргумент mp_context, позволяющий пользователям управлять start_method для рабочих процессов, созданных пулом.
Добавлены аргументы initializer и initargs.
Пример ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Объекты Future¶
Модуль Future
класс инкапсулирует асинхронное выполнение вызываемого
объекта. Future
сущности создаются Executor.submit()
.
-
class
concurrent.futures.
Future
¶ Инкапсулирует асинхронное выполнение вызываемого объекта.
Future
сущности создаютсяExecutor.submit()
и не должны создаваться непосредственно, кроме тестирования.-
cancel
()¶ Попытка отменить вызов. Если вызов выполняется или завершен и не может быть отменен, метод возвращает
False
, в противном случае вызов отменяется, а метод возвращаетTrue
.
-
cancelled
()¶ Если вызов был успешно отменен, возвращает значение True.
-
running
()¶ Возвращает значение
True
, если вызов выполняется и не может быть отменен.
-
done
()¶ Если вызов был успешно отменён или завершён, вернёт значение True.
-
result
(timeout=None)¶ Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, этот метод будет ждать до timeout секунд. Если вызов не был завершен в течение timeout секунд, то возникает
concurrent.futures.TimeoutError
. timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.Если футура будет отменено до завершения, то будет поднято
CancelledError
.В случае вызова это метод вызовет такое же исключение.
-
exception
(timeout=None)¶ Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, этот метод будет ждать до timeout секунд. Если вызов не был завершен в течение timeout секунд, то возникает
concurrent.futures.TimeoutError
. timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.Если футура будет отменено до завершения, то
CancelledError
будут подняты.Если вызов завершен без вызова, возвращается «Нет».
-
add_done_callback
(fn)¶ Прикрепляет вызываемый fn к футуре. fn будет вызван с футурой в качестве единственного аргумента, когда футура будет отменена или закончена.
Добавленные вызываемые объекты вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, который их добавил. Если вызываемый объект вызывает подкласс
Exception
, он регистрируется и игнорируется. Если вызываемый объект вызывает подклассBaseException
, поведение не определено.Если футура уже завершилась или отменена, будет немедленно вызвана fn.
Следующие
Future
методы предназначается для использования в юнит тестах и реализацийExecutor
.-
set_running_or_notify_cancel
()¶ Этот метод должен вызываться только реализациями
Executor
перед выполнением работы, связанной сFuture
и юнит тестами.Если метод возвращает
False
тогда,Future
был отменен, т.е.Future.cancel()
вызван и возвращён True. Все потоки, ожидающие завершенияFuture
(т.е. черезas_completed()
илиwait()
), будут пробуждены.Если метод возвращает
True
, тоFuture
не был отменен и переведён в рабочее состояние, т.е. вызовыFuture.running()
вернутTrue
.Этот метод может быть вызван только один раз и не может быть вызван после вызова
Future.set_result()
илиFuture.set_exception()
.
-
set_result
(result)¶ Задает для результата работы, связанной с
Future
, значение result.Этот метод должен быть используемый только
Executor
реализациями и юнит тестами.Изменено в версии 3.8: Этот метод вызывает
concurrent.futures.InvalidStateError
, еслиFuture
уже выполнена.
-
Функции модуля¶
-
concurrent.futures.
wait
(fs, timeout=None, return_when=ALL_COMPLETED)¶ Дождаться завершения
Future
сущности (возможно, созданного различными экземплярамиExecutor
), заданного fs. Возвращает именованный 2-кортеж множеств. Первое множество, названноевыполненное
, содержит футуры, которые завершились (завершённыеили отменённые футуры), прежде чем ожидание завершилось. Второе множество , названноене_выполненные
, содержит футуры, которые не завершены (отложенные или запущенные футуры).timeout используется для управления максимальным количеством секунд ожидания перед возвращением. timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.
return_when указывает, когда эта функция должна вернуться. Она должна быть одной из следующих констант:
Константа Описание FIRST_COMPLETED
Функция возвращает, когда любая футура заканчивается или будет отменена. FIRST_EXCEPTION
Функция возвращает, когда футура завершается вызвав исключение. Если в футуре нет исключения, оно эквивалентно ALL_COMPLETED
.ALL_COMPLETED
Функция возвращает, когда все футуры завершены или будут отменены.
-
concurrent.futures.
as_completed
(fs, timeout=None)¶ Возвращает итератор по
Future
сущности (возможно, созданный различными экземплярамиExecutor
), заданному fs, который возвращает футуры по мере завершения (завершённые или аннулированные футуры). Все дублируемые футуры fs возвращаются один раз. Все футуры, завершенные до вызоваas_completed()
, будут получены первыми. Возвращенный итератор вызываетconcurrent.futures.TimeoutError
, если__next__()
вызывается и результат недоступен через timeout секунд после исходного вызоваas_completed()
. timeout может быть int или float. Если timeout не указан или отсутствует, время ожидания не ограничено.
См.также
- PEP 3148 – футуры - выполнение асинхронных вычислений.
- Предложение, описывающее эту функцию для включения в стандартную библиотеку Python.
Классы исключений¶
-
exception
concurrent.futures.
CancelledError
¶ Возникает при отмене футуры.
-
exception
concurrent.futures.
TimeoutError
¶ Возникает, когда операция футуры превышает заданное время ожидания.
-
exception
concurrent.futures.
BrokenExecutor
¶ Полученный из
RuntimeError
, это исключение поднимает класс, когда исполнитель сломан по некоторым причинам и не может быть использован, чтобы представить или выполнить новые задачи.Добавлено в версии 3.7.
-
exception
concurrent.futures.
InvalidStateError
¶ Возникает при выполнении операции над футурой, которая не разрешена в текущем состояние.
Добавлено в версии 3.8.
-
exception
concurrent.futures.thread.
BrokenThreadPool
¶ Полученный из
BrokenExecutor
, это исключение поднимает класс, когда один из воркеровThreadPoolExecutor
провалил инициализацию.Добавлено в версии 3.7.
-
exception
concurrent.futures.process.
BrokenProcessPool
¶ Производное от
BrokenExecutor
(ранееRuntimeError
), это исключение класса возникает, когда один из воркеровProcessPoolExecutor
прекратил работу неочищенным способом (например, если он был убит снаружи).Добавлено в версии 3.3.