Корутины и задачи

В этом разделе описано высокоуровневое API asyncio для работы с корутинами и задачами.

Корутины

Корутины, объявляются с помощью async/await синтаксиса, является предпочтительным способом записи asyncio приложений. Например, следующий фрагмент код (требует Python 3.7) напечатает «hello», ожидает 1 секунду, а затем печатает «world»:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

Заметим, что простой вызов корутина не приведёт к его выполнению:

>>> main()
<coroutine object main at 0x1053bb7c8>

Чтобы фактически запустить корутин, asyncio предоставляет три основных механизма:

  • Функция asyncio.run() для запуска функции точки входа верхнего уровня «main()» (см. приведенный выше пример)

  • Ожидающая корутина. Следующий фрагмент кода напечатает «hello» после ожидания в 1 секунды, а затем напечатает «world» после ожидания в течении еще одной 2 секунды:

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    Ожидаемый вывод:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • Функция asyncio.create_task() для одновременного запуска корутин как asyncio: класс: „Tasks < Task >“.

    Давайте изменим приведенный выше пример и запустим две say_after корутины конкурентно:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Подождите, пока обе задачи не будут выполнены (должны принять
        # около 2 секунд.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    Обратите внимание, что ожидаемые выходные данные теперь показывают, что фрагмент выполняется на 1 секунду быстрее, чем ранее:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

Await объекты

Мы говорим, что объект является awaitable объектом, если его можно использовать в await выражении. Многие API-интерфейсы asyncio предназначены для приема awaitables. Существует три основных типа awaitable объектов: корутины, Задачи и футуры.

Корутины

Python корутины являются awaitables и поэтому могут быть awaited из других корутин:

import asyncio

async def nested():
    return 42

async def main():
    # Ничего не произойдет, если мы просто вызовем "nested()".
    # Объект корутины создан, но не await,
    # так что *не будет работать вообще*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

Важно

В этой документации термин «корутин» может быть используемый для двух тесно связанных понятий

  • Функция корутины: функция async def;
  • Объект корутины: объект, возвращенный путем вызова функции корутины.

asyncio также поддерживает устаревшие основанные на генераторах корутины.

Tasks

Tasks используется для конкурентного планирования короутин.

Когда корутина обернута в Task с такими функциями, как asyncio.create_task(), корутина автоматически планируется запустить в ближайшее время:

import asyncio

async def nested():
    return 42

async def main():
    # Запланировать nested() ближайший одновременный запуск
    # с "main()".
    task = asyncio.create_task(nested())

    # "task" теперь может использовать отмену "nested()" или
    # можно просто ждать, пока он не завершится:
    await task

asyncio.run(main())

Futures

Объект Future - это специальный await объект низкого уровеня , представляющий конечный результат асинхронной операции.

Когда объект Future awaited, это означает, что корутин будет ждать, пока будущее будет решено в каком-то другом месте.

Футуры объекты в asyncio необходимы, чтобы позволить основанному на колбэк код быть используемый с async/await.

Обычно нет нужды создавать объекты Future на уровне приложения код.

Футуры объекты, иногда раскрываемые библиотеками и некоторыми asyncio API, могут быть awaited:

async def main():
    await function_that_returns_a_future_object()

    # это также верно:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

Хорошим примером низкоуровневой функции, возвращающей объект Future, является loop.run_in_executor().

Запуск программы asyncio

asyncio.run(coro, *, debug=False)

Выполняет корутину coro и возвращает результат.

Эта функция управляет переданным корутином, заботясь об управлении asyncio событийный цикл и завершение асинхронных генераторов.

Эта функция не может быть вызвана, когда в том же потоке выполняется другой asyncio событийный цикл.

Если debug True, событийный цикл будет выполняться в режиме отладки.

Эта функция всегда создает новый событийный цикл и закрывает его в конце. Его следует используемый в качестве основной точки входа для asyncio программ, и в идеале его следует вызывать только один раз.

Пример:

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

Добавлено в версии 3.7.

Примечание::
Исходный код asyncio.run() можно найти в Lib/asyncio/runners.py.

Создание задач

asyncio.create_task(coro, *, name=None)

Обёртывание coro корутины в Task и запланировать его выполнение. Возвращает объект задачи. Если name не None, он задается как имя задачи с помощью Task.set_name(). Задача выполняется в цикле, возвращенный get_running_loop(), RuntimeError возникает, если в текущем потоке нет запущенного цикла. Эта функция была добавлена в Python 3.7. Ранее Python 3.7 вместо этого можно использовать низкоуровневое функцию asyncio.ensure_future():

async def coro():
    ...

# В Python 3.7+
task = asyncio.create_task(coro())
...

# Это работает во всех версиях Python, но менее читабельно
task = asyncio.ensure_future(coro())
...

Добавлено в версии 3.7.

Изменено в версии 3.8: Добавлен параметр name.

Спать

coroutine asyncio.sleep(delay, result=None, *, loop=None)

Блокировать на delay секунд.

Если result предоставляется, он возвращенный вызывающему после завершения корутина.

sleep() всегда приостанавливает выполнение текущей задачи, позволяя выполнять другие задачи.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Пример корутина, отображающего текущую дату каждую секунду в течение 5 секунд:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

Запуск задач конкурентно

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

Запускает await объекты в последовательности aws конкурентно.

Если какой-либо await объект в aws является корутином, он автоматически назначается как задача.

Если все await объекты выполнены успешно, результатом является сводный список возвращенный значений. Порядок значений результата соответствует порядку await в aws.

Если return_exceptions является False (по умолчанию), первое вызванное исключение немедленно распространяется на задачу, которая awaits на gather(). Другие await объекты в aws последовательности не будут отменены и продолжат работу.

При return_exceptions True исключения обрабатываются так же, как и успешные результаты, и агрегируются в списке результатов.

Если gather() отменён, все представленные awaitables (которые еще не завершены) также будут отменены.

Если какая-либо задача или футура в последовательности aws - отменена, это рассматривают, как будто сработало исключение CancelledError - требование gather() - не отменяется в этом случае. Это необходимо для предотвращения отмены одной отправленной задачи/футур, чтобы привести к отмене других задач/футур.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Пример:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Запланировать дерево вызовов *конкурентно*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Ожидаемый вывод:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

Примечание

Если return_exceptions имеет значение False, отмена gather() после того, как он был помечен как выполненный, не отменит ни одного отправленного объекта ожидания. Например, сборка может быть помечена как выполненная после передачи исключения вызывающей стороне, поэтому вызов gather.cancel() после перехвата исключения (вызванного одним из ожидаемых объектов) из сборки не отменяет другие ожидаемые объекты.

Изменено в версии 3.7: Если gather отменяется, отмена распространяется независимо от return_exceptions.

Защита от отмены

awaitable asyncio.shield(aw, *, loop=None)

Защита await объекта от отмены.

Если aw корутин, он автоматически назначается как задача.

оператор:

res = await shield(something())

эквивалентно:

res = await something()

except, что если корутин, содержащий его, отменен, задача, выполняющаяся в something(), не отменяется. С точки зрения something(), отмены не произошло. Хотя его звонивший все равно отменяется, поэтому выражение «await» все равно вызывает CancelledError.

Если something() отменяется другими средствами (т.е. изнутри), которые также отменяют shield().

Если требуется полностью игнорировать отмену (не рекомендуется), функция shield() должна быть объединена с предложением try/except следующим образом:

try:
    res = await shield(something())
except CancelledError:
    res = None

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Таймауты

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

Дождаться завершения aw awaitable с таймаутом.

Если aw корутин, он автоматически назначается как задача.

timeout может быть либо None, либо числом секунд ожидания с плавающей запятой, либо числом int. Если timeout None, блокируйте до завершения будущего.

Если возникает таймаут, задача отменяется и вызывается asyncio.TimeoutError.

Чтобы избежать отмены задачи, оберните ее в shield().

Функция будет ждать, пока футура будет фактически отменено, поэтому общее время ожидания может превысить timeout.

Если ожидание отменяется, то также отменяется и будущий aw.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Пример:

async def eternity():
    # Спать в течение одного часа
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Подождите не более 1 секунды
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Ожидаемый вывод:
#
#     timeout!

Изменено в версии 3.7: Когда aw отменяется из-за тайм-аута, wait_for ожидает отмены aw. Ранее она сразу поднимала asyncio.TimeoutError.

Примитивы ожидания

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Запуск ожидаемых объектов в итерации aws одновременно и заблокировать, пока не будет выполнено условие, указанное в return_when.

Возвращаетs два множества задачи/футуры: (done, pending).

Использование:

done, pending = await asyncio.wait(aws)

timeout (float или int), если он указан, можно используемый для управления максимальным количеством секунд ожидания перед возвращаетing.

Обратите внимание, что эта функция не вызывает asyncio.TimeoutError. Футуры или задачи, которые не были выполнены при наступлении тайм-аута, просто возвращенный во втором наборе.

return_when указывает, когда функция должна возвращать. Она должна быть одной из следующих констант:

Константа Описание
FIRST_COMPLETED Функция возвращает, когда любая футура заканчится или отменится.
FIRST_EXCEPTION Функция возвращает после завершения любого процесса футуры путём создания исключения. Если в футуре исключение не возникает, то оно эквивалентно ALL_COMPLETED.
ALL_COMPLETED Функция возвращает после завершения или отмены всех футур.

В отличие от wait_for(), wait() не отменяет футуры при наступлении тайм-аута.

Не рекомендуется, начиная с версии 3.8: Если какой-либо awaitable в aws является корутином, он автоматически назначается как задача. Передача объектов корутина непосредственно в wait() является устаревшей, так как приводит к запутанному поведению.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Примечание

wait() автоматическое планирование корутин как задач, затем возвращает создаваемые объекты Task в множестве (done, pending). Поэтому следующая код не будет работать так, как ожидалось:

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # Эта ветка никогда не будет запущена!

Вот как можно зафиксировать вышеуказанный фрагмент:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Теперь все будет работать так, как и ожидалось.

Не рекомендуется, начиная с версии 3.8: Передача корутиновых объектов непосредственно в wait() устарела.

asyncio.as_completed(aws, *, loop=None, timeout=None)

Запуск ожидаемых объектов в aws итеративно конкурентно. Вернуть итератор корунин. Каждую возвращенную корутниу можно ожидать, чтобы получить самый ранний следующий результат от итерируемого из оставшихся ожидаемых.

Вызывает asyncio.TimeoutError, если тайм-аут наступает до выполнения всех футуры.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Пример:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

Планирование из других потоков

asyncio.run_coroutine_threadsafe(coro, loop)

Отправить корутину в событийный цикл. Потокобезопасный.

Возвращает concurrent.futures.Future дождаться результата из другого потока оС.

Эта функция вызывается из потока операционной системы, отличного от того, где выполняется событийный цикл. Пример:

# Создание корутины
coro = asyncio.sleep(1, result=3)

# Отправить сопрограмму в заданный цикл
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Ожидать результата с необязательным аргументом timeout
assert future.result(timeout) == 3

Если в короутине возникает исключение, возвращенный Future будет уведомлен. Также можно используемый отменить задачу в событийный цикл:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

См. раздел concurrency and multithreading документации.

В отличие от других asyncio функций эта функция требует явной передачи аргумента loop.

Добавлено в версии 3.5.1.

Интроспекция

asyncio.current_task(loop=None)

Возвращает текущий Task сущность или None, если задача не выполняется.

Если loop None get_running_loop() используемый получить текущий цикл.

Добавлено в версии 3.7.

asyncio.all_tasks(loop=None)

Возвращает набор ещё не завершенных объектов Task, запущенных в цикле.

Если loop None, get_running_loop() используемый получить цикл тока.

Добавлено в версии 3.7.

Объект задачи

class asyncio.Task(coro, *, loop=None, name=None)

Объект Future-like, использующий Python корутину. Не потокобезопасной.

Задачи используемый запускать корутины в событийный циклs. Если корутин awaits на будущее, задача приостанавливает выполнение корутина и ожидает завершения будущего. Когда done будучность, возобновляет исполнение завернутого корутина.

Событийный цикл использовать совместное планирование: событийный цикл выполняет одну задачу одновременно. В то время как задача awaits для завершения будущего, событийный цикл выполняет другие задачи, колбэки или выполняет операции ввода- вывода.

Используйте функцию asyncio.create_task() высокого уровня, чтобы создать задачи или функции низкоуровневое loop.create_task() или ensure_future(). Не рекомендуется создавать экземпляры задач вручную.

Для отмены выполняемой задачи используйте cancel() метод. Вызов этого вызова приведет к тому, что задача бросит CancelledError исключение в обернутый корутин. Если корутин awaiting на объекте Future во время отмены, объект Future будет отменен.

cancelled() можно используемый, чтобы проверить, была ли задача отменена. В метод возвращаетs True, если завернутый корутин не подавил CancelledError исключение и фактически был отменен.

asyncio.Task наследует от Future все свои API, кроме Future.set_result() и Future.set_exception().

Задачи поддерживают модуль contextvars. При создании задачи она копирует текущий контекст, а затем запускает корутин в скопированном контекст.

Изменено в версии 3.7: Добавлена поддержка модуля contextvars.

Изменено в версии 3.8: Добавлен параметр name.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

cancel()

Запрос отмены задачи.

Это позволяет создать CancelledError исключение для обернутого корутина на следующем цикле событийный цикл.

После этого у корутина есть шанс очистить или даже отклонить просьбу, подавив исключение с try…… except CancelledError… Блок finally. Поэтому, в отличие от Future.cancel(), Task.cancel() не гарантирует, что задача будет отменена, хотя подавление отмены полностью не распространено и активно отговаривается.

В следующем примере показано, как корутины могут перехватывать запрос на отмену:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Ждать 1 секунду
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Создание задачи "cancel_me"
    task = asyncio.create_task(cancel_me())

    # Ждать 1 секунду
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Ожидаемый результат:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

Возвращает True, если задача отменена.

Задача отменена, когда запрашивалась отмена с cancel() и обернутый корутин распространял в нее CancelledError исключение.

done()

Возвращает True, если задача done.

Задача done, когда упакованный корутин либо возвращенный значение, либо вызвал исключение, либо задача была отменена.

result()

Возвращает результат выполнения задачи.

Если задача является done, то результатом обернутого корутина является возвращенный (или если корутин вызвал исключение, то это исключение возникает повторно.)

Если задача была cancelled, это метод вызывает CancelledError исключение.

Если результат задачи еще не доступен, это метод вызывает InvalidStateError исключение.

exception()

Возвращает исключение задачи.

Если у обернутого корутина возникло исключение, это исключение является возвращенный. Если обернутая сопрограмма возвращенный обычно этот метод возвращаетs None.

Если задача была cancelled, это метод вызывает CancelledError исключение.

Если задача еще не done, это метод вызывает InvalidStateError исключение.

add_done_callback(callback, *, context=None)

Добавление колбэк для выполнения при done задачи.

Этот метод должен только быть используемый в низкоуровневое колбэк-based код.

Для получения дополнительной информации см. документацию Future.add_done_callback().

remove_done_callback(callback)

Удалить callback из списка колбэки.

Этот метод должен только быть используемый в низкоуровневое колбэк-based код.

Для получения дополнительной информации см. документацию Future.remove_done_callback().

get_stack(*, limit=None)

Возвращает список кадров стека для этой задачи.

Если обернутый корутин не выполнен, это возвращаетs стопку, где он подвешен. Если короутин успешно завершен или отменен, это возвращаетs пустой список. Если корутин был прерван исключением, это возвращаетs список кадров отслеживания.

Кадры всегда упорядочиваются от самых старых до самых новых.

Для подвешенного корутина возвращенный только одна рама стека.

Необязательный аргумент limit задает максимальное количество кадров для возвращает; по умолчанию все доступные кадры являются возвращенный. Порядок возвращенный списка отличается в зависимости от того, возвращенный ли стек или отслеживание: возвращенный самые новые кадры стека, но самые старые кадры отслеживания - возвращенный. (Это соответствует поведению модуля отслеживания.

print_stack(*, limit=None, file=None)

Печать стека или отслеживания для этой задачи.

При этом выводятся выходные данные, аналогичные данным модуля отслеживания для кадров, извлекаемых get_stack().

Аргумент limit передается непосредственно get_stack().

Аргумент file представляет собой поток I/O, в который записываются выходные данные; по умолчанию выходные данные записываются в sys.stderr.

get_coro()

Возвращает коротенький предмет, обернутый Task.

Добавлено в версии 3.8.

get_name()

Возвращает имя задачи.

Если ни одно имя не было явно назначено задаче, реализация задачи asyncio по умолчанию создает имя по умолчанию во время создания экземпляра.

Добавлено в версии 3.8.

set_name(value)

Задание имя задачи.

Аргументом value может быть любой объект, который затем преобразуется в строка.

В реализации задачи по умолчанию имя будет отображаться в repr() выходных данных объекта задачи.

Добавлено в версии 3.8.

classmethod all_tasks(loop=None)

Возвращает набор всех задач для событийный цикл.

По умолчанию все задачи для текущего событийный цикл возвращенный. Если loop None, get_event_loop() функция используемый для получения текущего цикла.

Deprecated since version 3.7, will be removed in version 3.9: Не называйте это метод задач. Вместо этого используйте функцию asyncio.all_tasks().

classmethod current_task(loop=None)

Возвращает текущей задачи или None.

Если loop None, get_event_loop() функция используемый для получения текущего цикла.

Deprecated since version 3.7, will be removed in version 3.9: Не называйте это метод задач. Вместо этого используйте функцию asyncio.current_task().

Основанные на генераторах сопрограммы

Примечание:

Поддержка основанных на генераторах корутин **запрещено** и планируется к удалению в
Python 3.10.

Корутины на основе генератора предшествовали синтаксису async/await. Они представляют собой Python генераторы, которые используют yield from выражения для await на футуры и других корутинах.

Генераторные короутины должны быть задекорированы @asyncio.coroutine, хотя это не применяется.

@asyncio.coroutine

Декоратор для маркировки генератор-based корутинов.

Этот декоратор обеспечивает совместимость устаревших корутинов генератор-based с async/await код:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

Этот декоратор не должен быть используемый для async def корутинов.

Deprecated since version 3.8, will be removed in version 3.10: Используйте async def вместо этого.

asyncio.iscoroutine(obj)

Возвращает True, если obj объект корутины.

Этот метод отличается от inspect.iscoroutine() потому что это возвращаетs True для основанных на генератор сопрограмм.

asyncio.iscoroutinefunction(func)

Возвращает True, если func функция корутина.

Этот метод отличается от inspect.iscoroutinefunction(), потому что это возвращаетs True для основанных на генератор функций сопрограммы украсило with:func: „сопрограмма <сопрограмма>“.