Транспорт і протоколи¶
Передмова
Транспорти та протоколи використовуються API циклу подій низького рівня, наприклад loop.create_connection()
. Вони використовують стиль програмування на основі зворотного виклику та дозволяють високопродуктивні реалізації мережевих або IPC-протоколів (наприклад, HTTP).
По суті, транспорти та протоколи слід використовувати лише в бібліотеках і фреймворках, а не в асинхронних програмах високого рівня.
Ця сторінка документації охоплює як Transports, так і Protocols.
Вступ
На найвищому рівні транспорт стосується того, як передаються байти, тоді як протокол визначає, які байти передавати (і певною мірою коли).
Інший спосіб сказати те саме: транспорт є абстракцією для сокета (або подібної кінцевої точки вводу/виводу), тоді як протокол є абстракцією для програми з точки зору транспорту.
Ще один погляд полягає в тому, що транспортний і протокольний інтерфейси разом визначають абстрактний інтерфейс для використання мережевого вводу-виводу та міжпроцесного введення-виведення.
Між транспортними об’єктами та об’єктами протоколу завжди існує зв’язок 1:1: протокол викликає транспортні методи для надсилання даних, тоді як транспорт викликає протокольні методи для передачі йому отриманих даних.
Більшість методів циклу подій, орієнтованих на підключення (таких як loop.create_connection()
), зазвичай приймають аргумент protocol_factory, який використовується для створення об’єкта Protocol для прийнятного з’єднання, представленого об’єктом Transport. Такі методи зазвичай повертають кортеж (транспорт, протокол)
.
Зміст
Ця сторінка документації містить такі розділи:
Розділ Transports документує asyncio
BaseTransport
,ReadTransport
,WriteTransport
,Transport
,DatagramTransport
іSubprocessTransport
класи.Розділ Protocols документує асинхронні класи
BaseProtocol
,Protocol
,BufferedProtocol
,DatagramProtocol
іSubprocessProtocol
.Розділ Examples демонструє, як працювати з транспортами, протоколами та API низькорівневого циклу подій.
Транспорти¶
Вихідний код: Lib/asyncio/transports.py
Транспорти — це класи, надані asyncio
для абстрагування різних видів каналів зв’язку.
Транспортні об’єкти завжди створюються асинхронним циклом подій.
asyncio реалізує транспорти для каналів TCP, UDP, SSL і підпроцесів. Методи, доступні на транспорті, залежать від типу транспорту.
Транспортні класи не потоково безпечні.
Ієрархія транспортів¶
- class asyncio.BaseTransport¶
Базовий клас для всіх видів транспорту. Містить методи, спільні для всіх асинхронних транспортів.
- class asyncio.WriteTransport(BaseTransport)¶
Базовий транспорт для підключень лише для запису.
Екземпляри класу WriteTransport повертаються з методу циклу подій
loop.connect_write_pipe()
, а також використовуються пов’язаними з підпроцесами методами, наприкладloop.subprocess_exec()
.
- class asyncio.ReadTransport(BaseTransport)¶
Базовий транспорт для підключень лише для читання.
Екземпляри класу ReadTransport повертаються з методу циклу подій
loop.connect_read_pipe()
, а також використовуються пов’язаними з підпроцесами методами, наприкладloop.subprocess_exec()
.
- class asyncio.Transport(WriteTransport, ReadTransport)¶
Інтерфейс, що представляє двонаправлений транспорт, наприклад TCP-з’єднання.
Користувач не створює екземпляр транспорту безпосередньо; вони викликають службову функцію, передаючи їй фабрику протоколів та іншу інформацію, необхідну для створення транспорту та протоколу.
Екземпляри класу Transport повертаються або використовуються такими методами циклу подій, як
loop.create_connection()
,loop.create_unix_connection()
,loop.create_server()
,loop. sendfile()
тощо.
- class asyncio.DatagramTransport(BaseTransport)¶
Транспорт для з’єднань дейтаграм (UDP).
Екземпляри класу DatagramTransport повертаються з методу циклу подій
loop.create_datagram_endpoint()
.
- class asyncio.SubprocessTransport(BaseTransport)¶
Абстракція для представлення зв’язку між батьківським і дочірнім процесами ОС.
Екземпляри класу SubprocessTransport повертаються з методів циклу подій
loop.subprocess_shell()
іloop.subprocess_exec()
.
Базовий транспорт¶
- BaseTransport.close()¶
Закрити транспорт.
If the transport has a buffer for outgoing data, buffered data will be flushed asynchronously. No more data will be received. After all buffered data is flushed, the protocol’s
protocol.connection_lost()
method will be called withNone
as its argument. The transport should not be used once it is closed.
- BaseTransport.is_closing()¶
Повертає
True
, якщо транспорт закривається або закритий.
- BaseTransport.get_extra_info(name, default=None)¶
Повертає інформацію про транспорт або основні ресурси, які він використовує.
ім’я — це рядок, що представляє частину транспортної інформації, яку потрібно отримати.
default — це значення, яке повертається, якщо інформація недоступна або якщо транспорт не підтримує її запит за допомогою сторонньої реалізації циклу подій або на поточній платформі.
Наприклад, наступний код намагається отримати базовий об’єкт сокета транспорту:
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
Категорії інформації, які можна запитувати на деяких транспортах:
розетка:
'peername'
: віддалена адреса, до якої підключено сокет, результатsocket.socket.getpeername()
(None
у разі помилки)'socket'
:socket.socket
екземпляр'sockname'
: власна адреса сокета, результатsocket.socket.getsockname()
SSL-сокет:
'стиснення'
: алгоритм стиснення, що використовується як рядок, абоNone
, якщо з’єднання не стиснуте; результатssl.SSLSocket.compression()
'cipher'
: кортеж з трьох значень, що містить назву використовуваного шифру, версію протоколу SSL, яка визначає його використання, і кількість секретних бітів, що використовуються; результатssl.SSLSocket.cipher()
'peercert'
: одноранговий сертифікат; результатssl.SSLSocket.getpeercert()
'sslcontext'
: екземплярssl.SSLContext
'ssl_object'
:ssl.SSLObject
абоssl.SSLSocket
екземпляр
труба:
'pipe'
: об’єкт труби
підпроцес:
екземпляр
'subprocess'
:subprocess.Popen
- BaseTransport.set_protocol(protocol)¶
Встановіть новий протокол.
Протокол перемикання слід виконувати лише тоді, коли обидва протоколи задокументовано для підтримки перемикання.
- BaseTransport.get_protocol()¶
Повернути поточний протокол.
Транспорти лише для читання¶
- ReadTransport.is_reading()¶
Повертає
True
, якщо транспорт отримує нові дані.Added in version 3.7.
- ReadTransport.pause_reading()¶
Призупинити приймальний кінець транспорту. Жодні дані не будуть передані в метод протоколу
protocol.data_received()
, доки не буде викликаноresume_reading()
.Змінено в версії 3.7: Метод є ідемпотентним, тобто його можна викликати, коли транспорт вже призупинено або закрито.
- ReadTransport.resume_reading()¶
Відновіть приймальний кінець. Метод
protocol.data_received()
протоколу буде викликано ще раз, якщо деякі дані доступні для читання.Змінено в версії 3.7: Метод є ідемпотентним, тобто його можна викликати, коли транспорт вже читає.
Транспорти лише для запису¶
- WriteTransport.abort()¶
Закрийте транспорт негайно, не чекаючи завершення незавершених операцій. Буферизовані дані буде втрачено. Дані більше не надходитимуть. Метод
protocol.connection_lost()
протоколу зрештою буде викликано зNone
як аргумент.
- WriteTransport.can_write_eof()¶
Повертає
True
, якщо транспорт підтримуєwrite_eof()
,False
, якщо ні.
- WriteTransport.get_write_buffer_size()¶
Повертає поточний розмір вихідного буфера, який використовується транспортом.
- WriteTransport.get_write_buffer_limits()¶
Отримайте високий і низький водяні знаки для керування потоком запису. Повертає кортеж
(low, high)
, де low і high є додатною кількістю байтів.Використовуйте
set_write_buffer_limits()
, щоб встановити обмеження.Added in version 3.4.2.
- WriteTransport.set_write_buffer_limits(high=None, low=None)¶
Встановіть високий і низький водяні знаки для керування потоком запису.
Ці два значення (вимірюються в кількості байтів) контролюють, коли викликаються методи протоколу
protocol.pause_writing()
іprotocol.resume_writing()
. Якщо вказано, нижній водяний знак має бути меншим або дорівнювати високому водяному знаку. Ні високий, ні низький не можуть бути негативними.pause_writing()
викликається, коли розмір буфера стає більшим або рівним значенню high. Якщо запис призупинено,resume_writing()
викликається, коли розмір буфера стає меншим або рівним низькому значенню.Значення за замовчуванням залежать від реалізації. Якщо вказано лише високий водяний знак, низький водяний знак за замовчуванням має значення, що залежить від реалізації, менше або дорівнює верхньому водяному знаку. Якщо встановити high на нуль, low також буде встановлено на нуль і спричинить виклик
pause_writing()
щоразу, коли буфер стає непорожнім. Якщо встановити low на нуль,resume_writing()
буде викликатися лише після того, як буфер буде порожнім. Використання нуля для будь-якого обмеження, як правило, є неоптимальним, оскільки воно зменшує можливості одночасного виконання вводу-виводу та обчислень.Використовуйте
get_write_buffer_limits()
, щоб отримати обмеження.
- WriteTransport.write(data)¶
Запишіть кілька байтів data в транспорт.
Цей спосіб не блокує; він буферизує дані та організовує їх асинхронне надсилання.
- WriteTransport.writelines(list_of_data)¶
Запишіть список (або будь-яку ітерацію) байтів даних у транспорт. Це функціонально еквівалентно виклику
write()
для кожного елемента, отриманого iterable, але може бути реалізовано більш ефективно.
- WriteTransport.write_eof()¶
Закрийте кінець запису транспорту після очищення всіх буферизованих даних. Дані ще можуть бути отримані.
Цей метод може викликати
NotImplementedError
, якщо транспорт (наприклад, SSL) не підтримує напівзакриті з’єднання.
Транспортування дейтаграм¶
- DatagramTransport.sendto(data, addr=None)¶
Надішліть байти data до віддаленого однорангового вузла, заданого addr (цільова адреса, що залежить від транспорту). Якщо addr має значення
None
, дані надсилаються на цільову адресу, указану під час створення транспорту.Цей спосіб не блокує; він буферизує дані та організовує їх асинхронне надсилання.
- DatagramTransport.abort()¶
Закрийте транспорт негайно, не чекаючи завершення незавершених операцій. Буферизовані дані буде втрачено. Дані більше не надходитимуть. Метод
protocol.connection_lost()
протоколу зрештою буде викликано зNone
як аргумент.
Транспортування підпроцесів¶
- SubprocessTransport.get_pid()¶
Повертає ідентифікатор процесу підпроцесу як ціле число.
- SubprocessTransport.get_pipe_transport(fd)¶
Повертає транспорт для комунікаційного каналу, що відповідає цілочисельному файловому дескриптору fd:
0
: доступний для читання потоковий транспорт стандартного введення (stdin) абоNone
, якщо підпроцес не було створено за допомогоюstdin=PIPE
1
: доступний для запису потоковий транспорт стандартного виводу (stdout) абоNone
, якщо підпроцес не було створено за допомогоюstdout=PIPE
2
: доступний для запису потоковий транспорт стандартної помилки (stderr) абоNone
, якщо підпроцес не було створено за допомогоюstderr=PIPE
інший fd:
None
- SubprocessTransport.get_returncode()¶
Повертає код повернення підпроцесу як ціле число або
None
, якщо він не повернувся, що подібно до атрибутаsubprocess.Popen.returncode
.
- SubprocessTransport.kill()¶
Закрийте підпроцес.
У системах POSIX функція надсилає SIGKILL підпроцесу. У Windows цей метод є псевдонімом для
terminate()
.Дивіться також
subprocess.Popen.kill()
.
- SubprocessTransport.send_signal(signal)¶
Надішліть номер сигналу підпроцесу, як у
subprocess.Popen.send_signal()
.
- SubprocessTransport.terminate()¶
Зупиніть підпроцес.
On POSIX systems, this method sends
SIGTERM
to the subprocess. On Windows, the Windows API functionTerminateProcess()
is called to stop the subprocess.Дивіться також
subprocess.Popen.terminate()
.
Протоколи¶
Вихідний код: Lib/asyncio/protocols.py
asyncio надає набір абстрактних базових класів, які слід використовувати для реалізації мережевих протоколів. Ці класи призначені для використання разом із transports.
Підкласи абстрактних базових класів протоколу можуть реалізовувати деякі або всі методи. Усі ці методи є зворотними викликами: вони викликаються транспортами під час певних подій, наприклад, коли надходять якісь дані. Метод базового протоколу має викликатися відповідним транспортом.
Базові протоколи¶
- class asyncio.BaseProtocol¶
Базовий протокол із методами, які використовують усі протоколи.
- class asyncio.Protocol(BaseProtocol)¶
Базовий клас для реалізації потокових протоколів (TCP, Unix-сокети тощо).
- class asyncio.BufferedProtocol(BaseProtocol)¶
Базовий клас для реалізації потокових протоколів із ручним керуванням приймальним буфером.
- class asyncio.DatagramProtocol(BaseProtocol)¶
Базовий клас для реалізації протоколів дейтаграм (UDP).
- class asyncio.SubprocessProtocol(BaseProtocol)¶
Базовий клас для реалізації протоколів, що спілкуються з дочірніми процесами (односпрямовані канали).
Базовий протокол¶
Усі асинхронні протоколи можуть реалізовувати зворотні виклики базового протоколу.
Зворотні виклики підключення
Зворотні виклики підключення викликаються для всіх протоколів рівно один раз за успішне підключення. Усі інші зворотні виклики протоколу можна викликати лише між цими двома методами.
- BaseProtocol.connection_made(transport)¶
Викликається, коли встановлено з’єднання.
Аргумент transport — це транспорт, що представляє з’єднання. Протокол відповідає за збереження посилання на свій транспорт.
- BaseProtocol.connection_lost(exc)¶
Викликається, коли з’єднання втрачено або закрито.
Аргументом є або об’єкт винятку, або
None
. Останнє означає, що отримано звичайний EOF, або з’єднання було перервано чи закрито цією стороною з’єднання.
Зворотні виклики керування потоком
Зворотні виклики керування потоком можуть бути викликані транспортами, щоб призупинити або відновити запис, який виконується протоколом.
Додаткову інформацію дивіться в документації методу set_write_buffer_limits()
.
- BaseProtocol.pause_writing()¶
Викликається, коли транспортний буфер переходить верхній водяний знак.
- BaseProtocol.resume_writing()¶
Викликається, коли транспортний буфер закінчується нижче низького водяного знака.
Якщо розмір буфера дорівнює верхньому водяному знаку, pause_writing()
не викликається: розмір буфера має суворо перевищувати.
І навпаки, resume_writing()
викликається, коли розмір буфера дорівнює або менше ніж нижній водяний знак. Ці кінцеві умови важливі для забезпечення того, щоб усе відбувалося так, як очікувалося, коли будь-яка позначка дорівнює нулю.
Протоколи потокової передачі¶
Методи подій, такі як loop.create_server()
, loop.create_unix_server()
, loop.create_connection()
, loop.create_unix_connection()
, loop.connect_accepted_socket()
, loop.connect_read_pipe()
і loop.connect_write_pipe()
приймають фабрики, які повертають потокові протоколи.
- Protocol.data_received(data)¶
Викликається при отриманні деяких даних. data — це об’єкт із непорожніми байтами, що містить вхідні дані.
Від транспортування залежить, чи будуть дані буферизовані, фрагментовані чи повторно зібрані. Загалом, вам не слід покладатися на конкретну семантику, а натомість робити аналіз загальним і гнучким. Однак дані завжди надходять у правильному порядку.
Метод можна викликати довільну кількість разів, поки з’єднання відкрито.
However,
protocol.eof_received()
is called at most once. Onceeof_received()
is called,data_received()
is not called anymore.
- Protocol.eof_received()¶
Викликається, коли інший кінець сигналізує, що більше не надсилатиме даних (наприклад, викликом
transport.write_eof()
, якщо інший кінець також використовує asyncio).Цей метод може повернути хибне значення (включаючи
None
), у цьому випадку транспорт закриється сам. І навпаки, якщо цей метод повертає істинне значення, протокол, який використовується, визначає, чи закривати транспорт. Оскільки реалізація за замовчуванням повертаєNone
, вона неявно закриває з’єднання.Деякі транспортні засоби, включно з SSL, не підтримують напівзакриті з’єднання, і в цьому випадку повернення true з цього методу призведе до закриття з’єднання.
Державна машина:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
Буферизовані потокові протоколи¶
Added in version 3.7.
Буферизовані протоколи можна використовувати з будь-яким методом циклу подій, який підтримує Streaming Protocols.
Реалізації BufferedProtocol
дозволяють явно вручну розподіляти та контролювати буфер отримання. Потім цикли подій можуть використовувати буфер, наданий протоколом, щоб уникнути непотрібних копій даних. Це може призвести до помітного підвищення продуктивності для протоколів, які отримують великі обсяги даних. Складні реалізації протоколів можуть значно зменшити кількість розподілів буферів.
Наступні зворотні виклики викликаються в екземплярах BufferedProtocol
:
- BufferedProtocol.get_buffer(sizehint)¶
Викликається для виділення нового буфера отримання.
sizehint — рекомендований мінімальний розмір для поверненого буфера. Дозволено повертати менші або більші буфери, ніж пропонує sizehint. Якщо встановлено значення -1, розмір буфера може бути довільним. Помилкою є повернення буфера з нульовим розміром.
get_buffer()
має повертати об’єкт, що реалізує протокол буфера.
- BufferedProtocol.buffer_updated(nbytes)¶
Викликається, коли буфер оновлюється отриманими даними.
nbytes — це загальна кількість байтів, які були записані в буфер.
- BufferedProtocol.eof_received()¶
Перегляньте документацію методу
protocol.eof_received()
.
get_buffer()
можна викликати довільну кількість разів під час з’єднання. Однак protocol.eof_received()
викликається щонайбільше один раз, і, якщо буде викликано, get_buffer()
і buffer_updated()
не будуть викликатися після нього.
Державна машина:
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
Протоколи дейтаграм¶
Екземпляри протоколу дейтаграм мають бути створені фабриками протоколів, переданими в метод loop.create_datagram_endpoint()
.
- DatagramProtocol.datagram_received(data, addr)¶
Викликається, коли отримано дейтаграму. data — це об’єкт bytes, що містить вхідні дані. addr — це адреса вузла, який надсилає дані; точний формат залежить від транспорту.
- DatagramProtocol.error_received(exc)¶
Викликається, коли попередня операція надсилання чи отримання викликає
OSError
. exc — це екземплярOSError
.Цей метод викликається в рідкісних випадках, коли транспорт (наприклад, UDP) виявляє, що дейтаграму не вдалося доставити одержувачу. Однак у багатьох випадках дейтаграми, які неможливо доставити, будуть мовчки видалені.
Примітка
У системах BSD (macOS, FreeBSD тощо) керування потоком не підтримується для протоколів дейтаграм, оскільки немає надійного способу виявити помилки надсилання, спричинені записом занадто великої кількості пакетів.
Сокет завжди виглядає «готовим», а зайві пакети відкидаються. Помилка OSEror
з errno
встановленим на errno.ENOBUFS
може виникати або не виникати; якщо воно піднято, про це буде повідомлено DatagramProtocol.error_received()
, але в інших випадках воно буде проігноровано.
Протоколи підпроцесів¶
Екземпляри протоколу підпроцесу мають бути створені фабриками протоколів, переданими методам loop.subprocess_exec()
і loop.subprocess_shell()
.
- SubprocessProtocol.pipe_data_received(fd, data)¶
Викликається, коли дочірній процес записує дані в канал stdout або stderr.
fd — цілочисельний файловий дескриптор каналу.
data — об’єкт із непорожніми байтами, що містить отримані дані.
- SubprocessProtocol.pipe_connection_lost(fd, exc)¶
Викликається, коли одна з труб, що спілкуються з дочірнім процесом, закрита.
fd — цілочисельний файловий дескриптор, який було закрито.
- SubprocessProtocol.process_exited()¶
Викликається, коли дочірній процес завершився.
It can be called before
pipe_data_received()
andpipe_connection_lost()
methods.
Приклади¶
TCP Echo Server¶
Створіть TCP-сервер ехо за допомогою методу loop.create_server()
, надішліть назад отримані дані та закрийте з’єднання:
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
Дивись також
У прикладі TCP echo server using streams використовується функція asyncio.start_server()
високого рівня.
TCP Echo Client¶
TCP-клієнт відлуння, використовуючи метод loop.create_connection()
, надсилає дані та чекає, доки з’єднання не буде закрито:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
Дивись також
У прикладі TCP echo client using streams використовується функція asyncio.open_connection()
високого рівня.
Сервер UDP Echo¶
Ехо-сервер UDP за допомогою методу loop.create_datagram_endpoint()
повертає отримані дані:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
UDP Echo Client¶
Ехо-клієнт UDP, використовуючи метод loop.create_datagram_endpoint()
, надсилає дані та закриває транспорт, коли отримує відповідь:
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
Підключення наявних розеток¶
Зачекайте, поки сокет отримає дані за допомогою методу loop.create_connection()
з протоколом:
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
Дивись також
У прикладі спостерігати за файловим дескриптором для подій читання використовується метод низького рівня loop.add_reader()
для реєстрації FD.
У прикладі register an open socket to wait for data using streams використовуються потоки високого рівня, створені функцією open_connection()
у співпрограмі.
loop.subprocess_exec() і SubprocessProtocol¶
Приклад протоколу підпроцесу, який використовується для отримання результатів підпроцесу та очікування виходу підпроцесу.
Підпроцес створюється методом loop.subprocess_exec()
:
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
self.pipe_closed = False
self.exited = False
def pipe_connection_lost(self, fd, exc):
self.pipe_closed = True
self.check_for_exit()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exited = True
# process_exited() method can be called before
# pipe_connection_lost() method: wait until both methods are
# called.
self.check_for_exit()
def check_for_exit(self):
if self.pipe_closed and self.exited:
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
Дивіться також той самий приклад, написаний з використанням API високого рівня.