トランスポートとプロトコル

まえがき

トランスポートとプロトコルは loop.create_connection() のような 低水準の イベントループ API から使われます。これらはコールバックに基づくプログラミングスタイルを使うことでネットワークや IPC プロトコル (HTTP など) の高性能な実装を可能にします。

基本的にトランスポートとプロトコルはライブラリやフレームワークからのみ使われるべきであり、高水準の asyncio アプリケーションから使われるものではありません。

このドキュメントは TransportsProtocols を扱います。

はじめに

最上位の観点からは、トランスポートは どのように バイトデータを送信するかに影響を与え、いっぽうプロトコルは どの バイトデータを送信するかを決定します (また、ある程度は いつ も決定します) 。

同じことを違う言い方で表現します: トランスポートはソケット (または同様の I/O 端点) の抽象化であり、プロトコルはトランスポートから見たときのアプリケーションの抽象化です。

さらにもう一つの見方として、トランスポートとプロトコルの2つのインターフェースは、協調してネットワーク I/O やプロセス間 I/O の抽象インターフェースを定義しています。

トランスポートオブジェクトとプロトコルオブジェクトの間には常に1対1の関係があります: プロトコルはデータを送信するためにトランスポートのメソッドを呼び出し、トランスポートは受信したデータを渡すためにプロトコルのメソッドを呼び出します。

ほとんどの接続に基づくイベントループメソッド (loop.create_connection() など) は通常 protocol_factory 引数を受け取り、 Transport オブジェクトで表現される確立した接続に対する Protocol オブジェクトを生成するために使います。そのようなメソッドは通常 (transport, protocol) タプルを返します。

内容

このページは以下の節から構成されます:

トランスポート

ソースコード: Lib/asyncio/transports.py


トランスポートはさまざまな通信方法を抽象化するために asyncio が提供するクラス群です。

トランスポートオブジェクトは常に asyncio イベントループ によってインスタンス化されます。

asyncio は TCP, UDP, SSL, およびサブプロセスパイプのトランスポートを実装しています。利用可能なトランスポートのメソッドはトランスポートの種類に依存します。

トランスポートクラスは スレッド安全ではありません

トランスポートのクラス階層構造

class asyncio.BaseTransport

全てのトランスポートの基底クラスです。すべての asyncio トランスポートが共有するメソッドを含んでいます。

class asyncio.WriteTransport(BaseTransport)

書き込み専用の接続に対する基底トランスポートクラスです。

WriteTransport クラスのインスタンスは loop.connect_write_pipe() イベントループメソッドから返され、 loop.subprocess_exec() のようなサブプロセスに関連するメソッドから利用されます。

class asyncio.ReadTransport(BaseTransport)

読み込み専用の接続に対する基底トランスポートクラスです。

Instances of the ReadTransport class are returned from the loop.connect_read_pipe() event loop method and are also used by subprocess-related methods like loop.subprocess_exec().

class asyncio.Transport(WriteTransport, ReadTransport)

TCP 接続のような、読み出しと書き込みの双方向のトランスポートを表現するインターフェースです。

ユーザーはトランスポートを直接インスタンス化することはありません; ユーザーは、ユーティリティ関数にプロトコルファクトリとその他トランスポートとプロトコルを作成するために必要な情報を渡して呼び出します。

Transport クラスのインスタンスは、loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile() などのイベントループメソッドから返されたり、これらのメソッドから利用されたりします。

class asyncio.DatagramTransport(BaseTransport)

データグラム (UDP) 接続のためのトランスポートです。

DatagramTransport クラスのインスタンスは loop.create_datagram_endpoint() イベントループメソッドから返されます。

class asyncio.SubprocessTransport(BaseTransport)

親プロセスとその子プロセスの間の接続を表現する抽象クラスです。

SubprocessTransport クラスのインスタンスは loop.subprocess_shell()loop.subprocess_exec() の2つのイベントループメソッドから返されます。

基底トランスポート

BaseTransport.close()

トランスポートをクローズします。

トランスポートが発信データのバッファーを持っていた場合、バッファーされたデータは非同期にフラッシュされます。それ以降データは受信されません。バッファーされていたデータがすべてフラッシュされた後、そのプロトコルの protocol.connection_lost() メソッドが引数 None で呼び出されます。一度閉じたトランスポートは、使用されるべきではありません。

BaseTransport.is_closing()

トランスポートを閉じている最中か閉じていた場合 True を返します。

BaseTransport.get_extra_info(name, default=None)

トランスポートまたはそれが背後で利用しているリソースの情報を返します。

name は取得するトランスポート特有の情報を表す文字列です。

default は、取得したい情報が取得可能でなかったり、サードパーティのイベントループ実装や現在のプラットフォームがその情報の問い合わせをサポートしていない場合に返される値です。

例えば、以下のコードはトランスポート内のソケットオブジェクトを取得しようとします:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

いくつかのトランスポートで問い合わせ可能な情報のカテゴリを示します:

  • ソケット:

  • SSL ソケット:

    • 'compression': 圧縮アルゴリズムで、ssl.SSLSocket.compression() の結果になります。圧縮されていないときは None になります

    • 'cipher': 3 個の値 (使用されている暗号アルゴリズムの名称、使用が定義されている SSL プロトコルのバージョン、および使用されている秘密鍵のビット数) からなるタプルで、ssl.SSLSocket.cipher() の結果になります

    • 'peercert': ピアの証明書で、ssl.SSLSocket.getpeercert() の結果になります

    • 'sslcontext': ssl.SSLContext のインスタンスになります

    • 'ssl_object': ssl.SSLObject または ssl.SSLSocket インスタンス

  • パイプ:

    • 'pipe': パイプオブジェクトです

  • サブプロセス:

BaseTransport.set_protocol(protocol)

トランスポートに新しいプロトコルを設定します。

プロトコルの切り替えは、両方のプロトコルのドキュメントで切り替えがサポートされている場合にのみ行うべきです。

BaseTransport.get_protocol()

現在のプロトコルを返します。

読み出し専用のトランスポート

ReadTransport.is_reading()

トランスポートが新しいデータを受信中の場合 True を返します。

バージョン 3.7 で追加.

ReadTransport.pause_reading()

トランスポートの受信側を一時停止します。 resume_reading() メソッドが呼び出されるまでプロトコルの protocol.data_received() メソッドにデータは渡されません。

バージョン 3.7 で変更: このメソッドはべき等です。すなわちトランスポートがすでに停止していたりクローズしていても呼び出すことができます。

ReadTransport.resume_reading()

受信を再開します。データが読み込み可能になるとプロトコルの protocol.data_received() メソッドが再び呼び出されるようになります。

バージョン 3.7 で変更: このメソッドはべき等です。すなわちトランスポートがすでにデータを読み込み中であっても呼び出すことができます。

書き込み専用のトランスポート

WriteTransport.abort()

未完了の処理が完了するのを待たず、即座にトランスポートをクローズします。バッファーされているデータは失われます。このメソッドの呼び出し以降データは受信されません。最終的にプロトコルの protocol.connection_lost() メソッドが引数 None で呼び出されます。

WriteTransport.can_write_eof()

トランスポートが write_eof() メソッドをサポートしている場合 True を返し、そうでない場合は False を返します。

WriteTransport.get_write_buffer_size()

トランスポートで使用されている出力バッファーの現在のサイズを返します。

WriteTransport.get_write_buffer_limits()

書き込みフロー制御の 最高 および 最低 水位点を取得します。 (low, high) タプルを返します。ここで lowhigh はバイト数をあらわす正の整数です。

水位点の設定は set_write_buffer_limits() で行います。

バージョン 3.4.2 で追加.

WriteTransport.set_write_buffer_limits(high=None, low=None)

書き込みフロー制御の 最高 および 最低 水位点を設定します。

(バイト数をあらわす) これら2つの値はプロトコルの protocol.pause_writing()protocol.resume_writing() の2つのメソッドがいつ呼ばれるかを制御します。指定する場合、 lowhigh と等しいかまたは high より小さくなければなりません。また、 highlow も負の値を指定することはできません。

pause_writing() はバッファーサイズが high の値以上になった場合に呼び出されます。書き込みが一時停止している場合、バッファーサイズが low の値以下になると resume_writing() メソッドが呼び出されます。

デフォルト値は実装固有になります。 high のみ与えられた場合、 lowhigh 以下の実装固有のデフォルト値になります。 high をゼロに設定すると low も強制的にゼロになり、バッファーが空でなくなるとすぐに pause_writing() メソッドが呼び出されるようになります。 low をゼロに設定すると、バッファーが空にな resume_writing() が呼び出されるようになります。どちらかにゼロを設定することは I/O と計算を並行に実行する機会を減少させるため、一般に最適ではありません。

上限値と下限値を取得するには get_write_buffer_limits() メソッドを使ってください。

WriteTransport.write(data)

トランスポートにバイト列 data を書き込みます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

WriteTransport.writelines(list_of_data)

バイト列のデータのリスト (またはイテラブル) をトランスポートに書き込みます。この振る舞いはイテラブルを yield して各要素で write() を呼び出すことと等価ですが、より効率的な実装となる場合があります。

WriteTransport.write_eof()

バッファーされた全てのデータをフラッシュした後トランスポートの送信側をクローズします。送信側をクローズした後もデータを受信することは可能です。

このメソッドはトランスポート (例えば SSL) がハーフクローズドな接続をサポートしていない場合 NotImplementedError を送出します。

データグラムトランスポート

DatagramTransport.sendto(data, addr=None)

リモートピア addr (トランスポート依存の対象アドレス) にバイト列 data を送信します。addrNone の場合、データはトランスポートの作成時に指定された送信先に送られます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

DatagramTransport.abort()

未完了の処理が完了するのを待たず、即座にトランスポートをクローズします。バッファーされているデータは失われます。このメソッドの呼び出し以降データは受信されません。最終的にプロトコルの protocol.connection_lost() メソッドが引数 None で呼び出されます。

サブプロセス化されたトランスポート

SubprocessTransport.get_pid()

サブプロセスのプロセス ID (整数) を返します。

SubprocessTransport.get_pipe_transport(fd)

整数のファイル記述子 fd に該当する通信パイプのトランスポートを返します:

  • 0: 標準入力 (stdin) の読み込み可能ストリーミングトランスポート。サブプロセスが stdin=PIPE で作成されていない場合は None

  • 1: 標準出力 (stdout) の書き込み可能ストリーミングトランスポート。サブプロセスが stdout=PIPE で作成されていない場合は None

  • 2: 標準エラー出力 (stderr) の書き込み可能ストリーミングトランスポート。サブプロセスが stderr=PIPE で作成されていない場合は None

  • その他の fd: None

SubprocessTransport.get_returncode()

サブプロセスのリターンコードを整数で返します。サブプロセスがリターンしなかった場合は None を返します。 subprocess.Popen.returncode 属性と同じです。

SubprocessTransport.kill()

サブプロセスを強制終了 (kill) します。

POSIX システムでは、この関数はサブプロセスに SIGKILL を送信します。Windows では、このメソッドは terminate() の別名です。

subprocess.Popen.kill() も参照してください。

SubprocessTransport.send_signal(signal)

サブプロセスにシグナル signal を送信します。subprocess.Popen.send_signal() と同じです。

SubprocessTransport.terminate()

サブプロセスを停止します。

POSIX システムでは、このメソッドはサブプロセスに SIGTERM を送信します。Windows では、Windows API 関数 TerminateProcess() がサブプロセスを停止するために呼び出されます。

subprocess.Popen.terminate() も参照してください。

SubprocessTransport.close()

kill() メソッドを呼び出すことでサブプロセスを強制終了します。

サブプロセスがまだリターンしていない場合、 stdin, stdout, および stderr の各パイプのトランスポートをクローズします。

プロトコル

ソースコード: Lib/asyncio/protocols.py


asyncio はネットワークプロトコルを実装するために使う抽象基底クラス群を提供します。これらのクラスは トランスポート と組み合わせて使うことが想定されています。

抽象基底プロトコルクラスの派生クラスはメソッドの一部または全てを実装することができます。これらのメソッドは全てコールバックです: それらは、データを受信した、などの決まったイベントに対してトランスポートから呼び出されます。基底プロトコルメソッドは対応するトランスポートから呼び出されるべきです。

基底プロトコル

class asyncio.BaseProtocol

全てのプロトコルクラスが共有する全てのメソッドを持った基底プロトコルクラスです。

class asyncio.Protocol(BaseProtocol)

ストリーミングプロトコル (TCP, Unix ソケットなど) を実装するための基底クラスです。

class asyncio.BufferedProtocol(BaseProtocol)

受信バッファーを手動で制御するストリーミングプロトコルを実装するための基底クラスです。

class asyncio.DatagramProtocol(BaseProtocol)

データグラム (UDP) プロトコルを実装するための基底クラスです。

class asyncio.SubprocessProtocol(BaseProtocol)

子プロセスと (一方向パイプを通じて) 通信するプロトコルを実装するための基底クラスです。

基底プロトコル

全ての asyncio プロトコルは基底プロトコルのコールバックを実装することができます。

通信のコールバック

コネクションコールバックは全てのプロトコルから、成功したコネクションそれぞれにつきただ一度だけ呼び出されます。その他の全てのプロトコルコールバックはこれら2つのメソッドの間に呼び出すことができます。

BaseProtocol.connection_made(transport)

コネクションが作成されたときに呼び出されます。

引数 transport はコネクションをあらわすトランスポートです。プロトコルはトランスポートへの参照を保存する責任を負います。

BaseProtocol.connection_lost(exc)

コネクションが失われた、あるいはクローズされたときに呼び出されます。

引数は例外オブジェクトまたは None になります。None のとき、通常の EOF が受信されたか、あるいはコネクションがこちら側から中止またはクローズされたことを意味します。

フロー制御コールバック

フロー制御コールバックは、プロトコルによって実行される書き込み処理を停止または再開するためにトランスポートから呼び出されます。

詳しくは set_write_buffer_limits() メソッドのドキュメントを参照してください。

BaseProtocol.pause_writing()

トランスポートのバッファーサイズが最高水位点 (high watermark) を超えたときに呼び出されます。

BaseProtocol.resume_writing()

トランスポートのバッファーサイズが最低水位点 (low watermark) に達したきに呼び出されます。

バッファーサイズが最高水位点と等しい場合、 pause_writing() は呼び出されません: バッファーサイズは必ず制限値を超えなければなりません。

それに対して、 resume_writing() はバッファーサイズが最低水位点と等しいかそれよりも小さい場合に呼び出されます。これらの境界条件は、どちらの基準値もゼロである場合の処理が期待通りとなることを保証するために重要です。

ストリーミングプロトコル

loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe(), そして loop.connect_write_pipe() などのイベントメソッドはストリーミングプロトコルを返すファクトリを受け付けます。

Protocol.data_received(data)

データを受信したときに呼び出されます。data は受信したデータを含む空ではないバイト列オブジェクトになります。

データがバッファーされるか、チャンキングされるか、または再構築されるかはトランスポートに依存します。一般には、特定のセマンティクスを信頼するべきではなく、代わりにデータのパースを全般的かつ柔軟に行うべきです。ただし、データは常に正しい順序で受信されます。

このメソッドは、コネクションがオープンである間は何度でも呼び出すことができます。

いっぽうで、 protocol.eof_received() メソッドは最大でも一度だけ呼び出されます。いったん eof_received() が呼び出されると、それ以降 data_received() は呼び出されません。

Protocol.eof_received()

コネクションの相手方がこれ以上データを送信しないことを伝えてきたとき (例えば相手方が asyncio を使用しており、 transport.write_eof() を呼び出した場合) に呼び出されます。

このメソッドは (None を含む) 偽値 を返すことがあり、その場合トランスポートは自身をクローズします。一方メソッドが真値を返す場合は、利用しているプロトコルがトランスポートをクローズするかどうかを決めます。デフォルトの実装は None を返すため、コネクションは暗黙のうちにクローズされます。

SSL を含む一部のトランスポートはハーフクローズ接続をサポートしません。そのような場合このメソッドが真値を返すとコネクションはクローズされます。

ステートマシン:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

バッファリングされたストリーミングプロトコル

バージョン 3.7 で追加.

バッファー付きプロトコルは ストリーミングプロトコル をサポートするイベントループメソッドで利用することができます。

BufferedProtocol 実装は受信バッファーの手動での明示的な割り当てや制御を可能にします。イベントループはプロトコルにより提供されるバッファを利用することにより不要なデータのコピーを避けることができます。これにより大量のデータを受信するプロトコルにおいて顕著なパフォーマンスの向上をもたらします。精巧なプロトコル実装によりバッファー割り当ての数を劇的に減少させることができます。

以下に示すコールバックは BufferedProtocol インスタンスに対して呼び出されます:

BufferedProtocol.get_buffer(sizehint)

新しい受信バッファを割り当てるために呼び出します。

sizehint は返されるバッファーの推奨される最小サイズです。 sizehint によって推奨された値より小さい、または大きいサイズのバッファーを返すことは容認されています。 -1 がセットされた場合、バッファーサイズは任意となります。サイズがゼロのバッファーを返すとエラーになります。

get_buffer()バッファープロトコル を実装したオブジェクトを返さなければなりません。

BufferedProtocol.buffer_updated(nbytes)

受信データによりバッファが更新された場合に呼び出されます。

nbytes はバッファに書き込まれた総バイト数です。

BufferedProtocol.eof_received()

protocol.eof_received() メソッドのドキュメントを参照してください。

コネクションの間、 get_buffer() は何度でも呼び出すことができます。しかし protocol.eof_received() が呼び出されるのは最大でも1回で、もし呼び出されると、それ以降 get_buffer()buffer_updated() が呼び出されることはありません。

ステートマシン:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

データグラムプロトコル

データグラムプロトコルのインスタンスは loop.create_datagram_endpoint() メソッドに渡されたプロトコルファクトリによって生成されるべきです。

DatagramProtocol.datagram_received(data, addr)

データグラムを受信したときに呼び出されます。data は受信データを含むバイトオブジェクトです。addr はデータを送信するピアのアドレスです; 正確な形式はトランスポートに依存します。

DatagramProtocol.error_received(exc)

直前の送信あるいは受信が OSError を送出したときに呼び出されます。excOSError のインスタンスになります。

このメソッドが呼ばれるのは、トランスポート (UDP など) がデータグラムを受信側に配信できなかったことが検出されたなどの、まれな場合においてのみです。ほとんどの場合、データグラムが配信できなければそのまま通知されることなく破棄されます。

注釈

BSD システム (macOS, FreeBSD など) ではフロー制御はサポートされていません。これは非常に多くのパケットを書き込もうとしたことによる送信の失敗を検出する信頼できる方法が存在しないためです。

ソケットは常に '準備ができた状態' のように振る舞いますが、超過したパケットは破棄されます。この場合 errnoerrno.ENOBUFS に設定した OSError 例外が送出されることがあります。もし例外が送出された場合は DatagramProtocol.error_received() に通知されますが、送出されない場合は単に無視されます。

サブプロセスプロトコル

サブプロセスプロトコルのインスタンスは loop.subprocess_exec()loop.subprocess_shell() メソッドに渡されたプロトコルファクトリにより生成されるべきです。

SubprocessProtocol.pipe_data_received(fd, data)

子プロセスが標準出力または標準エラー出力のパイプにデータを書き込んだ時に呼び出されます。

fd はパイプのファイル記述子を表す整数です。

data は受信データを含む空でないバイトオブジェクトです。

SubprocessProtocol.pipe_connection_lost(fd, exc)

子プロセスと通信するパイプのいずれかがクローズされたときに呼び出されます。

fd はクローズされたファイル記述子を表す整数です。

SubprocessProtocol.process_exited()

子プロセスが終了したときに呼び出されます。

これは、 pipe_data_received()pipe_connection_lost() メソッドの前に呼び出すことができます。

使用例

TCP エコーサーバー

loop.create_server() メソッドを使って TCP エコーサーバーを生成し、受信したデータをそのまま送り返して、最後にコネクションをクローズします:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

参考

ストリームを使った TCP エコーサーバー の例では高水準の asyncio.start_server() 関数を使っています。

TCP エコークライアント

loop.create_connection() メソッドを使った TCP エコークライアントは、データを送信したあとコネクションがクローズされるまで待機します:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

参考

ストリームを使った TCP エコークライアント の例では高水準の asyncio.open_connection() 関数を使っています。

UDP エコーサーバー

loop.create_datagram_endpoint() メソッドを使った UDP エコーサーバーは受信したデータをそのまま送り返します:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP エコークライアント

loop.create_datagram_endpoint() メソッドを使った UDP エコークライアントはデータを送信し、応答を受信するとトランスポートをクローズします:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

既存のソケットへの接続

プロトコルを設定した loop.create_connection() メソッドを使ってソケットがデータを受信するまで待機します:

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

参考

ファイル記述子の読み込みイベントを監視する 例では低レベルの loop.add_reader() メソッドを使ってファイル記述子 (FD) を登録しています。

ストリームを使ってデータを待ち受けるオープンなソケットを登録する 例ではコルーチン内で open_connection() 関数によって生成されたストリームを使っています。

loop.subprocess_exec() と SubprocessProtocol

サブプロセスからの出力を受け取り、サブプロセスが終了するまで待機するために使われるサブプロセスプロトコルの例です。

サブプロセスは loop.subprocess_exec() メソッドにより生成されます:

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

高水準の API を使って書かれた 同様の例 も参照してください。