Потоки¶
Исходный код: Lib/asyncio/streams.py
Потоки являются высокоуровневыми примитивами async/await для работы с сетевыми соединениями. Потоки позволяют отправлять и принимать данные без использования колбэков или низкоуровневых протоколов и транспортов.
Ниже приведен пример TCP эхо-клиента, записанного с использованием потоков asyncio:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
См. также раздел Примеры ниже.
Stream Functions
Для создания и работы с потоками можно asyncio следующие функции используемый верхнего уровня
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)¶ Установить сетевое соединение и возвращает пару объектов
(reader, writer)
.Объекты возвращенный reader и writer - сущности
StreamReader
иStreamWriter
классы.Аргумент loop необязателен и всегда может быть определен автоматически при awaited этой функции из корутина.
limit решает, что размер буфера ограничивает используемый возвращенный
StreamReader
сущность. По умолчанию для limit установлено значение 64 кБ.Остальные аргументы передаются непосредственно
loop.create_connection()
.Добавлено в версии 3.7: Параметр ssl_handshake_timeout.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Запуск сервера сокетов.
При установлении нового клиентского соединения вызывается client_connected_cb колбэк. Он принимает пару
(reader, writer)
как два аргумента, сущностиStreamReader
иStreamWriter
классы.client_connected_cb может быть обычной вызываемой или coroutine function; если это функция корутина, она будет автоматически запланирована как
Task
.Аргумент loop необязателен и всегда может быть определен автоматически при метод этого awaited из корутина.
limit решает, что размер буфера ограничивает используемый возвращенный
StreamReader
сущность. По умолчанию для limit установлено значение 64 КБ.Остальные аргументы передаются непосредственно
loop.create_server()
.Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.
Unix Sockets
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Установить сокет соединение Unix и возвращает пару
(reader, writer)
.Аналогичен
open_connection()
, но работает на Unix-сокеты.См. также документацию
loop.create_unix_connection()
.Availability: Unix.
Добавлено в версии 3.7: Параметр ssl_handshake_timeout.
Изменено в версии 3.7: Теперь параметр path может быть путеподобный объект
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустите сервер Unix сокет.
Похож на
start_server()
, но работает с Unix сокетами.См. также документацию
loop.create_unix_server()
.Availability: Unix.
Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.
Изменено в версии 3.7: Теперь параметр path может быть путеподобный объект.
StreamReader¶
-
class
asyncio.
StreamReader
¶ Представляет объект чтения, предоставляющий API для чтения данных из потока ввода-вывода.
Не рекомендуется создавать экземпляры StreamReader объектов напрямую; вместо этого используйте
open_connection()
иstart_server()
.-
coroutine
read
(n=-1)¶ Чтение до n байт. Если n не указан или установлен в
-1
, считывайте до EOF и возвращает все считанные байты.Если получено сообщение EOF и внутренний буфер пуст, возвращает пустой объект
bytes
.
-
coroutine
readline
()¶ Прочитайте одну строку, где «line» - последовательность байтов, оканчивающихся на
\n
.Если функция EOF принята и
\n
не найдена, метод возвращаетs частично считывать данные.Если получено сообщение EOF и внутренний буфер пуст, возвращает пустой объект
bytes
.
-
coroutine
readexactly
(n)¶ Прочитайте ровно n байт.
Поднимите
IncompleteReadError
, если достигнуто значение EOF, прежде чем n можно будет прочитать. Используйте атрибутIncompleteReadError.partial
для получения частично считанных данных.
-
coroutine
readuntil
(separator=b'\n')¶ Считывайте данные из потока до тех пор, пока separator не будет найден.
После успешного завершения данные и разделитель будут удалены из внутреннего буфера (израсходованы). Возвращаетed данные будут включать разделитель в конце.
Если объем считываемых данных превышает заданный предел потока, возникает
LimitOverrunError
исключение, и данные остаются во внутреннем буфере и могут быть считаны повторно.Если EOF достигается до обнаружения полного разделителя, возникает
IncompleteReadError
исключение и внутренний буфер сбрасывается. АтрибутIncompleteReadError.partial
может содержать часть разделителя.Добавлено в версии 3.5.2.
-
at_eof
()¶ Возвращает
True
, если буфер пуст и был вызванfeed_eof()
.
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ Представляет объект модуля записи, предоставляющий API для записи данных в поток ввода-вывода.
Не рекомендуется создавать экземпляры StreamWriter объектов напрямую; вместо этого используйте
open_connection()
иstart_server()
.-
write
(data)¶ метод пытается немедленно записать data в основной сокет. Если это не удается, данные помещаются во внутренний буфер записи до тех пор, пока они не будут отправлены.
Следует метод используемый вместе с
drain()
метод:stream.write(data) await stream.drain()
-
writelines
(data)¶ Этот метод немедленно записывает список (или любой итерируемый) байтов в базовый сокет. Если это не удается, данные помещаются во внутренний буфер записи до тех пор, пока они не будут отправлены.
Следует метод используемый вместе с
drain()
метод:stream.writelines(lines) await stream.drain()
-
close
()¶ Этот метод закрывает поток и лежащие в его основе сокет.
Следует метод используемый вместе с
wait_closed()
метод:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ Возвращает
True
, поддерживает ли базовый транспортwrite_eof()
метод,False
в противном случае.
-
write_eof
()¶ Закройте конец записи потока после сброса буферизованных данных записи.
-
transport
¶ Возвращает базовый asyncio транспорт.
-
get_extra_info
(name, default=None)¶ Доступ к дополнительной транспортной информации; дополнительные сведения см. в разделе
BaseTransport.get_extra_info()
.
-
coroutine
drain
()¶ Дождитесь возобновления записи в поток. Пример:
writer.write(data) await writer.drain()
Это метод управления потоком, который взаимодействует с основным буфером записи ввода-вывода. Когда размер буфера достигает высокого водяного знака, drain() блокируется до тех пор, пока размер буфера не дренируется до низкого водяного знака, и запись может быть возобновлена. Когда ждать нечего,
drain()
сразу возвращаетs.
-
is_closing
()¶ Возвращает
True
, закрыт ли поток или находится в процессе закрытия.Добавлено в версии 3.7.
-
Примеры¶
Клиент TCP echo использует потоки¶
Эхо-клиент TCP с помощью функции asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
См.также
Пример протокола TCP эхо клиента
использует низкоуровневое loop.create_connection()
метод.
Сервер TCP echo использует потоки¶
Эхо-сервер TCP с помощью функции asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
См.также
В TCP echo server protocol примере используется loop.create_server()
метод.
Получите заголовки HTTP¶
Простой пример запроса HTTP-заголовков URL, переданных в командной строке:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Игнорировать тело, закрыть сокет
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Использование:
python example.py http://example.com/path/page.html
или с HTTPS:
python example.py https://example.com/path/page.html
Зарегистрировать открытый сокет для ожидать данных с помощью потоков¶
Корутин ожидает, пока сокет не получит данные с помощью функции open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Получить ссылку на текущий цикл событий, потому что
# мы хотим получить доступ к низкоуровневым API.
loop = asyncio.get_running_loop()
# Создайте пару соединенных сокетов.
rsock, wsock = socket.socketpair()
# Зарегистрируйте открытый сокет, чтобы дождаться данных.
reader, writer = await asyncio.open_connection(sock=rsock)
# Имитация приема данных из сети
loop.call_soon(wsock.send, 'abc'.encode())
# Ждать данных
data = await reader.read(100)
# Получить данные, если все готово: закрыть сокет
print("Received:", data.decode())
writer.close()
# Закрыть второй сокет
wsock.close()
asyncio.run(wait_for_data())
См.также
Пример зарегистририровать открытый сокет для ожидания данных с помощью
протокола использует протокол
низкоуровневое и loop.create_connection()
метод.
Пример смотрите файловый дескриптор для событий чтения
использует низкоуровневое loop.add_reader()
метод, чтобы
наблюдения файлового дискриптора.