queue — Класс синхронизированной очереди

Исходный код: Lib/queue.py


Модуль queue реализует многопользовательские очереди с несколькими производителями. Он особенно полезн в многопоточном программировании, когда информация должна быть безопасно обмениваться между несколькими потоками. Класс Queue в этом модуле реализует всю необходимую семантику блокировки.

Модуль реализует три типа очереди, которые отличаются только порядком извлечения записей. В очереди FIFO первые добавленные задачи извлекаются первыми. В очереди LIFO самая последняя добавленная запись является первой извлеченной (работающей как стек). С помощью очереди приоритетов записи сохраняются в сортировке (с использованием модуля heapq) и сначала извлекается запись с наименьшим значением.

Внутри эти три типа очередей используют блокировки для временной блокировки конкурирующих потоки; однако они не предназначены для обработки повторного входа в поток.

Кроме того, модуль осуществляет «простой» тип очереди FIFO, SimpleQueue, определенное реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.

Модуль queue определяет следующие классы и исключения:

class queue.Queue(maxsize=0)

Конструктор очереди FIFO. maxsize - целое число, устанавливающее верхний предел количества элементов, которые могут быть помещены в очередь. Вставка блокируется после достижения этого размера до тех пор, пока не будут израсходованы элементы очереди. Если значение maxsize меньше или равно нулю, размер очереди бесконечен.

class queue.LifoQueue(maxsize=0)

Конструктор очереди LIFO. maxsize - целое число, устанавливающее верхний предел количества элементов, которые могут быть помещены в очередь. Вставка блокируется после достижения этого размера до тех пор, пока не будут израсходованы элементы очереди. Если значение maxsize меньше или равно нулю, размер очереди бесконечен.

class queue.PriorityQueue(maxsize=0)

Конструктор приоритетной очереди. maxsize - целое число, устанавливающее верхний предел количества элементов, которые могут быть помещены в очередь. Вставка блокируется после достижения этого размера до тех пор, пока не будут израсходованы элементы очереди. Если значение maxsize меньше или равно нулю, размер очереди бесконечен.

Записи с наименьшим значением извлекаются первыми (запись с наименьшим значением возвращаются по одному sorted(list(entries))[0]). Типичный шаблон для записей является кортежем в форме: (priority_number, data).

Если элементы data несопоставимы, данные могут быть упакованы в класс, который игнорирует элемент данных и сравнивает только номер приоритета:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

Конструктор для неограниченной очереди FIFO. В простых очередях отсутствуют расширенные функции, такие как отслеживание задач.

Добавлено в версии 3.7.

exception queue.Empty

Поднимается исключение, когда неблокирующий get() (или get_nowait()) вызываются на объекте Queue, который пуст.

exception queue.Full

Поднимается исключение, когда неблокирующий put() (или put_nowait()) вызываются на объекте Queue, который полон.

Объекты Queue

Объекты Queue (Queue, LifoQueue или PriorityQueue) предоставляют публичные методы, описанные ниже.

Queue.qsize()

Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующи вызов get() не будет блокироваться, и qsize() < maxsize гарантирует, что put() не будет блокироваться.

Queue.empty()

Возвращает True, если очередь пуста, False иначе. Если empty() возвращает True это не гарантирует, что последующее вызов put() не заблокирует. Аналогично, если empty() возвращает False это не гарантирует, что последующий вызов для get() не будет заблокирован.

Queue.full()

Возвращает True, если очередь полна, False иначе. Если full() возвращает True это не гарантирует, что последующий вызов для get() не будет заблокирован. Аналогично, если full() возвращает False это не гарантирует, что последующий вызов put() не будет блокироваться.

Queue.put(item, block=True, timeout=None)

Поместить item в очередь. Если дополнительный args block true, и timeout - None (по умолчанию), блок при необходимости, пока свободный слот не доступен. Если timeout является положительным числом, он блокирует максимум timeout секунд и вызывает исключение Full, если свободный слот не был доступен в течение этого времени. В противном случае (block является false) поместить элемент в очередь, если свободный слот немедленно доступен, в противном случае вызвать исключение Full (timeout игнорируется в этом случае).

Queue.put_nowait(item)

Эквивалентно put(item, False).

Queue.get(block=True, timeout=None)

Удаление и возвращение элемента из очереди. Если необязательные аргументы block имеют значение true, а timeout - None (по умолчанию), при необходимости блокирует до тех пор, пока элемент не будет доступен. Если timeout является положительным числом, он блокируется максимум timeout секунд и вызывает исключение Empty, если за это время не было доступно ни одного элемента. В противном случае (block является false), возвращает элемент, если он немедленно доступен, иначе вызвать исключение Empty (timeout игнорируется в этом случае).

До 3.0 на системах POSIX, и для всех версий на Windows, если block true и timeout - None, эта операция входит в непрерывное ожидание на основной блокировке. Это означает, что исключения не могут возникать, и, в частности, SIGINT не инициирует KeyboardInterrupt.

Queue.get_nowait()

Эквивалентно get(False).

Два метода предлагаются, чтобы поддержать отслеживание, были ли поставленные в очередь задачи полностью обработаны потребителем демона потоков.

Queue.task_done()

Указание, что ранее поставленная в очередь задача завершена. Используется потребителем очереди - потоками. Для каждого используемого get(), чтобы выбрать задачу, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в настоящее время заблокирует, то он возобновит, когда все элементы были обработаны (подразумевать, что вызов task_done() было получено для каждого элемента, который был put() в очередь).

Вызывает ValueError, если вызывается больше раз, чем было элементов, помещенных в очередь.

Queue.join()

Блокировать, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается при каждом добавлении элемента в очередь. Количество спускается каждый раз, когда потребитель, которого поток называет task_done(), чтобы указать, что элемент был восстановлен и вся работа над ним, полон. Когда число незавершенных задач падает до нуля, join() разблокирует.

Пример ожидания выполнения поставленных в очередь задач:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# включить рабочий поток
threading.Thread(target=worker, daemon=True).start()

# отправить тридцать запросов на выполнение работнику
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')

# блокировать, пока все задачи не будут выполнены
q.join()
print('All work completed')

Объекты SimpleQueue

SimpleQueue объекты предоставляют публичные методы, описанные ниже.

SimpleQueue.qsize()

Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не будет заблокирован.

SimpleQueue.empty()

Возвращает True, если очередь пуста, False иначе. Если empty() возвращает False это не гарантирует, что последующее вызов get() не заблокируется.

SimpleQueue.put(item, block=True, timeout=None)

Поместить item в очередь. Метод никогда не блокирует и всегда успешен (за исключением потенциальных ошибок низкоуровневое, таких как сбой выделения памяти). Дополнительные аргументы block и timeout игнорируются и предоставляются только для совместимости с Queue.put().

Детали реализации CPython: Этот метод имеет реализацию C, который является повторением. То есть вызов put() или get() может быть прерван другим вызовом put() в том же самом поток без взаимоблокировки или повреждения внутренних состояние внутри очереди. Это делает его подходящим для использования в деструкторах, таких как методы __del__ или weakref колбэки.

SimpleQueue.put_nowait(item)

Эквивалентно put(item), предусмотрено для совместимости с Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)

Удаление и возвращение элемента из очереди. Если необязательные аргументы block имеют значение true, а timeout - None (по умолчанию), при необходимости блокирует до тех пор, пока элемент не будет доступен. Если timeout является положительным числом, он блокирует более timeout секунд и вызывает исключение Empty, если за это время не было доступно ни одного элемента. В противном случае (block является false), возвращает элемент, если он немедленно доступен, иначе вызвать исключение Empty (timeout игнорируется в этом случае).

SimpleQueue.get_nowait()

Эквивалентно get(False).

См.также

Класс multiprocessing.Queue
Класс очереди для использования в многопроцессном (а не многопоточном) контексте.

collections.deque - альтернативное реализация неограниченных очередей с быстрым атомарным append() и операциями popleft(), которые не требуют блокировки и также поддерживают индексацию.