ストリーム

ソースコード: Lib/asyncio/streams.py


ストリームはネットワークコネクションと合わせて動作する高水準の async/await 可能な基本要素です。ストリームはコールバックや低水準のプロトコルやトランスポートを使うことなくデータを送受信することを可能にします。

以下は asyncio ストリームを使って書いた TCP エコークライアントの例です:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

下記の 使用例 節も参照してください。

ストリーム関数

以下の asyncio のトップレベル関数はストリームの作成や操作を行うことができます:

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

ネットワークコネクションを確立し、 (reader, writer) のオブジェクトのペアを返します。

戻り値の readerwriter はそれぞれ StreamReaderStreamWriter クラスのインスタンスです。

The loop argument is optional and can always be determined automatically when this function is awaited from a coroutine.

limit は戻り値の StreamReader インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。

残りの引数は直接 loop.create_connection() に渡されます。

バージョン 3.7 で追加: The ssl_handshake_timeout parameter.

バージョン 3.8 で追加: Added happy_eyeballs_delay and interleave parameters.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

ソケットサーバーを起動します。

client_connected_cb コールバックは新しいクライアントコネクションが確立されるたびに呼び出されます。このコールバックは StreamReaderStreamWriter クラスのインスタンスのペア (reader, writer) を2つの引数として受け取ります。

client_connected_cb には単純な呼び出し可能オブジェクトか、または コルーチン関数 を指定します; コルーチン関数が指定された場合、コールバックの呼び出しは自動的に Task としてスケジュールされます。

The loop argument is optional and can always be determined automatically when this method is awaited from a coroutine.

limit は戻り値の StreamReader インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。

残りの引数は直接 loop.create_server() に渡されます。

バージョン 3.7 で追加: The ssl_handshake_timeout and start_serving parameters.

Unix ソケット

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Unix ソケットコネクションを確立し、 (reader, writer) のオブジェクトのペアを返します。

この関数は open_connection() と似ていますが Unix ソケットに対して動作します。

loop.create_unix_connection() のドキュメントも参照してください。

利用可能な環境: Unix。

バージョン 3.7 で追加: The ssl_handshake_timeout parameter.

バージョン 3.7 で変更: The path parameter can now be a path-like object

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Unix のソケットサーバーを起動します。

start_server() と似ていますが Unix ソケットに対して動作します。

loop.create_unix_server() のドキュメントも参照してください。

利用可能な環境: Unix。

バージョン 3.7 で追加: The ssl_handshake_timeout and start_serving parameters.

バージョン 3.7 で変更: The path parameter can now be a path-like object.

StreamReader

class asyncio.StreamReader

IO ストリームからデータを読み出すための API を提供するリーダーオブジェクトを表します。

StreamReader オブジェクトを直接インスタンス化することは推奨されません; 代わりに open_connection()start_server() を使ってください。

coroutine read(n=-1)

n バイト読み込みます。n が指定されないか -1 が指定されていた場合 EOF になるまで読み込み、全データを返します。

EOF を受信し、かつ内部バッファーが空の場合、空の bytes オブジェクトを返します。

coroutine readline()

1 行読み込みます。 "行" とは、\n で終了するバイト列のシーケンスです。

EOF を受信し、かつ \n が見つからない場合、このメソッドは部分的に読み込んだデータを返します。

EOF を受信し、かつ内部バッファーが空の場合、空の bytes オブジェクトを返します。

coroutine readexactly(n)

厳密に n バイトのデータを読み出します。

n バイトを読み出す前に EOF に達した場合 IncompleteReadError を送出します。部分的に読み出したデータを取得するには IncompleteReadError.partial 属性を使ってください。

coroutine readuntil(separator=b'\n')

separator が見つかるまでストリームからデータを読み出します。

成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。

読み出したデータの量が設定したストリームの上限を超えると LimitOverrunError 例外が送出されます。このときデータは内部バッファーに残され、再度読み出すことができます。

完全な区切り文字が見つかる前に EOF に達すると IncompleteReadError 例外が送出され、内部バッファーがリセットされます。このとき IncompleteReadError.partial 属性は区切り文字の一部を含むかもしれません。

バージョン 3.5.2 で追加.

at_eof()

バッファーが空で feed_eof() が呼ばれていた場合 True を返します。

StreamWriter

class asyncio.StreamWriter

IO ストリームにデータを書き込むための API を提供するライターオブジェクトを表します。

StreamWriter オブジェクトを直接インスタンス化することは推奨されません; 代わりに open_connection()start_server() を使ってください。

write(data)

このメソッドは、背後にあるソケットにデータ data を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。

このメソッドは drain() メソッドと組み合わせて使うべきです:

stream.write(data)
await stream.drain()
writelines(data)

このメソッドは、背後にあるソケットにバイトデータのリスト (またはイテラブル) を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。

このメソッドは drain() メソッドと組み合わせて使うべきです:

stream.writelines(lines)
await stream.drain()
close()

このメソッドはストリームと背後にあるソケットをクローズします。

このメソッドは wait_closed() メソッドと組み合わせて使うべきです:

stream.close()
await stream.wait_closed()
can_write_eof()

背後にあるトランスポートが write_eof() メソッドをサポートしている場合 True を返し、そうでない場合は False を返します。

write_eof()

バッファーされた書き込みデータを全て書き込んでから、ストリームの書き込み側終端をクローズします。

transport

背後にある asyncio トランスポートを返します。

get_extra_info(name, default=None)

オプションのトランスポート情報にアクセスします。詳細は BaseTransport.get_extra_info() を参照してください。

coroutine drain()

ストリームへの書き込み再開に適切な状態になるまで待ちます。使用例:

writer.write(data)
await writer.drain()

このメソッドは背後にある IO 書き込みバッファーとやりとりを行うフロー制御メソッドです。バッファーのサイズが最高水位点に達した場合、drain() はバッファのサイズが最低水位点を下回るまで減量され、書き込み再開可能になるまで書き込みをブロックします。待ち受けの必要がない場合、 drain() は即座にリターンします。

is_closing()

ストリームがクローズされたか、またはクローズ処理中の場合に True を返します。

バージョン 3.7 で追加.

coroutine wait_closed()

ストリームがクローズされるまで待機します。

このメソッドは、 close() を呼び出した後に、コネクションがクローズされるまで待機するために呼び出すべきです。

バージョン 3.7 で追加.

使用例

ストリームを使った TCP Echo クライアント

asyncio.open_connection() 関数を使った TCP Echo クライアントです:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

参考

TCP エコークライアントプロトコル の例は低水準の loop.create_connection() メソッドを使っています。

ストリームを使った TCP Echo サーバー

asyncio.start_server() 関数を使った TCP Echo サーバーです:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

参考

TCP エコーサーバープロトコル の例は loop.create_server() メソッドを使っています。

HTTP ヘッダーの取得

コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

使い方:

python example.py http://example.com/path/page.html

または HTTPS を使用:

python example.py https://example.com/path/page.html

ストリームを使ってデータを待つオープンソケットの登録

open_connection() 関数を使ってソケットがデータを受信するまで待つコルーチンです:

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

参考

プロトコルを使ってオープンしたソケットをデータ待ち受けのために登録する 例では、低水準のプロトコルと loop.create_connection() メソッドを使っています。

ファイル記述子の読み出しイベントを監視する 例では、低水準の loop.add_reader() メソッドを使ってファイル記述子を監視しています。