18.5.4. Transports and protocols (callback based API)¶
Source code: Lib/asyncio/transports.py
Source code: Lib/asyncio/protocols.py
18.5.4.1. トランスポート¶
トランスポートは、asyncio
が提供する、さまざまな種類の通信チャンネルを抽象化するクラス群です。通常あなた自身が直接トランスポートのインスタンスを作成することはなく、AbstractEventLoop
のメソッドを呼び出すことでトランスポートとその下層の通信チャンネルのインスタンスが作成され、成功時にコールバックが返ります。
いったん通信チャンネルが確立されると、トランスポートは常に プロトコル インスタンスとのペアを成します。プロトコルはその後さまざまな用途のためトランスポートのメソッドを呼び出します。
asyncio
は現在 TCP、UDP、SSL およびサブプロセスパイプのトランスポートを実装しています。利用可能なトランスポートのメソッドはトランスポートの種類に依存します。
トランスポートクラスは スレッド安全ではありません。
バージョン 3.6 で変更: デフォルトでソケットオプションの TCP_NODELAY
が設定されるようになりました。
18.5.4.1.1. BaseTransport¶
-
class
asyncio.
BaseTransport
¶ トランスポートの基底クラスです。
-
close
()¶ トランスポートをクローズします。トランスポートが発信データのバッファーを持っていた場合、バッファーされたデータは非同期にフラッシュされます。それ以降データは受信されません。バッファーされていたデータがすべてフラッシュされた後、そのプロトコルの
connection_lost()
メソッドが引数None
で呼び出されます。
-
is_closing
()¶ トランスポートを閉じている最中か閉じていた場合
True
を返します。バージョン 3.5.1 で追加.
-
get_extra_info
(name, default=None)¶ オプションのトランスポート情報を返します。name は取得したトランスポート固有の情報を表す文字列で、default は情報が存在しなかったときに返す値になります。
このメソッドはトランスポートの実装に容易にチャンネル固有の情報を渡すことができます。
ソケット:
'peername'
: ソケットが接続されているリモートアドレスで、socket.socket.getpeername()
の結果になります (エラーのときはNone
)'socket'
:socket.socket
のインスタンスになります'sockname'
: ソケット自身のアドレスで、socket.socket.getsockname()
の結果になります
SSL ソケット:
'compression'
: 圧縮アルゴリズムで、ssl.SSLSocket.compression()
の結果になります。圧縮されていないときはNone
になります'cipher'
: 3 個の値 (使用されている暗号アルゴリズムの名称、使用が定義されている SSL プロトコルのバージョン、および使用されている秘密鍵のビット数) からなるタプルで、ssl.SSLSocket.cipher()
の結果になります'peercert'
: ピアの証明書で、ssl.SSLSocket.getpeercert()
の結果になります'sslcontext'
:ssl.SSLContext
のインスタンスになります'ssl_object'
:ssl.SSLObject
またはssl.SSLSocket
インスタンス
パイプ:
'pipe'
: パイプオブジェクトです
サブプロセス:
'subprocess'
:subprocess.Popen
のインスタンスになります
-
set_protocol
(protocol)¶ 新しいプロトコルを設定します。プロトコルの切り替えは、両方のプロトコルのドキュメントで切り替えがサポートされている場合にのみ行うべきです。
バージョン 3.5.3 で追加.
-
get_protocol
()¶ 現在のプロトコルを返します。
バージョン 3.5.3 で追加.
バージョン 3.5.1 で変更:
'ssl_object'
情報が SSL ソケットに追加されました。-
18.5.4.1.2. ReadTransport¶
-
class
asyncio.
ReadTransport
¶ 読み出し専用トランスポートのインターフェースです。
-
pause_reading
()¶ トランスポートの受信側を一時停止します。
resume_reading()
が呼び出されるまでそのプロトコルのdata_received()
メソッドにデータは渡されません。バージョン 3.6.7 で変更: The method is idempotent, i.e. it can be called when the transport is already paused or closed.
-
resume_reading
()¶ 受信を再開します。読み込み可能データが存在した場合そのプロトコルの
data_received()
メソッドが一度呼び出されます。バージョン 3.6.7 で変更: The method is idempotent, i.e. it can be called when the transport is already reading.
-
18.5.4.1.3. WriteTransport¶
-
class
asyncio.
WriteTransport
¶ 書き込み専用トランスポートのインターフェースです。
-
abort
()¶ トランスポートを即座にクローズします。未完了の処理があってもそれを待ちません。バッファーされているデータは失われます。それ以降データは受信されません。最終的にそのプロトコルの
connection_lost()
メソッドが引数None
で呼び出されます。
-
can_write_eof
()¶ トランスポートが
write_eof()
をサポートしている場合True
を、サポートしていない場合はFalse
を返します。
-
get_write_buffer_size
()¶ トランスポートで使用されている出力バッファーの現在のサイズを返します。
-
get_write_buffer_limits
()¶ 書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を取得します。
(low, high)
のタプルを返します。low および high は整数のバイト列になります。水位点の設定は
set_write_buffer_limits()
で行います。バージョン 3.4.2 で追加.
-
set_write_buffer_limits
(high=None, low=None)¶ 書き込みフロー制御の 最高 および 最低 水位点 (high- and low-water limits) を設定します。
These two values (measured in number of bytes) control when the protocol's
pause_writing()
andresume_writing()
methods are called. If specified, the low-water limit must be less than or equal to the high-water limit. Neither high nor low can be negative.pause_writing()
is called when the buffer size becomes greater than or equal to the high value. If writing has been paused,resume_writing()
is called when the buffer size becomes less than or equal to the low value.デフォルト値は実装固有になります。最高水位点のみ与えられた場合、最低水位点は最高水位点以下の実装固有のデフォルト値になります。最高水位点にゼロが設定されると最低水位点も強制的にゼロになり、バッファが空でなくなるたびに
pause_writing()
が呼び出されるようになります。最低水位点にゼロが設定されるとバッファは空になるとすぐにresume_writing()
が呼び出されるようになります。どちらかにゼロを設定することは、並列の I/O 処理や計算の機会を減らすことになるため、一般に最善とは言えません。水位点の取得には
get_write_buffer_limits()
を使用します。
-
write
(data)¶ トランスポートにバイト列 data を書き込みます。
このメソッドはブロックしません; データをバッファーし、非同期に送信する準備を行います。
-
writelines
(list_of_data)¶ バイト列のデータのリスト (またはイテラブル) をトランスポートに書き込みます。この振る舞いはイテラブルを yield して各要素で
write()
を呼び出すことと等価ですが、より効率的な実装となる場合があります。
-
write_eof
()¶ バッファーされたデータをフラッシュした後トランスポートの送信側をクローズします。データは受信されます。
このメソッドはトランスポート (例えば SSL) がハーフクローズをサポートしていない場合
NotImplementedError
を送出します。
-
18.5.4.1.4. DatagramTransport¶
18.5.4.1.5. BaseSubprocessTransport¶
-
class
asyncio.
BaseSubprocessTransport
¶ -
get_pid
()¶ サブプロセスのプロセス ID (整数) を返します。
-
get_pipe_transport
(fd)¶ 整数のファイル記述子 fd に該当する通信パイプのトランスポートを返します:
-
get_returncode
()¶ サブプロセスのリターンコード (整数) を返します。リターンコードを持たない場合
None
を返します。subprocess.Popen.returncode
属性と同じです。
-
kill
()¶ subprocess.Popen.kill()
と同様に、サブプロセスを kill します。POSIX システムでは、この関数はサブプロセスに SIGKILL を送信します。Windows では、このメソッドは
terminate()
の別名です。
-
send_signal
(signal)¶ サブプロセスにシグナル signal を送信します。
subprocess.Popen.send_signal()
と同じです。
-
terminate
()¶ サブプロセスに停止を要求します。
subprocess.Popen.terminate()
と同じです。このメソッドはclose()
メソッドの別名です。POSIX システムでは、このメソッドはサブプロセスに SIGTERM を送信します。Windows では、Windows API 関数 TerminateProcess() が呼び出されます。
-
close
()¶ サブプロセスがまだ返していない場合、
terminate()
メソッドの呼び出しによってサブプロセスに停止を要求し、全パイプ (stdin、stdout および stderr) のトランスポートをクローズします。
-
18.5.4.2. プロトコル¶
asyncio
はネットワークプロトコルの実装をサブクラス化する基底クラスを提供します。これらクラスは トランスポート と連動して使用されます: プロトコルは入力データの解析および出力データの書き込みのための問い合わせを行い、トランスポートは実際の I/O とバッファリングに責任を持ちます。
プロトコルクラスをサブクラス化するとき、いくつかのメソッドをオーバーライドすることを推奨します。これらメソッドはコールバックです: いくつかのイベントが発生したとき (例えばデータの受信など) に呼び出されます; あなたがトランスポートを実装する場合を除き、これらを直接呼び出すべきではありません。
注釈
すべてのコールバックはデフォルトで空の実装を持ちます。したがって、あなたが興味を持ったイベント用のコールバックのみ実装が必要になります。
18.5.4.2.1. プロトコルクラス群¶
-
class
asyncio.
Protocol
¶ (例えば TCP や SSL トランスポートとともに使用する) ストリーミングプロトコルを実装する基底クラスです。
-
class
asyncio.
DatagramProtocol
¶ (例えば UDP トランスポートともに使用する) データグラムプロトコルを実装する基底クラスです。
-
class
asyncio.
SubprocessProtocol
¶ 子プロセスと (一方向パイプを使用して) 通信するプロトコルを実装する基底クラスです。
18.5.4.2.2. コネクションコールバック¶
これらコールバックは Protocol
、DatagramProtocol
および SubprocessProtocol
インスタンスから呼び出される場合があります:
-
BaseProtocol.
connection_made
(transport)¶ コネクションが作成されたときに呼び出されます。
引数 transport はコネクションを表すトランスポートです。必要であれば、それをどこに格納するか (例えば属性へ) を決めるのはあなたです。
-
BaseProtocol.
connection_lost
(exc)¶ コネクションが失われた、あるいはクローズされたときに呼び出されます。
引数は例外オブジェクトまたは
None
になります。None
のとき、通常の EOF が受信されたか、あるいはコネクションがこちら側から中止またはクローズされたことを意味します。
connection_made()
および connection_lost()
は接続が成功するたびに厳密に 1 回呼び出されます。その他のすべてのコールバックはこれら 2 つのメソッドの間に呼び出され、あなたのプロトコルの実装内のリソース管理を容易に行えます。
以下のコールバックを呼び出すのは SubprocessProtocol
インスタンスのみかもしれません:
-
SubprocessProtocol.
pipe_data_received
(fd, data)¶ 子プロセスが自身の標準出力や標準エラー出力のパイプにデータを書き込んだときに呼び出されます。fd はパイプのファイル記述子 (整数) になります。data はデータを含む空ではないバイト列になります。
-
SubprocessProtocol.
pipe_connection_lost
(fd, exc)¶ 子プロセスと通信するパイプの一つがクローズされると呼び出されます。fd はクローズされたファイル記述子 (整数) になります。
-
SubprocessProtocol.
process_exited
()¶ 子プロセスが終了したときに呼び出されます。
18.5.4.2.3. ストリーミングプロトコル¶
以下のコールバックは Protocol
インスタンス上で呼び出されます:
-
Protocol.
data_received
(data)¶ データを受信したときに呼び出されます。data は受信したデータを含む空ではないバイト列オブジェクトになります。
注釈
トランスポートによって、データのバッファー、チャンクあるいは再構築のどれかが行われます。一般に、固有のセマンティックを信頼すべきではなく、代わりに全体的かつ十分に柔軟な解析を行うべきです。ただし、データは常に正しい順序で受信されます。
-
Protocol.
eof_received
()¶ 相手方が送信するデータがないことを伝えてきたとき (例えば相手方が asyncio を使用しており
write_eof()
を呼び出した場合) に呼び出されます。このメソッドは偽値 (
None
を含む) を返すことがあり、その場合トランスポートは自身をクローズします。真値を返す場合、トランスポートはプロトコルによってクローズされます。デフォルトの実装ではNone
を返すため、コネクションは暗黙的にクローズされます。注釈
SSL のような一部のトランスポートはハーフクローズ接続をサポートしていません。その場合このメソッドが真値を返すとコネクションのクローズを回避できません。
接続中、data_received()
は複数回呼び出されえます。eof_received()
が呼び出されるのは 1 回で、1 度呼び出されると、その後 data_received()
が呼び出されることはありません。
ステートマシン:
開始状態 ->
connection_made()
[->data_received()
*] [->eof_received()
?] ->connection_lost()
-> 終了状態
18.5.4.2.4. データグラムプロトコル¶
以下のコールバックは DatagramProtocol
インスタンス上で呼び出されます。
-
DatagramProtocol.
datagram_received
(data, addr)¶ データグラムを受信したときに呼び出されます。data は受信データを含むバイトオブジェクトです。addr はデータを送信するピアのアドレスです; 正確な形式はトランスポートに依存します。
18.5.4.2.5. フロー制御コールバック¶
これらコールバックは Protocol
、DatagramProtocol
および SubprocessProtocol
インスタンスから呼び出される場合があります:
-
BaseProtocol.
pause_writing
()¶ トランスポートのバッファーサイズが最高水位点 (High-Water Mark) を超えたときに呼び出されます。
-
BaseProtocol.
resume_writing
()¶ トランスポートのバッファーサイズが最低水位点 (Low-Water Mark) に達したきに呼び出されます。
pause_writing()
および resume_writing()
の呼び出しは対になります。-- pause_writing()
はバッファーが完全に最高水位点を超えたとき (後続の書き込みがさらにバッファーサイズを増やすとしても) 1 度呼び出され、バッファーサイズが最終的に最低水位点に達したときに resume_writing()
が 1 度呼び出されます。
注釈
バッファーサイズが最高水位点と等しくなった時点では pause_writing()
は呼び出されません -- 完全に超えなければなりません。対して、バッファーサイズが最低水位点と等しくなったときは resume_writing()
は呼び出されます。これら端末条件は各点がゼロになったとき予定通りに動作するかどうか確認するために重要です。
注釈
BSD システム (OS X、FreeBSD など) では、DatagramProtocol
でのフロー制御は、大量のパケット送信による送信失敗を検知するのが容易ではないためサポートされていません。ソケットは常に '待機状態' のように見え、超過分のパケットは破棄されます; エラー番号 errno.ENOBUFS
が設定された OSError
が送出されるときもあればされないときもあります; 送出された場合、DatagramProtocol.error_received()
に通知されますが、送出されないと無視されます。
18.5.4.2.6. コルーチンとプロトコル¶
コルーチンはプロトコルメソッドで ensure_future()
を使用してスケジュールされることがありますが、それは実行順を保証するものではありません。プロトコルはプロトコルメソッド内で作成されたコルーチンを検知しないため、それらを待機しません。
信頼できる実行順を持つには、コルーチンの yield from
で ストリームオブジェクト を使用します。例えば、StreamWriter.drain()
コルーチンは書き込みバッファーがフラッシュされるまで待機することができます。
18.5.4.3. プロトコルの例¶
18.5.4.3.1. TCP Echo クライアントプロトコル¶
AbstractEventLoop.create_connection()
メソッドを使用した TCP Echo クライアントで、データを送信しコネクションがクローズされるまで待機します。
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, loop):
self.message = message
self.loop = loop
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
print('Stop the event loop')
self.loop.stop()
loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
'127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
この例ではイベントループを 2 個実行しています。run_until_complete()
メソッドはサーバが待ち受け状態にないときに例外を送出するためのもので、通常作成しなければならない例外の送出やループの停止などを行うための短いコルーチンの代用になります。run_until_complete()
の終了時、ループの実行は終了しているので、エラー発生時にループを停止する必要はありません。
参考
ストリームを使った TCP Echo クライアント の例では asyncio.open_connection()
関数を使用しています。
18.5.4.3.2. TCP Echo サーバープロトコル¶
AbstractEventLoop.create_server()
メソッドを使用した TCP Echo サーバーで、受信したデータを返信しコネクションをクローズします。
import asyncio
class EchoServerClientProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
Transport.close()
は、データがまだソケットに送信されていなくても、WriteTransport.write()
の直後に呼び出されます: それぞれのメソッドは非同期です。これらトランスポートメソッドはコルーチンではないため、yield from
は必要ありません。
参考
ストリームを使った TCP Echo サーバー の例では asyncio.start_server()
関数を使用しています。
18.5.4.3.3. UDP Echo クライアントプロトコル¶
AbstractEventLoop.create_datagram_endpoint()
メソッドを使用する UDP Echo クライアントで、データを送信し応答を受信するとトランスポートをクローズします。
import asyncio
class EchoClientProtocol:
def __init__(self, message, loop):
self.message = message
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Socket closed, stop the event loop")
loop = asyncio.get_event_loop()
loop.stop()
loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, loop),
remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()
18.5.4.3.4. UDP Echo サーバープロトコル¶
AbstractEventLoop.create_datagram_endpoint()
メソッドを使用しする UDP Echo サーバーで、受信したデータを返送します。
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
transport.close()
loop.close()
18.5.4.3.5. プロトコルを使ってデータを待つオープンソケットの登録¶
プロトコルと AbstractEventLoop.create_connection()
メソッドを使用してソケットがデータを受信するまで待機し、受信後イベントループをクローズします。
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()
class MyProtocol(asyncio.Protocol):
transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport (it will call connection_lost())
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed, stop the event loop
loop.stop()
# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Run the event loop
loop.run_forever()
# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()
参考
読み込みイベント用のファイル記述子の監視 の例では、ソケットのファイル記述子を登録するのに低水準の AbstractEventLoop.add_reader()
メソッドを使用しています。
ストリームを使ってデータを待つオープンソケットの登録 の例ではコルーチンの open_connection()
関数によって作成された高水準ストリームを使用しています。