18.5.4. Transports and protocols (callback based API)

Source code: Lib/asyncio/transports.py

Source code: Lib/asyncio/protocols.py

18.5.4.1. トランスポート

トランスポートは、asyncio が提供する、さまざまな種類の通信チャンネルを抽象化するクラス群です。通常あなた自身が直接トランスポートのインスタンスを作成することはなく、AbstractEventLoop のメソッドを呼び出すことでトランスポートとその下層の通信チャンネルのインスタンスが作成され、成功時にコールバックが返ります。

いったん通信チャンネルが確立されると、トランスポートは常に プロトコル インスタンスとのペアを成します。プロトコルはその後さまざまな用途のためトランスポートのメソッドを呼び出します。

asyncio は現在 TCP、UDP、SSL およびサブプロセスパイプのトランスポートを実装しています。利用可能なトランスポートのメソッドはトランスポートの種類に依存します。

トランスポートクラスは スレッド安全ではありません

バージョン 3.6 で変更: デフォルトでソケットオプションの TCP_NODELAY が設定されるようになりました。

18.5.4.1.1. BaseTransport

class asyncio.BaseTransport

トランスポートの基底クラスです。

close()

トランスポートをクローズします。トランスポートが発信データのバッファーを持っていた場合、バッファーされたデータは非同期にフラッシュされます。それ以降データは受信されません。バッファーされていたデータがすべてフラッシュされた後、そのプロトコルの connection_lost() メソッドが引数 None で呼び出されます。

is_closing()

トランスポートを閉じている最中か閉じていた場合 True を返します。

バージョン 3.5.1 で追加.

get_extra_info(name, default=None)

オプションのトランスポート情報を返します。name は取得したトランスポート固有の情報を表す文字列で、default は情報が存在しなかったときに返す値になります。

このメソッドはトランスポートの実装に容易にチャンネル固有の情報を渡すことができます。

  • ソケット:

  • SSL ソケット:

    • 'compression': 圧縮アルゴリズムで、ssl.SSLSocket.compression() の結果になります。圧縮されていないときは None になります

    • 'cipher': 3 個の値 (使用されている暗号アルゴリズムの名称、使用が定義されている SSL プロトコルのバージョン、および使用されている秘密鍵のビット数) からなるタプルで、ssl.SSLSocket.cipher() の結果になります

    • 'peercert': ピアの証明書で、ssl.SSLSocket.getpeercert() の結果になります

    • 'sslcontext': ssl.SSLContext のインスタンスになります

    • 'ssl_object': ssl.SSLObject または ssl.SSLSocket インスタンス

  • パイプ:

    • 'pipe': パイプオブジェクトです

  • サブプロセス:

set_protocol(protocol)

新しいプロトコルを設定します。プロトコルの切り替えは、両方のプロトコルのドキュメントで切り替えがサポートされている場合にのみ行うべきです。

バージョン 3.5.3 で追加.

get_protocol()

現在のプロトコルを返します。

バージョン 3.5.3 で追加.

バージョン 3.5.1 で変更: 'ssl_object' 情報が SSL ソケットに追加されました。

18.5.4.1.2. ReadTransport

class asyncio.ReadTransport

読み出し専用トランスポートのインターフェースです。

pause_reading()

トランスポートの受信側を一時停止します。resume_reading() が呼び出されるまでそのプロトコルの data_received() メソッドにデータは渡されません。

バージョン 3.6.7 で変更: The method is idempotent, i.e. it can be called when the transport is already paused or closed.

resume_reading()

受信を再開します。読み込み可能データが存在した場合そのプロトコルの data_received() メソッドが一度呼び出されます。

バージョン 3.6.7 で変更: The method is idempotent, i.e. it can be called when the transport is already reading.

18.5.4.1.3. WriteTransport

class asyncio.WriteTransport

書き込み専用トランスポートのインターフェースです。

abort()

トランスポートを即座にクローズします。未完了の処理があってもそれを待ちません。バッファーされているデータは失われます。それ以降データは受信されません。最終的にそのプロトコルの connection_lost() メソッドが引数 None で呼び出されます。

can_write_eof()

トランスポートが write_eof() をサポートしている場合 True を、サポートしていない場合は False を返します。

get_write_buffer_size()

トランスポートで使用されている出力バッファーの現在のサイズを返します。

get_write_buffer_limits()

書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を取得します。(low, high) のタプルを返します。low および high は整数のバイト列になります。

水位点の設定は set_write_buffer_limits() で行います。

バージョン 3.4.2 で追加.

set_write_buffer_limits(high=None, low=None)

書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を設定します。

These two values (measured in number of bytes) control when the protocol's pause_writing() and resume_writing() methods are called. If specified, the low-water limit must be less than or equal to the high-water limit. Neither high nor low can be negative.

pause_writing() is called when the buffer size becomes greater than or equal to the high value. If writing has been paused, resume_writing() is called when the buffer size becomes less than or equal to the low value.

デフォルト値は実装固有になります。最高水位点のみ与えられた場合、最低水位点は最高水位点以下の実装固有のデフォルト値になります。最高水位点にゼロが設定されると最低水位点も強制的にゼロになり、バッファが空でなくなるたびに pause_writing() が呼び出されるようになります。最低水位点にゼロが設定されるとバッファは空になるとすぐに resume_writing() が呼び出されるようになります。どちらかにゼロを設定することは、並列の I/O 処理や計算の機会を減らすことになるため、一般に最善とは言えません。

水位点の取得には get_write_buffer_limits() を使用します。

write(data)

トランスポートにバイト列 data を書き込みます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

writelines(list_of_data)

バイト列のデータのリスト (またはイテラブル) をトランスポートに書き込みます。この振る舞いはイテラブルを yield して各要素で write() を呼び出すことと等価ですが、より効率的な実装となる場合があります。

write_eof()

バッファーされたデータをフラッシュした後トランスポートの送信側をクローズします。データは受信されます。

このメソッドはトランスポート (例えば SSL) がハーフクローズをサポートしていない場合 NotImplementedError を送出します。

18.5.4.1.4. DatagramTransport

DatagramTransport.sendto(data, addr=None)

リモートピア addr (トランスポート依存の対象アドレス) にバイト列 data を送信します。addrNone の場合、データはトランスポートの作成時に指定された送信先に送られます。

このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。

DatagramTransport.abort()

トランスポートを即座にクローズします。未完了の処理があってもそれを待ちません。バッファーされているデータは失われます。それ以降データは受信されません。最終的にそのプロトコルの connection_lost() メソッドが引数 None で呼び出されます。

18.5.4.1.5. BaseSubprocessTransport

class asyncio.BaseSubprocessTransport
get_pid()

サブプロセスのプロセス ID (整数) を返します。

get_pipe_transport(fd)

整数のファイル記述子 fd に該当する通信パイプのトランスポートを返します:

  • 0: 標準入力 (stdin) の読み込み可能ストリーミングトランスポート。サブプロセスが stdin=PIPE で作成されていない場合は None

  • 1: 標準出力 (stdout) の書き込み可能ストリーミングトランスポート。サブプロセスが stdout=PIPE で作成されていない場合は None

  • 2: 標準エラー出力 (stderr) の書き込み可能ストリーミングトランスポート。サブプロセスが stderr=PIPE で作成されていない場合は None

  • その他の fd: None

get_returncode()

サブプロセスのリターンコード (整数) を返します。リターンコードを持たない場合 None を返します。subprocess.Popen.returncode 属性と同じです。

kill()

subprocess.Popen.kill() と同様に、サブプロセスを kill します。

POSIX システムでは、この関数はサブプロセスに SIGKILL を送信します。Windows では、このメソッドは terminate() の別名です。

send_signal(signal)

サブプロセスにシグナル signal を送信します。subprocess.Popen.send_signal() と同じです。

terminate()

サブプロセスに停止を要求します。subprocess.Popen.terminate() と同じです。このメソッドは close() メソッドの別名です。

POSIX システムでは、このメソッドはサブプロセスに SIGTERM を送信します。Windows では、Windows API 関数 TerminateProcess() が呼び出されます。

close()

サブプロセスがまだ返していない場合、terminate() メソッドの呼び出しによってサブプロセスに停止を要求し、全パイプ (stdinstdout および stderr) のトランスポートをクローズします。

18.5.4.2. プロトコル

asyncio はネットワークプロトコルの実装をサブクラス化する基底クラスを提供します。これらクラスは トランスポート と連動して使用されます: プロトコルは入力データの解析および出力データの書き込みのための問い合わせを行い、トランスポートは実際の I/O とバッファリングに責任を持ちます。

プロトコルクラスをサブクラス化するとき、いくつかのメソッドをオーバーライドすることを推奨します。これらメソッドはコールバックです: いくつかのイベントが発生したとき (例えばデータの受信など) に呼び出されます; あなたがトランスポートを実装する場合を除き、これらを直接呼び出すべきではありません。

注釈

すべてのコールバックはデフォルトで空の実装を持ちます。したがって、あなたが興味を持ったイベント用のコールバックのみ実装が必要になります。

18.5.4.2.1. プロトコルクラス群

class asyncio.Protocol

(例えば TCP や SSL トランスポートとともに使用する) ストリーミングプロトコルを実装する基底クラスです。

class asyncio.DatagramProtocol

(例えば UDP トランスポートともに使用する) データグラムプロトコルを実装する基底クラスです。

class asyncio.SubprocessProtocol

子プロセスと (一方向パイプを使用して) 通信するプロトコルを実装する基底クラスです。

18.5.4.2.2. コネクションコールバック

これらコールバックは ProtocolDatagramProtocol および SubprocessProtocol インスタンスから呼び出される場合があります:

BaseProtocol.connection_made(transport)

コネクションが作成されたときに呼び出されます。

引数 transport はコネクションを表すトランスポートです。必要であれば、それをどこに格納するか (例えば属性へ) を決めるのはあなたです。

BaseProtocol.connection_lost(exc)

コネクションが失われた、あるいはクローズされたときに呼び出されます。

引数は例外オブジェクトまたは None になります。None のとき、通常の EOF が受信されたか、あるいはコネクションがこちら側から中止またはクローズされたことを意味します。

connection_made() および connection_lost() は接続が成功するたびに厳密に 1 回呼び出されます。その他のすべてのコールバックはこれら 2 つのメソッドの間に呼び出され、あなたのプロトコルの実装内のリソース管理を容易に行えます。

以下のコールバックを呼び出すのは SubprocessProtocol インスタンスのみかもしれません:

SubprocessProtocol.pipe_data_received(fd, data)

子プロセスが自身の標準出力や標準エラー出力のパイプにデータを書き込んだときに呼び出されます。fd はパイプのファイル記述子 (整数) になります。data はデータを含む空ではないバイト列になります。

SubprocessProtocol.pipe_connection_lost(fd, exc)

子プロセスと通信するパイプの一つがクローズされると呼び出されます。fd はクローズされたファイル記述子 (整数) になります。

SubprocessProtocol.process_exited()

子プロセスが終了したときに呼び出されます。

18.5.4.2.3. ストリーミングプロトコル

以下のコールバックは Protocol インスタンス上で呼び出されます:

Protocol.data_received(data)

データを受信したときに呼び出されます。data は受信したデータを含む空ではないバイト列オブジェクトになります。

注釈

トランスポートによって、データのバッファー、チャンクあるいは再構築のどれかが行われます。一般に、固有のセマンティックを信頼すべきではなく、代わりに全体的かつ十分に柔軟な解析を行うべきです。ただし、データは常に正しい順序で受信されます。

Protocol.eof_received()

相手方が送信するデータがないことを伝えてきたとき (例えば相手方が asyncio を使用しており write_eof() を呼び出した場合) に呼び出されます。

このメソッドは偽値 (None を含む) を返すことがあり、その場合トランスポートは自身をクローズします。真値を返す場合、トランスポートはプロトコルによってクローズされます。デフォルトの実装では None を返すため、コネクションは暗黙的にクローズされます。

注釈

SSL のような一部のトランスポートはハーフクローズ接続をサポートしていません。その場合このメソッドが真値を返すとコネクションのクローズを回避できません。

接続中、data_received() は複数回呼び出されえます。eof_received() が呼び出されるのは 1 回で、1 度呼び出されると、その後 data_received() が呼び出されることはありません。

ステートマシン:

開始状態 -> connection_made() [-> data_received() *] [-> eof_received() ?] -> connection_lost() -> 終了状態

18.5.4.2.4. データグラムプロトコル

以下のコールバックは DatagramProtocol インスタンス上で呼び出されます。

DatagramProtocol.datagram_received(data, addr)

データグラムを受信したときに呼び出されます。data は受信データを含むバイトオブジェクトです。addr はデータを送信するピアのアドレスです; 正確な形式はトランスポートに依存します。

DatagramProtocol.error_received(exc)

直前の送信あるいは受信が OSError を送出したときに呼び出されます。excOSError のインスタンスになります。

このメソッドが呼ばれるのは、トランスポート (UDP など) がデータグラムを受信側に配信できなかったことが検出されたなどの、まれな場合においてのみです。ほとんどの場合、データグラムが配信できなければそのまま通知されることなく破棄されます。

18.5.4.2.5. フロー制御コールバック

これらコールバックは ProtocolDatagramProtocol および SubprocessProtocol インスタンスから呼び出される場合があります:

BaseProtocol.pause_writing()

トランスポートのバッファーサイズが最高水位点 (High-Water Mark) を超えたときに呼び出されます。

BaseProtocol.resume_writing()

トランスポートのバッファーサイズが最低水位点 (Low-Water Mark) に達したきに呼び出されます。

pause_writing() および resume_writing() の呼び出しは対になります。-- pause_writing() はバッファーが完全に最高水位点を超えたとき (後続の書き込みがさらにバッファーサイズを増やすとしても) 1 度呼び出され、バッファーサイズが最終的に最低水位点に達したときに resume_writing() が 1 度呼び出されます。

注釈

バッファーサイズが最高水位点と等しくなった時点では pause_writing() は呼び出されません -- 完全に超えなければなりません。対して、バッファーサイズが最低水位点と等しくなったときは resume_writing() は呼び出されます。これら端末条件は各点がゼロになったとき予定通りに動作するかどうか確認するために重要です。

注釈

BSD システム (OS X、FreeBSD など) では、DatagramProtocol でのフロー制御は、大量のパケット送信による送信失敗を検知するのが容易ではないためサポートされていません。ソケットは常に '待機状態' のように見え、超過分のパケットは破棄されます; エラー番号 errno.ENOBUFS が設定された OSError が送出されるときもあればされないときもあります; 送出された場合、DatagramProtocol.error_received() に通知されますが、送出されないと無視されます。

18.5.4.2.6. コルーチンとプロトコル

コルーチンはプロトコルメソッドで ensure_future() を使用してスケジュールされることがありますが、それは実行順を保証するものではありません。プロトコルはプロトコルメソッド内で作成されたコルーチンを検知しないため、それらを待機しません。

信頼できる実行順を持つには、コルーチンの yield fromストリームオブジェクト を使用します。例えば、StreamWriter.drain() コルーチンは書き込みバッファーがフラッシュされるまで待機することができます。

18.5.4.3. プロトコルの例

18.5.4.3.1. TCP Echo クライアントプロトコル

AbstractEventLoop.create_connection() メソッドを使用した TCP Echo クライアントで、データを送信しコネクションがクローズされるまで待機します。

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

この例ではイベントループを 2 個実行しています。run_until_complete() メソッドはサーバが待ち受け状態にないときに例外を送出するためのもので、通常作成しなければならない例外の送出やループの停止などを行うための短いコルーチンの代用になります。run_until_complete() の終了時、ループの実行は終了しているので、エラー発生時にループを停止する必要はありません。

参考

ストリームを使った TCP Echo クライアント の例では asyncio.open_connection() 関数を使用しています。

18.5.4.3.2. TCP Echo サーバープロトコル

AbstractEventLoop.create_server() メソッドを使用した TCP Echo サーバーで、受信したデータを返信しコネクションをクローズします。

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close() は、データがまだソケットに送信されていなくても、WriteTransport.write() の直後に呼び出されます: それぞれのメソッドは非同期です。これらトランスポートメソッドはコルーチンではないため、yield from は必要ありません。

参考

ストリームを使った TCP Echo サーバー の例では asyncio.start_server() 関数を使用しています。

18.5.4.3.3. UDP Echo クライアントプロトコル

AbstractEventLoop.create_datagram_endpoint() メソッドを使用する UDP Echo クライアントで、データを送信し応答を受信するとトランスポートをクローズします。

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4. UDP Echo サーバープロトコル

AbstractEventLoop.create_datagram_endpoint() メソッドを使用しする UDP Echo サーバーで、受信したデータを返送します。

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5. プロトコルを使ってデータを待つオープンソケットの登録

プロトコルと AbstractEventLoop.create_connection() メソッドを使用してソケットがデータを受信するまで待機し、受信後イベントループをクローズします。

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

参考

読み込みイベント用のファイル記述子の監視 の例では、ソケットのファイル記述子を登録するのに低水準の AbstractEventLoop.add_reader() メソッドを使用しています。

ストリームを使ってデータを待つオープンソケットの登録 の例ではコルーチンの open_connection() 関数によって作成された高水準ストリームを使用しています。