ストリーム¶
ソースコード: Lib/asyncio/streams.py
ストリームはネットワークコネクションと合わせて動作する高水準の async/await 可能な基本要素です。ストリームはコールバックや低水準のプロトコルやトランスポートを使うことなくデータを送受信することを可能にします。
以下は asyncio ストリームを使って書いた TCP エコークライアントの例です:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
下記の 使用例 節も参照してください。
ストリーム関数
以下の asyncio のトップレベル関数はストリームの作成や操作を行うことができます:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)¶ ネットワークコネクションを確立し、
(reader, writer)
のオブジェクトのペアを返します。戻り値の reader と writer はそれぞれ
StreamReader
とStreamWriter
クラスのインスタンスです。The loop argument is optional and can always be determined automatically when this function is awaited from a coroutine.
limit は戻り値の
StreamReader
インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。残りの引数は直接
loop.create_connection()
に渡されます。バージョン 3.7 で追加: The ssl_handshake_timeout parameter.
バージョン 3.8 で追加: Added happy_eyeballs_delay and interleave parameters.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ ソケットサーバーを起動します。
client_connected_cb コールバックは新しいクライアントコネクションが確立されるたびに呼び出されます。このコールバックは
StreamReader
とStreamWriter
クラスのインスタンスのペア(reader, writer)
を2つの引数として受け取ります。client_connected_cb には単純な呼び出し可能オブジェクトか、または コルーチン関数 を指定します; コルーチン関数が指定された場合、コールバックの呼び出しは自動的に
Task
としてスケジュールされます。The loop argument is optional and can always be determined automatically when this method is awaited from a coroutine.
limit は戻り値の
StreamReader
インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。残りの引数は直接
loop.create_server()
に渡されます。バージョン 3.7 で追加: The ssl_handshake_timeout and start_serving parameters.
Unix ソケット
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Unix ソケットコネクションを確立し、
(reader, writer)
のオブジェクトのペアを返します。この関数は
open_connection()
と似ていますが Unix ソケットに対して動作します。loop.create_unix_connection()
のドキュメントも参照してください。利用可能な環境: Unix。
バージョン 3.7 で追加: The ssl_handshake_timeout parameter.
バージョン 3.7 で変更: The path parameter can now be a path-like object
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Unix のソケットサーバーを起動します。
start_server()
と似ていますが Unix ソケットに対して動作します。loop.create_unix_server()
のドキュメントも参照してください。利用可能な環境: Unix。
バージョン 3.7 で追加: The ssl_handshake_timeout and start_serving parameters.
バージョン 3.7 で変更: The path parameter can now be a path-like object.
StreamReader¶
-
class
asyncio.
StreamReader
¶ IO ストリームからデータを読み出すための API を提供するリーダーオブジェクトを表します。
StreamReader オブジェクトを直接インスタンス化することは推奨されません; 代わりに
open_connection()
やstart_server()
を使ってください。-
coroutine
read
(n=-1)¶ n バイト読み込みます。n が指定されないか
-1
が指定されていた場合 EOF になるまで読み込み、全データを返します。EOF を受信し、かつ内部バッファーが空の場合、空の
bytes
オブジェクトを返します。
-
coroutine
readline
()¶ 1 行読み込みます。 "行" とは、
\n
で終了するバイト列のシーケンスです。EOF を受信し、かつ
\n
が見つからない場合、このメソッドは部分的に読み込んだデータを返します。EOF を受信し、かつ内部バッファーが空の場合、空の
bytes
オブジェクトを返します。
-
coroutine
readexactly
(n)¶ 厳密に n バイトのデータを読み出します。
n バイトを読み出す前に EOF に達した場合
IncompleteReadError
を送出します。部分的に読み出したデータを取得するにはIncompleteReadError.partial
属性を使ってください。
-
coroutine
readuntil
(separator=b'\n')¶ separator が見つかるまでストリームからデータを読み出します。
成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。
読み出したデータの量が設定したストリームの上限を超えると
LimitOverrunError
例外が送出されます。このときデータは内部バッファーに残され、再度読み出すことができます。完全な区切り文字が見つかる前に EOF に達すると
IncompleteReadError
例外が送出され、内部バッファーがリセットされます。このときIncompleteReadError.partial
属性は区切り文字の一部を含むかもしれません。バージョン 3.5.2 で追加.
-
at_eof
()¶ バッファーが空で
feed_eof()
が呼ばれていた場合True
を返します。
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ IO ストリームにデータを書き込むための API を提供するライターオブジェクトを表します。
StreamWriter オブジェクトを直接インスタンス化することは推奨されません; 代わりに
open_connection()
やstart_server()
を使ってください。-
write
(data)¶ このメソッドは、背後にあるソケットにデータ data を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。
このメソッドは
drain()
メソッドと組み合わせて使うべきです:stream.write(data) await stream.drain()
-
writelines
(data)¶ このメソッドは、背後にあるソケットにバイトデータのリスト (またはイテラブル) を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。
このメソッドは
drain()
メソッドと組み合わせて使うべきです:stream.writelines(lines) await stream.drain()
-
close
()¶ このメソッドはストリームと背後にあるソケットをクローズします。
このメソッドは
wait_closed()
メソッドと組み合わせて使うべきです:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ 背後にあるトランスポートが
write_eof()
メソッドをサポートしている場合True
を返し、そうでない場合はFalse
を返します。
-
write_eof
()¶ バッファーされた書き込みデータを全て書き込んでから、ストリームの書き込み側終端をクローズします。
-
transport
¶ 背後にある asyncio トランスポートを返します。
-
get_extra_info
(name, default=None)¶ オプションのトランスポート情報にアクセスします。詳細は
BaseTransport.get_extra_info()
を参照してください。
-
coroutine
drain
()¶ ストリームへの書き込み再開に適切な状態になるまで待ちます。使用例:
writer.write(data) await writer.drain()
このメソッドは背後にある IO 書き込みバッファーとやりとりを行うフロー制御メソッドです。バッファーのサイズが最高水位点に達した場合、drain() はバッファのサイズが最低水位点を下回るまで減量され、書き込み再開可能になるまで書き込みをブロックします。待ち受けの必要がない場合、
drain()
は即座にリターンします。
-
is_closing
()¶ ストリームがクローズされたか、またはクローズ処理中の場合に
True
を返します。バージョン 3.7 で追加.
-
使用例¶
ストリームを使った TCP Echo クライアント¶
asyncio.open_connection()
関数を使った TCP Echo クライアントです:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
参考
TCP エコークライアントプロトコル の例は低水準の loop.create_connection()
メソッドを使っています。
ストリームを使った TCP Echo サーバー¶
asyncio.start_server()
関数を使った TCP Echo サーバーです:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
参考
TCP エコーサーバープロトコル の例は loop.create_server()
メソッドを使っています。
HTTP ヘッダーの取得¶
コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
使い方:
python example.py http://example.com/path/page.html
または HTTPS を使用:
python example.py https://example.com/path/page.html
ストリームを使ってデータを待つオープンソケットの登録¶
open_connection()
関数を使ってソケットがデータを受信するまで待つコルーチンです:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
参考
プロトコルを使ってオープンしたソケットをデータ待ち受けのために登録する 例では、低水準のプロトコルと loop.create_connection()
メソッドを使っています。
ファイル記述子の読み出しイベントを監視する 例では、低水準の loop.add_reader()
メソッドを使ってファイル記述子を監視しています。