Очереди

Source code: Lib/asyncio/queues.py


Очереди asyncio разработаны аналогично классам модуля queue. Хотя asyncio очереди не потокобезопасны, они предназначены для использования специально в async/await код.

Обратите внимание, что методы очереди asyncio не имеют параметра timeout; используйте asyncio.wait_for() функцию для выполнения операций очереди с таймаутом.

См. также раздел Примеры ниже.

Очередь

class asyncio.Queue(maxsize=0, *, loop=None)

Очередь первого входа, первого выхода (FIFO).

Если maxsize меньше или равно нулю, размер очереди бесконечен. Если это целое число больше 0, то await put() блокируется, когда очередь достигает maxsize, пока элемент не будет удален по get().

В отличие от стандартных queue многопоточности библиотеки, размер очереди всегда известен и может быть возвращенный путем вызова qsize() метод.

Deprecated since version 3.8, will be removed in version 3.10: Параметр loop.

Это класс not thread safe.

maxsize

Число элементов, разрешенных в очереди.

empty()

Возвращает True, если очередь пуста, False в противном случае.

full()

Возвращает True, если в очереди maxsize элементов.

Если очередь была инициализирована с maxsize=0 (дефолт), то full() никогда возвращаетs True.

coroutine get()

Удаление и возвращает элемента из очереди. Если очередь пуста, дождитесь доступности элемента.

get_nowait()

Возвращает элемент, если он доступен немедленно, в противном случае вызовите QueueEmpty.

coroutine join()

Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается при добавлении элемента в очередь. Счетчик уменьшается каждый раз, когда потребитель вызывает task_done(), чтобы указать, что элемент был извлечен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.

coroutine put(item)

Поместить элемент в очередь. Если очередь заполнена, дождитесь наличия свободного слота перед добавлением элемента.

put_nowait(item)

Поместить элемент в очередь без блокировки.

Если свободный слот не доступен немедленно, поднимите QueueFull.

qsize()

Возвращает количество элементов в очереди.

task_done()

Укажите, что ранее поставленная в очередь задача завершена.

Используется потребителями очереди. Для каждого get(), используемый для выборки задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в данный момент блокируется, он возобновится после обработки всех элементов (это означает, что был получен task_done() вызов для каждого элемента, put() в очередь).

Вызывает ValueError, если вызывается больше раз, чем элементы, помещенные в очередь.

Приоритетная очередь

class asyncio.PriorityQueue

Вариант Queue; извлекает записи в порядке приоритета (первый нижний).

Записи обычно представляют собой кортежи (priority_number, data) формы.

Очередь LIFO

class asyncio.LifoQueue

Вариант Queue, который сначала извлекает последние добавленные записи (последний вход, первый выход).

Исключения

exception asyncio.QueueEmpty

Это исключение возникает при вызове get_nowait() метод в пустой очереди.

exception asyncio.QueueFull

Исключение возникает при вызове put_nowait() метод в очереди, которая достигла своего максимального размера.

Примеры

Очереди можно используемый для распределения рабочей нагрузки между несколькими параллельными задачами:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Получить "рабочий элемент" вне очереди.
        sleep_for = await queue.get()

        # Спать "sleep_for" секунд.
        await asyncio.sleep(sleep_for)

        # Сообщение очереди, для обработки «рабочего элемента».
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Создайть очередь, которую мы будем использовать для хранения нашей "рабочей нагрузки".
    queue = asyncio.Queue()

    # Генерирует случайные тайминги и вставляет их в очередь.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Создать три рабочих задачи для одновременной обработки очереди.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Подождать, пока очередь не будет полностью обработана.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Отменить рабочие задания.
    for task in tasks:
        task.cancel()
    # Подождать, пока все рабочие задачи не будут отменены.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())