Транспорты и протоколы¶
Preface
Транспорты и протоколы используемый низкоуровневый API событийного цикла, например,
loop.create_connection()
. Они используют колбэк-based стиль программирования и позволяют
высокопроизводительные реализации сетевых или IPC протоколов (например, HTTP).
По сути, транспорты и протоколы должны используемый только в библиотеках и фреймворков, а не в asyncio приложениях высокого уровня.
Эта страница документации охватывает оба Транспорты and Протоколы.
Introduction
На самом высоком уровне транспорт связан с тем, как передаются байты, в то время как протокол определяет, какие байты передавать (и в какой-то степени, когда).
Другой способ сказать одно и то же: транспорт - это абстракция для сокет (или аналогичной конечной точки ввода-вывода), а протокол - это абстракция для приложения, с точки зрения транспорта.
Еще одна точка зрения состоит в том, что интерфейсы транспорта и протокола вместе определяют абстрактный интерфейс для использования сетевого ввода-вывода и межпроцессного ввода-вывода.
Всегда существует связь 1:1 между транспортом и объектами протокола: протокол вызывает транспортные методы для передачи данных, в то время как протокол транспортных вызовов методы для передачи ему данных, которые были приняты.
Большинство событийный цикл методы, ориентированных на соединение (например,
loop.create_connection()
), обычно принимают аргумент protocol_factory,
используемый для создания Protocol объекта для принятого соединения,
представленного Transport объектом. Такие методы обычно возвращ
кортеж (transport, protocol)
.
Contents
Эта страница документации содержит следующие разделы
- Документы раздела Транспорты asyncio
BaseTransport
,ReadTransport
,WriteTransport
,Transport
,DatagramTransport
иSubprocessTransport
классы. - Документы раздела Протоколы asyncio
BaseProtocol
,Protocol
,BufferedProtocol
,DatagramProtocol
иSubprocessProtocol
классы. - В Примеры разделе показано, как работать с переносами, протоколами и API низкоуровневое событийный цикл.
Транспорты¶
Исходный код: Lib/asyncio/transports.py
Транспорт классы обеспечивается asyncio
для абстракции различных видов
каналов связи.
Транспортные объекты всегда создаются событийным циклом asyncio.
asyncio реализует перенос для TCP, UDP, SSL и подпроцессы пайпы. Имеющиеся на транспорте методы зависят от вида транспорта.
Транспортные классы не потокобезопасно.
Иерархия транспортов¶
-
class
asyncio.
BaseTransport
¶ Базовая класс для всех транспортировок. Содержит методы, совместно используемые всеми asyncio транспортами.
-
class
asyncio.
WriteTransport
(BaseTransport)¶ Базовый транспорт для соединений только для записи.
Случаи WriteTransport класс - возвращаетed от
loop.connect_write_pipe()
событийный цикл метод и являются также используемый подпроцессы-связанным методы какloop.subprocess_exec()
.
-
class
asyncio.
ReadTransport
(BaseTransport)¶ Базовый транспорт для соединений только для чтения.
Случаи ReadTransport класс - возвращаетed от
loop.connect_read_pipe()
событийный цикл метод и являются также используемый подпроцессы-связанным методы какloop.subprocess_exec()
.
-
class
asyncio.
Transport
(WriteTransport, ReadTransport)¶ Интерфейс, представляющий двунаправленный транспорт, например TCP-соединение.
Пользователь не создает экземпляр транспорта непосредственно; они вызывают функцию утилиты, передавая ей фабрику протокола и другую информацию, необходимую для создания транспорта и протокола.
Случаи Transport класс - возвращаетed от или используемый событийный цикл методы как
loop.create_connection()
,loop.create_unix_connection()
,loop.create_server()
,loop.sendfile()
, и т.д.
-
class
asyncio.
DatagramTransport
(BaseTransport)¶ Транспорт для соединений дейтаграмм (UDP).
Сущности DatagramTransport класс - возвращаетed от
loop.create_datagram_endpoint()
событийный цикл метод.
-
class
asyncio.
SubprocessTransport
(BaseTransport)¶ Абстракция, представляющая соединение между родительской и дочерней ОС - процессами.
Сужности SubprocessTransport класс - возвращенный от событийный цикл методы
loop.subprocess_shell()
иloop.subprocess_exec()
.
Основной транспорт¶
-
BaseTransport.
close
()¶ Закрыть транспорт.
Если транспорт имеет буфер для исходящих данных, буферизованные данные будут очищены асинхронно. Больше данных получено не будет. После удаления всех буферизованных данных
protocol.connection_lost()
метод протокола вызывается сNone
в качестве аргумента.
-
BaseTransport.
is_closing
()¶ Возвращает
True
, закрывается ли транспортировка.
-
BaseTransport.
get_extra_info
(name, default=None)¶ Возвращает сведения о транспорте или базовых ресурсах, которые он использует.
name представляет собой строка, представляющую часть транспортной информации, которую необходимо получить.
default значение, которое необходимо возвращает, если информация недоступна или если транспорт не поддерживает запрос к ней с данной реализацией событийный цикл третьей стороны или на текущей платформе.
Например, следующая код пытается получить базовый сокет объект транспорта:
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
Категории информации, которая может быть запрошена для некоторых транспортов:
- сокет:
'peername'
: удаленный адрес, к которому подключен сокет, результатsocket.socket.getpeername()
(None
об ошибке)'socket'
:socket.socket
сущность'sockname'
: собственный адрес сокет, результатsocket.socket.getsockname()
- SSL сокет:
'compression'
: алгоритм сжатия, используемый как строка, илиNone
, если соединение не сжато; результатssl.SSLSocket.compression()
'cipher'
: трехзначный кортеж, содержащий имя используемый шифра, версию протокола SSL, определяющего его использование, и количество используемый секретных битов; результатssl.SSLSocket.cipher()
'peercert'
: свидетельство ровесника; результатssl.SSLSocket.getpeercert()
'sslcontext'
:ssl.SSLContext
сущность'ssl_object'
:ssl.SSLObject
илиssl.SSLSocket
сущность
- пайп:
'pipe'
: объект пайп
- подпроцессы:
'subprocess'
:subprocess.Popen
сущность
- сокет:
-
BaseTransport.
set_protocol
(protocol)¶ Установить новый протокол.
Протокол коммутации должен выполняться только в том случае, если оба протокола задокументированы для поддержки коммутатора.
-
BaseTransport.
get_protocol
()¶ Возвращает текущий протокол.
Транспоры только для чтения¶
-
ReadTransport.
is_reading
()¶ Возвращает
True
, получает ли транспорт новые данные.Добавлено в версии 3.7.
-
ReadTransport.
pause_reading
()¶ Приостановка приемного конца транспорта. Данные не будут передаваться в
protocol.data_received()
метод протокола, покаresume_reading()
не будет вызван.Изменено в версии 3.7: Идемпотентный метод, то есть его можно вызвать, когда транспорт уже paиспользуемый или закрыт.
-
ReadTransport.
resume_reading
()¶ Возобновляет прием.
protocol.data_received()
метод протокола будет вызван еще раз, если какие-то данные будут доступны для чтения.Изменено в версии 3.7: Идемпотентный метод, то есть его можно вызвать, когда транспорт уже читает.
Транспорты только для записи¶
-
WriteTransport.
abort
()¶ Немедленно закрыть транспорт, не ожидая завершения ожидающих операций. Буферизованные данные будут потеряны. Больше данных получено не будет.
protocol.connection_lost()
метод протокола в конечном итоге будет вызван сNone
в качестве его аргумента.
-
WriteTransport.
can_write_eof
()¶ Возвращает
True
если транспорт поддерживаетwrite_eof()
,False
если нет.
-
WriteTransport.
get_write_buffer_size
()¶ Возвращает текущего размера выходного буфера, используемый транспортом.
-
WriteTransport.
get_write_buffer_limits
()¶ Получить high и low водяные знаки для управления потоком записи. Возвращает кортеж
(low, high)
где low и high - положительное число байт.Использовать
set_write_buffer_limits()
для установки пределов.Добавлено в версии 3.4.2.
-
WriteTransport.
set_write_buffer_limits
(high=None, low=None)¶ Установить high и low водяные знаки для управления потоком записи.
Эти два значения (измеренные в байтах) управляют при вызове
protocol.pause_writing()
protocol.resume_writing()
и методы протокола. Если указан, нижний водяной знак должен быть меньше или равен верхнему водяному знаку. Ни high, ни low не могут быть отрицательными.pause_writing()
вызывается, когда размер буфера становится больше или равен значению high. Если запись была paиспользуемый,resume_writing()
вызывается, когда размер буфера становится меньше или равен значению low.Значения по умолчанию зависят от реализации. Если задан только верхний водяной знак, то нижний водяной знак по умолчанию имеет специфическое для реализации значение, меньшее или равное верхнему водяному знаку. Установка нулевого значения high также приводит к low нулевого значения и вызывает вызов
pause_writing()
всякий раз, когда буфер становится непустым. Установка нулевого значения low приводит к тому, чтоresume_writing()
вызывается только после того, как буфер пуст. Использование нуля для любого предела обычно является субоптимальным, поскольку оно уменьшает возможности одновременного выполнения операций ввода-вывода и вычислений.Использовать
get_write_buffer_limits()
, чтобы получить ограничения.
-
WriteTransport.
write
(data)¶ Запишите несколько data байт в транспорт.
Этот метод не блокируется; он буферизует данные и упорядочивает их асинхронную отправку.
-
WriteTransport.
writelines
(list_of_data)¶ Запись списка (или любого итеративного) байтов данных в транспорт. Это функционально эквивалентно вызову
write()
для каждого элемента, выдаваемого итеративным, но может быть реализовано более эффективно.
-
WriteTransport.
write_eof
()¶ Закрыть конец записи переноса после очистки всех буферизованных данных. Данные все еще могут быть получены.
Эта метод может вызвать
NotImplementedError
, если транспорт (например, SSL) не поддерживает полузакрытые соединения.
Дейтаграммные транспорты¶
-
DatagramTransport.
sendto
(data, addr=None)¶ Отправить data байты удаленному одноранговому узлу, заданному addr (целевой адрес, зависящий от транспорта). Если addr
None
, данные отправляются на целевой адрес, указанный при создании транспорта.Этот метод не блокируется; он буферизует данные и упорядочивает их асинхронную отправку.
-
DatagramTransport.
abort
()¶ Немедленно закрыть транспорт, не ожидая завершения ожидающих операций. Буферизованные данные будут потеряны. Больше данных получено не будет.
protocol.connection_lost()
метод протокола в конечном итоге будет вызван сNone
в качестве его аргумента.
Транспорты подпроцесса¶
-
SubprocessTransport.
get_pid
()¶ Возвращает идентификатор процесса подпроцессы как целое число.
-
SubprocessTransport.
get_pipe_transport
(fd)¶ Возвращает транспорт для пайп связи, соответствующего целочисленному дескриптору файла fd:
0
: читаемый потоковый перенос стандартного входного сигнала (stdin) илиNone
, если подпроцессы не был создан сstdin=PIPE
1
: записываемый потоковый транспорт стандартного выходного сигнала (stdout), илиNone
, если подпроцессы не был создан сstdout=PIPE
2
: записываемый потоковый транспорт стандартной ошибки (stderr) илиNone
, если подпроцессы не был создан сstderr=PIPE
другим fd:None
-
SubprocessTransport.
get_returncode
()¶ Возвращает подпроцессы возвращает код как целое число или
None
, если у этого нет возвращаемого значения, который подобен аттрибутуsubprocess.Popen.returncode
.
-
SubprocessTransport.
kill
()¶ Убить подпроцессы.
В системах POSIX функция посылает подпроцессы SIGKILL. В Windows этот метод является алиас для
terminate()
.См. также
subprocess.Popen.kill()
.
-
SubprocessTransport.
send_signal
(signal)¶ Отправить номер signal в подпроцессы, как в
subprocess.Popen.send_signal()
.
-
SubprocessTransport.
terminate
()¶ Остановите подпроцессы.
В системах POSIX этот метод отправляет SIGTERM в подпроцессы. В Windows для остановки подпроцессы вызывается функция Windows API TerminateProcess ().
См. также
subprocess.Popen.terminate()
.
Протоколы¶
Исходный код: Lib/asyncio/protocols.py
asyncio предоставляет набор абстрактных базовых классов, которые должны быть используемый для реализации сетевых протоколов. Эти классы должны быть используемый вместе с transports.
Подклассы абстрактного базового протокола классы могут реализовывать некоторые или все методы. Все эти методы колбэки: они вызываются переносами по определенным событиям, например, при получении некоторых данных. Базовый протокол метод должен вызываться соответствующим транспортом.
Основные протоколы¶
-
class
asyncio.
BaseProtocol
¶ Базовый протокол с методы, совместно используемыми всеми протоколами.
-
class
asyncio.
Protocol
(BaseProtocol)¶ Базовая класс для реализации потоковых протоколов (TCP, Unix сокеты и т.д.).
-
class
asyncio.
BufferedProtocol
(BaseProtocol)¶ Базовая класс для реализации протоколов потоковой передачи с ручным управлением буфером приема.
-
class
asyncio.
DatagramProtocol
(BaseProtocol)¶ Базовая класс для реализации протоколов дейтаграмм (UDP).
-
class
asyncio.
SubprocessProtocol
(BaseProtocol)¶ Базовая класс для реализации протоколов связи с дочерними процессами (однонаправленный пайпы).
Основной протокол¶
Все asyncio протоколы могут реализовывать колбэки базового протокола.
Connection Callbacks
Колбэки соединений вызываются по всем протоколам, ровно один раз на успешное соединение. Все остальные колбэки протокола могут быть вызваны только между этими двумя методами.
-
BaseProtocol.
connection_made
(transport)¶ Вызывается при установлении соединения.
Аргумент transport - это транспорт, представляющий соединение. Протокол отвечает за хранение ссылки на его транспорт.
-
BaseProtocol.
connection_lost
(exc)¶ Вызывается при потере или закрытии соединения.
Аргумент является объектом исключения или
None
. Последнее означает, что принято регулярное EOF, или соединение было прервано или закрыто этой стороной соединения.
Flow Control Callbacks
Колбэки управления потоком могут вызываться переносами для приостановки или возобновления записи, выполняемой протоколом.
Для получения дополнительной информации см. документацию set_write_buffer_limits()
метод.
-
BaseProtocol.
pause_writing
()¶ Вызывается, когда буфер транспорта переходит через верхний водяной знак.
-
BaseProtocol.
resume_writing
()¶ Вызывается, когда буфер транспорта сливается ниже нижнего водяного знака.
Если размер буфера равен высокому водяному знаку, pause_writing()
не вызывается:
размер буфера должен быть строго превышен.
Наоборот, resume_writing()
вызывается, когда размер буфера равен или меньше нижнего
водяного знака. Эти конечные условия важны для обеспечения того, чтобы все шло
так, как ожидалось, когда любая из отметок равна нулю.
Потоковые протоколы¶
Событие методы, такое как loop.create_server()
, loop.create_unix_server()
, loop.create_connection()
,
loop.create_unix_connection()
, loop.connect_accepted_socket()
, loop.connect_read_pipe()
и loop.connect_write_pipe()
принимает фабрики это возвращает, текущий протоколы.
-
Protocol.
data_received
(data)¶ Вызывается при получении некоторых данных. data - непустой байтовый объект, содержащий входящие данные.
Буферизация, разделение или повторная сборка данных зависит от транспорта. В общем, не стоит полагаться на конкретную семантику и вместо этого делать свой парсинг универсальным и гибким. Однако данные всегда принимаются в правильном порядке.
В то время как соединение открыто, метод можно вызывать произвольное число раз.
Однако
protocol.eof_received()
называют не более одного раза. После вызова „eof _ received ()“data_received()
больше не вызывается.
-
Protocol.
eof_received
()¶ Вызывается, когда другой конец сигнала не посылает больше никаких данных (например, путем вызова
transport.write_eof()
, если другой конец также использует asyncio).Эта метод может возвращает ложное значение (включая
None
), в этом случае транспорт закроется сам. И наоборот, если это метод возвращаетs истинное значение, то протокол используемый определяет, следует ли закрыть транспорт. Поскольку реализация по умолчанию возвращаетsNone
, она неявно закрывает соединение.Некоторые транспорты, включая SSL, не поддерживают полузакрытые соединения, в этом случае возвращаетing true из этого метод приведет к закрытию соединения.
Машина состояний:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
Буферизированные потоковые протоколы¶
Добавлено в версии 3.7: Важно: это было добавлено к asyncio в Python 3.7 на временной основе! Это экспериментальный API, который можно полностью изменить или удалить в Python 3.8.
Буферизованные протоколы можно используемый с любым событийный цикл метод, поддерживающим Потоковые протоколы.
BufferedProtocol
реализации обеспечивают явное ручное выделение буфера приема и
управление им. Событийный циклs этого можно использовать буфер, предоставляемый
протоколом, чтобы избежать ненужных копий данных. Это может привести к заметному
повышению производительности протоколов, которые получают большие объемы данных.
Сложные реализации протоколов могут значительно сократить количество выделенных
буферов.
На колбэки BufferedProtocol
вызваны следующие сущности:
-
BufferedProtocol.
get_buffer
(sizehint)¶ Вызывается для выделения нового буфера приема.
sizehint - рекомендуемый минимальный размер буфера возвращаемый. Приемлемо возвращает буферов меньшего размера или большего размера, чем это предполагает sizehint. Если установлено значение -1, размер буфера может быть произвольным. Ошибка при возвращает буфера с нулевым размером.
get_buffer()
должен возвращает объект, реализующий buffer protocol.
-
BufferedProtocol.
buffer_updated
(nbytes)¶ Вызывается при обновлении буфера принятыми данными.
nbytes - общее количество байтов, записанных в буфер.
-
BufferedProtocol.
eof_received
()¶ См. документацию по
protocol.eof_received()
метод.
get_buffer()
можно вызывать произвольное число раз во время соединения. Однако
protocol.eof_received()
вызывается максимум один раз и, если называется, get_buffer()
и
buffer_updated()
не будут вызываться после него.
Машина состояний:
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
Дейтаграммные протоколы¶
Сущности протокола дейтаграмм должны создаваться фабриками протоколов,
переданными loop.create_datagram_endpoint()
метод.
-
DatagramProtocol.
datagram_received
(data, addr)¶ Вызывается при получении дейтаграммы. data - это байтовый объект, содержащий входящие данные. addr - адрес равноправного объекта, посылающего данные; точный формат зависит от транспорта.
-
DatagramProtocol.
error_received
(exc)¶ Вызывается, когда предыдущая операция отправки или получения вызывает
OSError
. exc -OSError
сущность.Метод вызывается в редких случаях, когда транспорт (например, UDP) обнаруживает, что дейтаграмма не может быть доставлена получателю. Однако во многих условиях непоставляемые дейтаграммы будут отбрасываться без предупреждения.
Примечание
В BSD системах (macOS, FreeBSD и т.д.) управление потоками не поддерживается для протоколов дейтаграмм, потому что нет надежного способа обнаружить сбой при отправке, вызванные записью слишком большого количества пакетов.
Сокет всегда отображается как «готовый», и избыточные пакеты отбрасываются.
OSError
с errno
, установленным в errno.ENOBUFS
, может быть
поднят или не поднят; Если он будет поднят, о нем будет сообщено
DatagramProtocol.error_received()
, но в противном случае он будет
проигнорирован.
Протоколы подпроцесса¶
Сущности протокола дейтаграмм должны создаваться фабриками протоколов,
переданными loop.subprocess_exec()
и loop.subprocess_shell()
методы.
-
SubprocessProtocol.
pipe_data_received
(fd, data)¶ Вызывается, когда дочерний процесс записывает данные в пайп stdout или stderr.
fd - целочисленный дескриптор файла пайп.
data - непустой байтовый объект, содержащий полученные данные.
-
SubprocessProtocol.
pipe_connection_lost
(fd, exc)¶ Вызывается, когда один из пайпы, взаимодействующих с дочерним процессом, закрыт.
fd является целочисленным дескриптором файла, который был закрыт.
-
SubprocessProtocol.
process_exited
()¶ Вызывается после завершения дочернего процесса.
Примеры¶
Эхо сервер TCP¶
Создайте сервер TCP echo с помощью loop.create_server()
метод, отправьте обратно
полученные данные и закрыть соединение:
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Получить ссылку на событийный цикл, как мы планируем использовать
# низкоуровневое API
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
См.также
В TCP эхо сервер использующий потоки примереиспользуется функция asyncio.start_server()
высокого уровня.
Эхо клиент TCP¶
Клиент TCP echo, использующий loop.create_connection()
метод, отправляет данные и
ожидает закрытия соединения:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Получите ссылку на событийный цикл, как мы планируем использовать
# низкоуровневое API
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Дождитесь сигнала протокола о потере соединения и
# закрыть транспорт.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
См.также
В TCP echo client using streams примере используется функция asyncio.open_connection()
высокого уровня.
Эхо сервер UDP¶
Эхо-сервер UDP, используя loop.create_datagram_endpoint()
метод, отправляет обратно полученные
данные:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Получите ссылку на событийный цикл, как мы планируем использовать
# низкоуровневое API
loop = asyncio.get_running_loop()
# Для обслуживания всех будет создан один сущность протокола
# запросы клиента.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
Эхо клиент UDP¶
Эхо-клиент UDP, используя loop.create_datagram_endpoint()
метод, отправляет данные и закрывает
транспорт при получении ответа:
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Получить ссылку на событийный цикл, как мы планируем использовать
# низкоуровневое API
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
Соединение существующих сокетов¶
Дождитесь, пока сокет получит данные с использованием loop.create_connection()
,
метод с протоколом:
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# Мы закончили: закрыть транспорт;
# connection_lost() будет вызван автоматически.
self.transport.close()
def connection_lost(self, exc):
# Этот сокет закрыт
self.on_con_lost.set_result(True)
async def main():
# Получить ссылку на событийный цикл, как мы планируем использовать
# низкоуровневое API
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Создать пару подключённых сокетов
rsock, wsock = socket.socketpair()
# Зарегистрируйте сокет для ожидания данных.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Смоделируйте прием данных из сети.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
См.также
В watch a file descriptor for read events примере
для регистрации FD используется низкоуровневое loop.add_reader()
метод.
В зарегистрировать открытый сокет для ожидания данных с помощью потоков
примере используются потоки высокого уровня, созданные функцией open_connection()
в корутине.
loop.subprocess_exec() и SubprocessProtocol¶
Примером протокола подпроцессы используемый получение выходных данных подпроцессы и ожидание выхода подпроцессы.
Подпроцесс создается методом loop.subprocess_exec()
:
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
async def get_date():
# Получить ссылку на событийный цикл, как мы планируем использовать
# низкоуровневое API.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Создайте подпроцессы, которым управляет DateProtocol;
# перенаправить стандартный вывод в пайп.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Дождаться выхода подпроцесса, используя process_exited()
# метод протокола.
await exit_future
# Закрытие пайп stdout.
transport.close()
# Прочитать выходные данные, собранные
# pipe_data_received() методом протокола.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
См. также некоторые примеры, написанные с использованием API высокого уровня.