Очереди¶
Source code: Lib/asyncio/queues.py
Очереди asyncio разработаны аналогично классам модуля queue
. Хотя
asyncio очереди не потокобезопасны, они предназначены для использования специально
в async/await код.
Обратите внимание, что методы очереди asyncio не имеют параметра
timeout; используйте asyncio.wait_for()
функцию для выполнения операций очереди с
таймаутом.
См. также раздел Примеры ниже.
Очередь¶
-
class
asyncio.
Queue
(maxsize=0, *, loop=None)¶ Очередь первого входа, первого выхода (FIFO).
Если maxsize меньше или равно нулю, размер очереди бесконечен. Если это целое число больше
0
, тоawait put()
блокируется, когда очередь достигает maxsize, пока элемент не будет удален поget()
.В отличие от стандартных
queue
многопоточности библиотеки, размер очереди всегда известен и может быть возвращенный путем вызоваqsize()
метод.Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.
Это класс not thread safe.
-
maxsize
¶ Число элементов, разрешенных в очереди.
-
empty
()¶ Возвращает
True
, если очередь пуста,False
в противном случае.
-
full
()¶ Возвращает
True
, если в очередиmaxsize
элементов.Если очередь была инициализирована с
maxsize=0
(дефолт), тоfull()
никогда возвращаетsTrue
.
-
coroutine
get
()¶ Удаление и возвращает элемента из очереди. Если очередь пуста, дождитесь доступности элемента.
-
get_nowait
()¶ Возвращает элемент, если он доступен немедленно, в противном случае вызовите
QueueEmpty
.
-
coroutine
join
()¶ Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается при добавлении элемента в очередь. Счетчик уменьшается каждый раз, когда потребитель вызывает
task_done()
, чтобы указать, что элемент был извлечен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля,join()
разблокируется.
-
coroutine
put
(item)¶ Поместить элемент в очередь. Если очередь заполнена, дождитесь наличия свободного слота перед добавлением элемента.
-
put_nowait
(item)¶ Поместить элемент в очередь без блокировки.
Если свободный слот не доступен немедленно, поднимите
QueueFull
.
-
qsize
()¶ Возвращает количество элементов в очереди.
-
task_done
()¶ Укажите, что ранее поставленная в очередь задача завершена.
Используется потребителями очереди. Для каждого
get()
, используемый для выборки задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в данный момент блокируется, он возобновится после обработки всех элементов (это означает, что был полученtask_done()
вызов для каждого элемента,put()
в очередь).Вызывает
ValueError
, если вызывается больше раз, чем элементы, помещенные в очередь.
-
Приоритетная очередь¶
Очередь LIFO¶
Исключения¶
-
exception
asyncio.
QueueEmpty
¶ Это исключение возникает при вызове
get_nowait()
метод в пустой очереди.
-
exception
asyncio.
QueueFull
¶ Исключение возникает при вызове
put_nowait()
метод в очереди, которая достигла своего максимального размера.
Примеры¶
Очереди можно используемый для распределения рабочей нагрузки между несколькими параллельными задачами:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Получить "рабочий элемент" вне очереди.
sleep_for = await queue.get()
# Спать "sleep_for" секунд.
await asyncio.sleep(sleep_for)
# Сообщение очереди, для обработки «рабочего элемента».
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Создайть очередь, которую мы будем использовать для хранения нашей "рабочей нагрузки".
queue = asyncio.Queue()
# Генерирует случайные тайминги и вставляет их в очередь.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Создать три рабочих задачи для одновременной обработки очереди.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Подождать, пока очередь не будет полностью обработана.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Отменить рабочие задания.
for task in tasks:
task.cancel()
# Подождать, пока все рабочие задачи не будут отменены.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())