Транспорты и протоколы

Preface

Транспорты и протоколы используемый низкоуровневый API событийного цикла, например, loop.create_connection(). Они используют колбэк-based стиль программирования и позволяют высокопроизводительные реализации сетевых или IPC протоколов (например, HTTP).

По сути, транспорты и протоколы должны используемый только в библиотеках и фреймворков, а не в asyncio приложениях высокого уровня.

Эта страница документации охватывает оба Транспорты and Протоколы.

Introduction

На самом высоком уровне транспорт связан с тем, как передаются байты, в то время как протокол определяет, какие байты передавать (и в какой-то степени, когда).

Другой способ сказать одно и то же: транспорт - это абстракция для сокет (или аналогичной конечной точки ввода-вывода), а протокол - это абстракция для приложения, с точки зрения транспорта.

Еще одна точка зрения состоит в том, что интерфейсы транспорта и протокола вместе определяют абстрактный интерфейс для использования сетевого ввода-вывода и межпроцессного ввода-вывода.

Всегда существует связь 1:1 между транспортом и объектами протокола: протокол вызывает транспортные методы для передачи данных, в то время как протокол транспортных вызовов методы для передачи ему данных, которые были приняты.

Большинство событийный цикл методы, ориентированных на соединение (например, loop.create_connection()), обычно принимают аргумент protocol_factory, используемый для создания Protocol объекта для принятого соединения, представленного Transport объектом. Такие методы обычно возвращ кортеж (transport, protocol).

Contents

Эта страница документации содержит следующие разделы

Транспорты

Исходный код: Lib/asyncio/transports.py


Транспорт классы обеспечивается asyncio для абстракции различных видов каналов связи.

Транспортные объекты всегда создаются событийным циклом asyncio.

asyncio реализует перенос для TCP, UDP, SSL и подпроцессы пайпы. Имеющиеся на транспорте методы зависят от вида транспорта.

Транспортные классы не потокобезопасно.

Иерархия транспортов

class asyncio.BaseTransport

Базовая класс для всех транспортировок. Содержит методы, совместно используемые всеми asyncio транспортами.

class asyncio.WriteTransport(BaseTransport)

Базовый транспорт для соединений только для записи.

Случаи WriteTransport класс - возвращаетed от loop.connect_write_pipe() событийный цикл метод и являются также используемый подпроцессы-связанным методы как loop.subprocess_exec().

class asyncio.ReadTransport(BaseTransport)

Базовый транспорт для соединений только для чтения.

Случаи ReadTransport класс - возвращаетed от loop.connect_read_pipe() событийный цикл метод и являются также используемый подпроцессы-связанным методы как loop.subprocess_exec().

class asyncio.Transport(WriteTransport, ReadTransport)

Интерфейс, представляющий двунаправленный транспорт, например TCP-соединение.

Пользователь не создает экземпляр транспорта непосредственно; они вызывают функцию утилиты, передавая ей фабрику протокола и другую информацию, необходимую для создания транспорта и протокола.

Случаи Transport класс - возвращаетed от или используемый событийный цикл методы как loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile(), и т.д.

class asyncio.DatagramTransport(BaseTransport)

Транспорт для соединений дейтаграмм (UDP).

Сущности DatagramTransport класс - возвращаетed от loop.create_datagram_endpoint() событийный цикл метод.

class asyncio.SubprocessTransport(BaseTransport)

Абстракция, представляющая соединение между родительской и дочерней ОС - процессами.

Сужности SubprocessTransport класс - возвращенный от событийный цикл методы loop.subprocess_shell() и loop.subprocess_exec().

Основной транспорт

BaseTransport.close()

Закрыть транспорт.

Если транспорт имеет буфер для исходящих данных, буферизованные данные будут очищены асинхронно. Больше данных получено не будет. После удаления всех буферизованных данных protocol.connection_lost() метод протокола вызывается с None в качестве аргумента.

BaseTransport.is_closing()

Возвращает True, закрывается ли транспортировка.

BaseTransport.get_extra_info(name, default=None)

Возвращает сведения о транспорте или базовых ресурсах, которые он использует.

name представляет собой строка, представляющую часть транспортной информации, которую необходимо получить.

default значение, которое необходимо возвращает, если информация недоступна или если транспорт не поддерживает запрос к ней с данной реализацией событийный цикл третьей стороны или на текущей платформе.

Например, следующая код пытается получить базовый сокет объект транспорта:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

Категории информации, которая может быть запрошена для некоторых транспортов:

  • сокет:
  • SSL сокет:
    • 'compression': алгоритм сжатия, используемый как строка, или None, если соединение не сжато; результат ssl.SSLSocket.compression()
    • 'cipher': трехзначный кортеж, содержащий имя используемый шифра, версию протокола SSL, определяющего его использование, и количество используемый секретных битов; результат ssl.SSLSocket.cipher()
    • 'peercert': свидетельство ровесника; результат ssl.SSLSocket.getpeercert()
    • 'sslcontext': ssl.SSLContext сущность
    • 'ssl_object': ssl.SSLObject или ssl.SSLSocket сущность
  • пайп:
    • 'pipe': объект пайп
  • подпроцессы:
BaseTransport.set_protocol(protocol)

Установить новый протокол.

Протокол коммутации должен выполняться только в том случае, если оба протокола задокументированы для поддержки коммутатора.

BaseTransport.get_protocol()

Возвращает текущий протокол.

Транспоры только для чтения

ReadTransport.is_reading()

Возвращает True, получает ли транспорт новые данные.

Добавлено в версии 3.7.

ReadTransport.pause_reading()

Приостановка приемного конца транспорта. Данные не будут передаваться в protocol.data_received() метод протокола, пока resume_reading() не будет вызван.

Изменено в версии 3.7: Идемпотентный метод, то есть его можно вызвать, когда транспорт уже paиспользуемый или закрыт.

ReadTransport.resume_reading()

Возобновляет прием. protocol.data_received() метод протокола будет вызван еще раз, если какие-то данные будут доступны для чтения.

Изменено в версии 3.7: Идемпотентный метод, то есть его можно вызвать, когда транспорт уже читает.

Транспорты только для записи

WriteTransport.abort()

Немедленно закрыть транспорт, не ожидая завершения ожидающих операций. Буферизованные данные будут потеряны. Больше данных получено не будет. protocol.connection_lost() метод протокола в конечном итоге будет вызван с None в качестве его аргумента.

WriteTransport.can_write_eof()

Возвращает True если транспорт поддерживает write_eof(), False если нет.

WriteTransport.get_write_buffer_size()

Возвращает текущего размера выходного буфера, используемый транспортом.

WriteTransport.get_write_buffer_limits()

Получить high и low водяные знаки для управления потоком записи. Возвращает кортеж (low, high) где low и high - положительное число байт.

Использовать set_write_buffer_limits() для установки пределов.

Добавлено в версии 3.4.2.

WriteTransport.set_write_buffer_limits(high=None, low=None)

Установить high и low водяные знаки для управления потоком записи.

Эти два значения (измеренные в байтах) управляют при вызове protocol.pause_writing() protocol.resume_writing() и методы протокола. Если указан, нижний водяной знак должен быть меньше или равен верхнему водяному знаку. Ни high, ни low не могут быть отрицательными.

pause_writing() вызывается, когда размер буфера становится больше или равен значению high. Если запись была paиспользуемый, resume_writing() вызывается, когда размер буфера становится меньше или равен значению low.

Значения по умолчанию зависят от реализации. Если задан только верхний водяной знак, то нижний водяной знак по умолчанию имеет специфическое для реализации значение, меньшее или равное верхнему водяному знаку. Установка нулевого значения high также приводит к low нулевого значения и вызывает вызов pause_writing() всякий раз, когда буфер становится непустым. Установка нулевого значения low приводит к тому, что resume_writing() вызывается только после того, как буфер пуст. Использование нуля для любого предела обычно является субоптимальным, поскольку оно уменьшает возможности одновременного выполнения операций ввода-вывода и вычислений.

Использовать get_write_buffer_limits(), чтобы получить ограничения.

WriteTransport.write(data)

Запишите несколько data байт в транспорт.

Этот метод не блокируется; он буферизует данные и упорядочивает их асинхронную отправку.

WriteTransport.writelines(list_of_data)

Запись списка (или любого итеративного) байтов данных в транспорт. Это функционально эквивалентно вызову write() для каждого элемента, выдаваемого итеративным, но может быть реализовано более эффективно.

WriteTransport.write_eof()

Закрыть конец записи переноса после очистки всех буферизованных данных. Данные все еще могут быть получены.

Эта метод может вызвать NotImplementedError, если транспорт (например, SSL) не поддерживает полузакрытые соединения.

Дейтаграммные транспорты

DatagramTransport.sendto(data, addr=None)

Отправить data байты удаленному одноранговому узлу, заданному addr (целевой адрес, зависящий от транспорта). Если addr None, данные отправляются на целевой адрес, указанный при создании транспорта.

Этот метод не блокируется; он буферизует данные и упорядочивает их асинхронную отправку.

DatagramTransport.abort()

Немедленно закрыть транспорт, не ожидая завершения ожидающих операций. Буферизованные данные будут потеряны. Больше данных получено не будет. protocol.connection_lost() метод протокола в конечном итоге будет вызван с None в качестве его аргумента.

Транспорты подпроцесса

SubprocessTransport.get_pid()

Возвращает идентификатор процесса подпроцессы как целое число.

SubprocessTransport.get_pipe_transport(fd)

Возвращает транспорт для пайп связи, соответствующего целочисленному дескриптору файла fd:

  • 0: читаемый потоковый перенос стандартного входного сигнала (stdin) или None, если подпроцессы не был создан с stdin=PIPE 1: записываемый потоковый транспорт стандартного выходного сигнала (stdout), или None, если подпроцессы не был создан с stdout=PIPE 2: записываемый потоковый транспорт стандартной ошибки (stderr) или None, если подпроцессы не был создан с stderr=PIPE другим fd: None
SubprocessTransport.get_returncode()

Возвращает подпроцессы возвращает код как целое число или None, если у этого нет возвращаемого значения, который подобен аттрибуту subprocess.Popen.returncode.

SubprocessTransport.kill()

Убить подпроцессы.

В системах POSIX функция посылает подпроцессы SIGKILL. В Windows этот метод является алиас для terminate().

См. также subprocess.Popen.kill().

SubprocessTransport.send_signal(signal)

Отправить номер signal в подпроцессы, как в subprocess.Popen.send_signal().

SubprocessTransport.terminate()

Остановите подпроцессы.

В системах POSIX этот метод отправляет SIGTERM в подпроцессы. В Windows для остановки подпроцессы вызывается функция Windows API TerminateProcess ().

См. также subprocess.Popen.terminate().

SubprocessTransport.close()

Убить подпроцессы, позвонив kill() метод.

Если у подпроцессы еще нет возвращаемых и близких транспортных stdin, stdout и stderr пайпов.

Протоколы

Исходный код: Lib/asyncio/protocols.py


asyncio предоставляет набор абстрактных базовых классов, которые должны быть используемый для реализации сетевых протоколов. Эти классы должны быть используемый вместе с transports.

Подклассы абстрактного базового протокола классы могут реализовывать некоторые или все методы. Все эти методы колбэки: они вызываются переносами по определенным событиям, например, при получении некоторых данных. Базовый протокол метод должен вызываться соответствующим транспортом.

Основные протоколы

class asyncio.BaseProtocol

Базовый протокол с методы, совместно используемыми всеми протоколами.

class asyncio.Protocol(BaseProtocol)

Базовая класс для реализации потоковых протоколов (TCP, Unix сокеты и т.д.).

class asyncio.BufferedProtocol(BaseProtocol)

Базовая класс для реализации протоколов потоковой передачи с ручным управлением буфером приема.

class asyncio.DatagramProtocol(BaseProtocol)

Базовая класс для реализации протоколов дейтаграмм (UDP).

class asyncio.SubprocessProtocol(BaseProtocol)

Базовая класс для реализации протоколов связи с дочерними процессами (однонаправленный пайпы).

Основной протокол

Все asyncio протоколы могут реализовывать колбэки базового протокола.

Connection Callbacks

Колбэки соединений вызываются по всем протоколам, ровно один раз на успешное соединение. Все остальные колбэки протокола могут быть вызваны только между этими двумя методами.

BaseProtocol.connection_made(transport)

Вызывается при установлении соединения.

Аргумент transport - это транспорт, представляющий соединение. Протокол отвечает за хранение ссылки на его транспорт.

BaseProtocol.connection_lost(exc)

Вызывается при потере или закрытии соединения.

Аргумент является объектом исключения или None. Последнее означает, что принято регулярное EOF, или соединение было прервано или закрыто этой стороной соединения.

Flow Control Callbacks

Колбэки управления потоком могут вызываться переносами для приостановки или возобновления записи, выполняемой протоколом.

Для получения дополнительной информации см. документацию set_write_buffer_limits() метод.

BaseProtocol.pause_writing()

Вызывается, когда буфер транспорта переходит через верхний водяной знак.

BaseProtocol.resume_writing()

Вызывается, когда буфер транспорта сливается ниже нижнего водяного знака.

Если размер буфера равен высокому водяному знаку, pause_writing() не вызывается: размер буфера должен быть строго превышен.

Наоборот, resume_writing() вызывается, когда размер буфера равен или меньше нижнего водяного знака. Эти конечные условия важны для обеспечения того, чтобы все шло так, как ожидалось, когда любая из отметок равна нулю.

Потоковые протоколы

Событие методы, такое как loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() и loop.connect_write_pipe() принимает фабрики это возвращает, текущий протоколы.

Protocol.data_received(data)

Вызывается при получении некоторых данных. data - непустой байтовый объект, содержащий входящие данные.

Буферизация, разделение или повторная сборка данных зависит от транспорта. В общем, не стоит полагаться на конкретную семантику и вместо этого делать свой парсинг универсальным и гибким. Однако данные всегда принимаются в правильном порядке.

В то время как соединение открыто, метод можно вызывать произвольное число раз.

Однако protocol.eof_received() называют не более одного раза. После вызова „eof _ received ()“ data_received() больше не вызывается.

Protocol.eof_received()

Вызывается, когда другой конец сигнала не посылает больше никаких данных (например, путем вызова transport.write_eof(), если другой конец также использует asyncio).

Эта метод может возвращает ложное значение (включая None), в этом случае транспорт закроется сам. И наоборот, если это метод возвращаетs истинное значение, то протокол используемый определяет, следует ли закрыть транспорт. Поскольку реализация по умолчанию возвращаетs None, она неявно закрывает соединение.

Некоторые транспорты, включая SSL, не поддерживают полузакрытые соединения, в этом случае возвращаетing true из этого метод приведет к закрытию соединения.

Машина состояний:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

Буферизированные потоковые протоколы

Добавлено в версии 3.7: Важно: это было добавлено к asyncio в Python 3.7 на временной основе! Это экспериментальный API, который можно полностью изменить или удалить в Python 3.8.

Буферизованные протоколы можно используемый с любым событийный цикл метод, поддерживающим Потоковые протоколы.

BufferedProtocol реализации обеспечивают явное ручное выделение буфера приема и управление им. Событийный циклs этого можно использовать буфер, предоставляемый протоколом, чтобы избежать ненужных копий данных. Это может привести к заметному повышению производительности протоколов, которые получают большие объемы данных. Сложные реализации протоколов могут значительно сократить количество выделенных буферов.

На колбэки BufferedProtocol вызваны следующие сущности:

BufferedProtocol.get_buffer(sizehint)

Вызывается для выделения нового буфера приема.

sizehint - рекомендуемый минимальный размер буфера возвращаемый. Приемлемо возвращает буферов меньшего размера или большего размера, чем это предполагает sizehint. Если установлено значение -1, размер буфера может быть произвольным. Ошибка при возвращает буфера с нулевым размером.

get_buffer() должен возвращает объект, реализующий buffer protocol.

BufferedProtocol.buffer_updated(nbytes)

Вызывается при обновлении буфера принятыми данными.

nbytes - общее количество байтов, записанных в буфер.

BufferedProtocol.eof_received()

См. документацию по protocol.eof_received() метод.

get_buffer() можно вызывать произвольное число раз во время соединения. Однако protocol.eof_received() вызывается максимум один раз и, если называется, get_buffer() и buffer_updated() не будут вызываться после него.

Машина состояний:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

Дейтаграммные протоколы

Сущности протокола дейтаграмм должны создаваться фабриками протоколов, переданными loop.create_datagram_endpoint() метод.

DatagramProtocol.datagram_received(data, addr)

Вызывается при получении дейтаграммы. data - это байтовый объект, содержащий входящие данные. addr - адрес равноправного объекта, посылающего данные; точный формат зависит от транспорта.

DatagramProtocol.error_received(exc)

Вызывается, когда предыдущая операция отправки или получения вызывает OSError. exc - OSError сущность.

Метод вызывается в редких случаях, когда транспорт (например, UDP) обнаруживает, что дейтаграмма не может быть доставлена получателю. Однако во многих условиях непоставляемые дейтаграммы будут отбрасываться без предупреждения.

Примечание

В BSD системах (macOS, FreeBSD и т.д.) управление потоками не поддерживается для протоколов дейтаграмм, потому что нет надежного способа обнаружить сбой при отправке, вызванные записью слишком большого количества пакетов.

Сокет всегда отображается как «готовый», и избыточные пакеты отбрасываются. OSError с errno, установленным в errno.ENOBUFS, может быть поднят или не поднят; Если он будет поднят, о нем будет сообщено DatagramProtocol.error_received(), но в противном случае он будет проигнорирован.

Протоколы подпроцесса

Сущности протокола дейтаграмм должны создаваться фабриками протоколов, переданными loop.subprocess_exec() и loop.subprocess_shell() методы.

SubprocessProtocol.pipe_data_received(fd, data)

Вызывается, когда дочерний процесс записывает данные в пайп stdout или stderr.

fd - целочисленный дескриптор файла пайп.

data - непустой байтовый объект, содержащий полученные данные.

SubprocessProtocol.pipe_connection_lost(fd, exc)

Вызывается, когда один из пайпы, взаимодействующих с дочерним процессом, закрыт.

fd является целочисленным дескриптором файла, который был закрыт.

SubprocessProtocol.process_exited()

Вызывается после завершения дочернего процесса.

Примеры

Эхо сервер TCP

Создайте сервер TCP echo с помощью loop.create_server() метод, отправьте обратно полученные данные и закрыть соединение:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

См.также

В TCP эхо сервер использующий потоки примереиспользуется функция asyncio.start_server() высокого уровня.

Эхо клиент TCP

Клиент TCP echo, использующий loop.create_connection() метод, отправляет данные и ожидает закрытия соединения:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Получите ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Дождитесь сигнала протокола о потере соединения и
    # закрыть транспорт.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

См.также

В TCP echo client using streams примере используется функция asyncio.open_connection() высокого уровня.

Эхо сервер UDP

Эхо-сервер UDP, используя loop.create_datagram_endpoint() метод, отправляет обратно полученные данные:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Получите ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    # Для обслуживания всех будет создан один сущность протокола
    # запросы клиента.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

Эхо клиент UDP

Эхо-клиент UDP, используя loop.create_datagram_endpoint() метод, отправляет данные и закрывает транспорт при получении ответа:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Соединение существующих сокетов

Дождитесь, пока сокет получит данные с использованием loop.create_connection(), метод с протоколом:

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # Мы закончили: закрыть транспорт;
        # connection_lost() будет вызван автоматически.
        self.transport.close()

    def connection_lost(self, exc):
        # Этот сокет закрыт
        self.on_con_lost.set_result(True)


async def main():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Создать пару подключённых сокетов
    rsock, wsock = socket.socketpair()

    # Зарегистрируйте сокет для ожидания данных.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Смоделируйте прием данных из сети.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

См.также

В watch a file descriptor for read events примере для регистрации FD используется низкоуровневое loop.add_reader() метод.

В зарегистрировать открытый сокет для ожидания данных с помощью потоков примере используются потоки высокого уровня, созданные функцией open_connection() в корутине.

loop.subprocess_exec() и SubprocessProtocol

Примером протокола подпроцессы используемый получение выходных данных подпроцессы и ожидание выхода подпроцессы.

Подпроцесс создается методом loop.subprocess_exec():

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exit_future.set_result(True)

async def get_date():
    # Получить ссылку на событийный цикл, как мы планируем использовать
    # низкоуровневое API.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Создайте подпроцессы, которым управляет DateProtocol;
    # перенаправить стандартный вывод в пайп.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Дождаться выхода подпроцесса, используя process_exited()
    # метод протокола.
    await exit_future

    # Закрытие пайп stdout.
    transport.close()

    # Прочитать выходные данные, собранные
    # pipe_data_received() методом протокола.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

См. также некоторые примеры, написанные с использованием API высокого уровня.