ストリーム¶
ソースコード: Lib/asyncio/streams.py
ストリームはネットワークコネクションと合わせて動作する高水準の async/await 可能な基本要素です。ストリームはコールバックや低水準のプロトコルやトランスポートを使うことなくデータを送受信することを可能にします。
以下は asyncio ストリームを使って書いた TCP エコークライアントの例です:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
下記の 使用例 節も参照してください。
ストリーム関数
以下の asyncio のトップレベル関数はストリームの作成や操作を行うことができます:
- coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
ネットワークコネクションを確立し、
(reader, writer)
のオブジェクトのペアを返します。戻り値の reader と writer はそれぞれ
StreamReader
とStreamWriter
クラスのインスタンスです。limit は戻り値の
StreamReader
インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。残りの引数は直接
loop.create_connection()
に渡されます。注釈
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.バージョン 3.7 で変更: Added the ssl_handshake_timeout parameter.
バージョン 3.8 で変更: happy_eyeballs_delay と interleave が追加されました。
バージョン 3.10 で変更: loop パラメータが削除されました。
バージョン 3.11 で変更: ssl_shutdown_timeout パラメータが追加されました。
- coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
ソケットサーバーを起動します。
client_connected_cb コールバックは新しいクライアントコネクションが確立されるたびに呼び出されます。このコールバックは
StreamReader
とStreamWriter
クラスのインスタンスのペア(reader, writer)
を2つの引数として受け取ります。client_connected_cb には単純な呼び出し可能オブジェクトか、または コルーチン関数 を指定します; コルーチン関数が指定された場合、コールバックの呼び出しは自動的に
Task
としてスケジュールされます。limit は戻り値の
StreamReader
インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。残りの引数は直接
loop.create_server()
に渡されます。注釈
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's
close()
method.バージョン 3.7 で変更: ssl_handshake_timeout と start_serving パラメータが追加されました。
バージョン 3.10 で変更: loop パラメータが削除されました。
バージョン 3.11 で変更: ssl_shutdown_timeout パラメータが追加されました。
Unix ソケット
- coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Unix ソケットコネクションを確立し、
(reader, writer)
のオブジェクトのペアを返します。この関数は
open_connection()
と似ていますが Unix ソケットに対して動作します。loop.create_unix_connection()
のドキュメントも参照してください。注釈
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.利用可能な環境: Unix。
バージョン 3.7 で変更: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object
バージョン 3.10 で変更: loop パラメータが削除されました。
バージョン 3.11 で変更: ssl_shutdown_timeout パラメータが追加されました。
- coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Unix のソケットサーバーを起動します。
start_server()
と似ていますが Unix ソケットに対して動作します。loop.create_unix_server()
のドキュメントも参照してください。注釈
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's
close()
method.利用可能な環境: Unix。
バージョン 3.7 で変更: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.
バージョン 3.10 で変更: loop パラメータが削除されました。
バージョン 3.11 で変更: ssl_shutdown_timeout パラメータが追加されました。
StreamReader¶
- class asyncio.StreamReader¶
Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the
async for
statement.StreamReader オブジェクトを直接インスタンス化することは推奨されません; 代わりに
open_connection()
やstart_server()
を使ってください。- feed_eof()¶
EOF の肯定応答を行います。
- coroutine read(n=-1)¶
ストリームから最大 n バイト読み込みます。
n が指定されないか
-1
が指定されていた場合 EOF になるまで読み込み、読み込まれた全てのbytes
を返します。EOF を受信し、かつ内部バッファーが空の場合、空のbytes
オブジェクトを返します。n が
0
なら、ただちに空のbytes
オブジェクトを返します。n が正なら、内部バッファで最低でも 1 バイトが利用可能になり次第、利用可能な最大 n
bytes
を返します。1 バイトも読み込まないうちに EOF を受信したなら、空のbytes
オブジェクトを返します。
- coroutine readline()¶
1 行読み込みます。 "行" とは、
\n
で終了するバイト列のシーケンスです。EOF を受信し、かつ
\n
が見つからない場合、このメソッドは部分的に読み込んだデータを返します。EOF を受信し、かつ内部バッファーが空の場合、空の
bytes
オブジェクトを返します。
- coroutine readexactly(n)¶
厳密に n バイトのデータを読み出します。
n バイトを読み出す前に EOF に達した場合
IncompleteReadError
を送出します。部分的に読み出したデータを取得するにはIncompleteReadError.partial
属性を使ってください。
- coroutine readuntil(separator=b'\n')¶
separator が見つかるまでストリームからデータを読み出します。
成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。
読み出したデータの量が設定したストリームの上限を超えると
LimitOverrunError
例外が送出されます。このときデータは内部バッファーに残され、再度読み出すことができます。完全な区切り文字が見つかる前に EOF に達すると
IncompleteReadError
例外が送出され、内部バッファーがリセットされます。このときIncompleteReadError.partial
属性は区切り文字の一部を含むかもしれません。Added in version 3.5.2.
- at_eof()¶
バッファーが空で
feed_eof()
が呼ばれていた場合True
を返します。
StreamWriter¶
- class asyncio.StreamWriter¶
IO ストリームにデータを書き込むための API を提供するライターオブジェクトを表します。
StreamWriter オブジェクトを直接インスタンス化することは推奨されません; 代わりに
open_connection()
やstart_server()
を使ってください。- write(data)¶
このメソッドは、背後にあるソケットにデータ data を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。
このメソッドは
drain()
メソッドと組み合わせて使うべきです:stream.write(data) await stream.drain()
- writelines(data)¶
このメソッドは、背後にあるソケットにバイトデータのリスト (またはイテラブル) を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。
このメソッドは
drain()
メソッドと組み合わせて使うべきです:stream.writelines(lines) await stream.drain()
- close()¶
このメソッドはストリームと背後にあるソケットをクローズします。
The method should be used, though not mandatory, along with the
wait_closed()
method:stream.close() await stream.wait_closed()
- can_write_eof()¶
背後にあるトランスポートが
write_eof()
メソッドをサポートしている場合True
を返し、そうでない場合はFalse
を返します。
- write_eof()¶
バッファーされた書き込みデータを全て書き込んでから、ストリームの書き込み側終端をクローズします。
- transport¶
背後にある asyncio トランスポートを返します。
- get_extra_info(name, default=None)¶
オプションのトランスポート情報にアクセスします。詳細は
BaseTransport.get_extra_info()
を参照してください。
- coroutine drain()¶
ストリームへの書き込み再開に適切な状態になるまで待ちます。使用例:
writer.write(data) await writer.drain()
このメソッドは背後にある IO 書き込みバッファーとやりとりを行うフロー制御メソッドです。バッファーのサイズが最高水位点に達した場合、drain() はバッファのサイズが最低水位点を下回るまで減量され、書き込み再開可能になるまで書き込みをブロックします。待ち受けの必要がない場合、
drain()
は即座にリターンします。
- coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Upgrade an existing stream-based connection to TLS.
引数:
sslcontext: 構成済みの
SSLContext
インスタンスです。server_hostname: 対象のサーバーの証明書との照合に使われるホスト名を設定または上書きします。
ssl_handshake_timeout is the time in seconds to wait for the TLS handshake to complete before aborting the connection.
60.0
seconds ifNone
(default).ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0
seconds ifNone
(default).
Added in version 3.11.
バージョン 3.12 で変更: ssl_shutdown_timeout パラメータが追加されました。
- is_closing()¶
ストリームがクローズされたか、またはクローズ処理中の場合に
True
を返します。Added in version 3.7.
使用例¶
ストリームを使った TCP Echo クライアント¶
asyncio.open_connection()
関数を使った TCP Echo クライアントです:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
参考
TCP エコークライアントプロトコル の例は低水準の loop.create_connection()
メソッドを使っています。
ストリームを使った TCP Echo サーバー¶
asyncio.start_server()
関数を使った TCP Echo サーバーです:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
参考
TCP エコーサーバープロトコル の例は loop.create_server()
メソッドを使っています。
HTTP ヘッダーの取得¶
コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
使い方:
python example.py http://example.com/path/page.html
または HTTPS を使用:
python example.py https://example.com/path/page.html
ストリームを使ってデータを待つオープンソケットの登録¶
open_connection()
関数を使ってソケットがデータを受信するまで待つコルーチンです:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
参考
プロトコルを使ってオープンしたソケットをデータ待ち受けのために登録する 例では、低水準のプロトコルと loop.create_connection()
メソッドを使っています。
ファイル記述子の読み出しイベントを監視する 例では、低水準の loop.add_reader()
メソッドを使ってファイル記述子を監視しています。