Потоки

Исходный код: Lib/asyncio/streams.py


Потоки являются высокоуровневыми примитивами async/await для работы с сетевыми соединениями. Потоки позволяют отправлять и принимать данные без использования колбэков или низкоуровневых протоколов и транспортов.

Ниже приведен пример TCP эхо-клиента, записанного с использованием потоков asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

См. также раздел Примеры ниже.

Stream Functions

Для создания и работы с потоками можно asyncio следующие функции используемый верхнего уровня

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

Установить сетевое соединение и возвращает пару объектов (reader, writer).

Объекты возвращенный reader и writer - сущности StreamReader и StreamWriter классы.

Аргумент loop необязателен и всегда может быть определен автоматически при awaited этой функции из корутина.

limit решает, что размер буфера ограничивает используемый возвращенный StreamReader сущность. По умолчанию для limit установлено значение 64 кБ.

Остальные аргументы передаются непосредственно loop.create_connection().

Добавлено в версии 3.7: Параметр ssl_handshake_timeout.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Запуск сервера сокетов.

При установлении нового клиентского соединения вызывается client_connected_cb колбэк. Он принимает пару (reader, writer) как два аргумента, сущности StreamReader и StreamWriter классы.

client_connected_cb может быть обычной вызываемой или coroutine function; если это функция корутина, она будет автоматически запланирована как Task.

Аргумент loop необязателен и всегда может быть определен автоматически при метод этого awaited из корутина.

limit решает, что размер буфера ограничивает используемый возвращенный StreamReader сущность. По умолчанию для limit установлено значение 64 КБ.

Остальные аргументы передаются непосредственно loop.create_server().

Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.

Unix Sockets

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Установить сокет соединение Unix и возвращает пару (reader, writer).

Аналогичен open_connection(), но работает на Unix-сокеты.

См. также документацию loop.create_unix_connection().

Availability: Unix.

Добавлено в версии 3.7: Параметр ssl_handshake_timeout.

Изменено в версии 3.7: Теперь параметр path может быть путеподобный объект

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Запустите сервер Unix сокет.

Похож на start_server(), но работает с Unix сокетами.

См. также документацию loop.create_unix_server().

Availability: Unix.

Добавлено в версии 3.7: Параметры ssl_handshake_timeout и start_serving.

Изменено в версии 3.7: Теперь параметр path может быть путеподобный объект.

StreamReader

class asyncio.StreamReader

Представляет объект чтения, предоставляющий API для чтения данных из потока ввода-вывода.

Не рекомендуется создавать экземпляры StreamReader объектов напрямую; вместо этого используйте open_connection() и start_server().

coroutine read(n=-1)

Чтение до n байт. Если n не указан или установлен в -1, считывайте до EOF и возвращает все считанные байты.

Если получено сообщение EOF и внутренний буфер пуст, возвращает пустой объект bytes.

coroutine readline()

Прочитайте одну строку, где «line» - последовательность байтов, оканчивающихся на \n.

Если функция EOF принята и \n не найдена, метод возвращаетs частично считывать данные.

Если получено сообщение EOF и внутренний буфер пуст, возвращает пустой объект bytes.

coroutine readexactly(n)

Прочитайте ровно n байт.

Поднимите IncompleteReadError, если достигнуто значение EOF, прежде чем n можно будет прочитать. Используйте атрибут IncompleteReadError.partial для получения частично считанных данных.

coroutine readuntil(separator=b'\n')

Считывайте данные из потока до тех пор, пока separator не будет найден.

После успешного завершения данные и разделитель будут удалены из внутреннего буфера (израсходованы). Возвращаетed данные будут включать разделитель в конце.

Если объем считываемых данных превышает заданный предел потока, возникает LimitOverrunError исключение, и данные остаются во внутреннем буфере и могут быть считаны повторно.

Если EOF достигается до обнаружения полного разделителя, возникает IncompleteReadError исключение и внутренний буфер сбрасывается. Атрибут IncompleteReadError.partial может содержать часть разделителя.

Добавлено в версии 3.5.2.

at_eof()

Возвращает True, если буфер пуст и был вызван feed_eof().

StreamWriter

class asyncio.StreamWriter

Представляет объект модуля записи, предоставляющий API для записи данных в поток ввода-вывода.

Не рекомендуется создавать экземпляры StreamWriter объектов напрямую; вместо этого используйте open_connection() и start_server().

write(data)

метод пытается немедленно записать data в основной сокет. Если это не удается, данные помещаются во внутренний буфер записи до тех пор, пока они не будут отправлены.

Следует метод используемый вместе с drain() метод:

stream.write(data)
await stream.drain()
writelines(data)

Этот метод немедленно записывает список (или любой итерируемый) байтов в базовый сокет. Если это не удается, данные помещаются во внутренний буфер записи до тех пор, пока они не будут отправлены.

Следует метод используемый вместе с drain() метод:

stream.writelines(lines)
await stream.drain()
close()

Этот метод закрывает поток и лежащие в его основе сокет.

Следует метод используемый вместе с wait_closed() метод:

stream.close()
await stream.wait_closed()
can_write_eof()

Возвращает True, поддерживает ли базовый транспорт write_eof() метод, False в противном случае.

write_eof()

Закройте конец записи потока после сброса буферизованных данных записи.

transport

Возвращает базовый asyncio транспорт.

get_extra_info(name, default=None)

Доступ к дополнительной транспортной информации; дополнительные сведения см. в разделе BaseTransport.get_extra_info().

coroutine drain()

Дождитесь возобновления записи в поток. Пример:

writer.write(data)
await writer.drain()

Это метод управления потоком, который взаимодействует с основным буфером записи ввода-вывода. Когда размер буфера достигает высокого водяного знака, drain() блокируется до тех пор, пока размер буфера не дренируется до низкого водяного знака, и запись может быть возобновлена. Когда ждать нечего, drain() сразу возвращаетs.

is_closing()

Возвращает True, закрыт ли поток или находится в процессе закрытия.

Добавлено в версии 3.7.

coroutine wait_closed()

Дождитесь закрытия потока.

Следует вызвать после close(), чтобы дождаться закрытия базового соединения.

Добавлено в версии 3.7.

Примеры

Клиент TCP echo использует потоки

Эхо-клиент TCP с помощью функции asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

См.также

Пример протокола TCP эхо клиента использует низкоуровневое loop.create_connection() метод.

Сервер TCP echo использует потоки

Эхо-сервер TCP с помощью функции asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

См.также

В TCP echo server protocol примере используется loop.create_server() метод.

Получите заголовки HTTP

Простой пример запроса HTTP-заголовков URL, переданных в командной строке:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Игнорировать тело, закрыть сокет
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Использование:

python example.py http://example.com/path/page.html

или с HTTPS:

python example.py https://example.com/path/page.html

Зарегистрировать открытый сокет для ожидать данных с помощью потоков

Корутин ожидает, пока сокет не получит данные с помощью функции open_connection():

import asyncio
import socket

async def wait_for_data():
    # Получить ссылку на текущий цикл событий, потому что
    # мы хотим получить доступ к низкоуровневым API.
    loop = asyncio.get_running_loop()

    # Создайте пару соединенных сокетов.
    rsock, wsock = socket.socketpair()

    # Зарегистрируйте открытый сокет, чтобы дождаться данных.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Имитация приема данных из сети
    loop.call_soon(wsock.send, 'abc'.encode())

    # Ждать данных
    data = await reader.read(100)

    # Получить данные, если все готово: закрыть сокет
    print("Received:", data.decode())
    writer.close()

    # Закрыть второй сокет
    wsock.close()

asyncio.run(wait_for_data())

См.также

Пример зарегистририровать открытый сокет для ожидания данных с помощью протокола использует протокол низкоуровневое и loop.create_connection() метод.

Пример смотрите файловый дескриптор для событий чтения использует низкоуровневое loop.add_reader() метод, чтобы наблюдения файлового дискриптора.