ストリーム

ソースコード: Lib/asyncio/streams.py


ストリームはネットワークコネクションと合わせて動作する高水準の async/await 可能な基本要素です。ストリームはコールバックや低水準のプロトコルやトランスポートを使うことなくデータを送受信することを可能にします。

以下は asyncio ストリームを使って書いた TCP エコークライアントの例です:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

下記の 使用例 節も参照してください。

ストリーム関数

以下の asyncio のトップレベル関数はストリームの作成や操作を行うことができます:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

ネットワークコネクションを確立し、 (reader, writer) のオブジェクトのペアを返します。

戻り値の readerwriter はそれぞれ StreamReaderStreamWriter クラスのインスタンスです。

limit は戻り値の StreamReader インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。

残りの引数は直接 loop.create_connection() に渡されます。

注釈

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

バージョン 3.7 で変更: Added the ssl_handshake_timeout parameter.

バージョン 3.8 で追加: happy_eyeballs_delayinterleave の2つのパラメータが追加されました。

バージョン 3.10 で変更: loop パラメータが削除されました。

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

ソケットサーバーを起動します。

client_connected_cb コールバックは新しいクライアントコネクションが確立されるたびに呼び出されます。このコールバックは StreamReaderStreamWriter クラスのインスタンスのペア (reader, writer) を2つの引数として受け取ります。

client_connected_cb には単純な呼び出し可能オブジェクトか、または コルーチン関数 を指定します; コルーチン関数が指定された場合、コールバックの呼び出しは自動的に Task としてスケジュールされます。

limit は戻り値の StreamReader インスタンスが利用するバッファのサイズの上限値を設定します。デフォルトでは limit は 64 KiB に設定されます。

残りの引数は直接 loop.create_server() に渡されます。

注釈

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's close() method.

バージョン 3.7 で変更: ssl_handshake_timeoutstart_serving パラメータが追加されました。

バージョン 3.10 で変更: loop パラメータが削除されました。

Unix ソケット

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Unix ソケットコネクションを確立し、 (reader, writer) のオブジェクトのペアを返します。

この関数は open_connection() と似ていますが Unix ソケットに対して動作します。

loop.create_unix_connection() のドキュメントも参照してください。

注釈

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

利用可能な環境: Unix。

バージョン 3.7 で変更: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object

バージョン 3.10 で変更: loop パラメータが削除されました。

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Unix のソケットサーバーを起動します。

start_server() と似ていますが Unix ソケットに対して動作します。

loop.create_unix_server() のドキュメントも参照してください。

注釈

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's close() method.

利用可能な環境: Unix。

バージョン 3.7 で変更: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.

バージョン 3.10 で変更: loop パラメータが削除されました。

StreamReader

class asyncio.StreamReader

Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the async for statement.

StreamReader オブジェクトを直接インスタンス化することは推奨されません; 代わりに open_connection()start_server() を使ってください。

coroutine read(n=- 1)

Read up to n bytes from the stream.

If n is not provided or set to -1, read until EOF, then return all read bytes. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is 0, return an empty bytes object immediately.

If n is positive, return at most n available bytes as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an empty bytes object.

coroutine readline()

1 行読み込みます。 "行" とは、\n で終了するバイト列のシーケンスです。

EOF を受信し、かつ \n が見つからない場合、このメソッドは部分的に読み込んだデータを返します。

EOF を受信し、かつ内部バッファーが空の場合、空の bytes オブジェクトを返します。

coroutine readexactly(n)

厳密に n バイトのデータを読み出します。

n バイトを読み出す前に EOF に達した場合 IncompleteReadError を送出します。部分的に読み出したデータを取得するには IncompleteReadError.partial 属性を使ってください。

coroutine readuntil(separator=b'\n')

separator が見つかるまでストリームからデータを読み出します。

成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。

読み出したデータの量が設定したストリームの上限を超えると LimitOverrunError 例外が送出されます。このときデータは内部バッファーに残され、再度読み出すことができます。

完全な区切り文字が見つかる前に EOF に達すると IncompleteReadError 例外が送出され、内部バッファーがリセットされます。このとき IncompleteReadError.partial 属性は区切り文字の一部を含むかもしれません。

バージョン 3.5.2 で追加.

at_eof()

バッファーが空で feed_eof() が呼ばれていた場合 True を返します。

StreamWriter

class asyncio.StreamWriter

IO ストリームにデータを書き込むための API を提供するライターオブジェクトを表します。

StreamWriter オブジェクトを直接インスタンス化することは推奨されません; 代わりに open_connection()start_server() を使ってください。

write(data)

このメソッドは、背後にあるソケットにデータ data を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。

このメソッドは drain() メソッドと組み合わせて使うべきです:

stream.write(data)
await stream.drain()
writelines(data)

このメソッドは、背後にあるソケットにバイトデータのリスト (またはイテラブル) を即座に書き込みます。書き込みに失敗した場合、データは送信可能になるまで内部の書き込みバッファーに格納されて待機します。

このメソッドは drain() メソッドと組み合わせて使うべきです:

stream.writelines(lines)
await stream.drain()
close()

このメソッドはストリームと背後にあるソケットをクローズします。

The method should be used, though not mandatory, along with the wait_closed() method:

stream.close()
await stream.wait_closed()
can_write_eof()

背後にあるトランスポートが write_eof() メソッドをサポートしている場合 True を返し、そうでない場合は False を返します。

write_eof()

バッファーされた書き込みデータを全て書き込んでから、ストリームの書き込み側終端をクローズします。

transport

背後にある asyncio トランスポートを返します。

get_extra_info(name, default=None)

オプションのトランスポート情報にアクセスします。詳細は BaseTransport.get_extra_info() を参照してください。

coroutine drain()

ストリームへの書き込み再開に適切な状態になるまで待ちます。使用例:

writer.write(data)
await writer.drain()

このメソッドは背後にある IO 書き込みバッファーとやりとりを行うフロー制御メソッドです。バッファーのサイズが最高水位点に達した場合、drain() はバッファのサイズが最低水位点を下回るまで減量され、書き込み再開可能になるまで書き込みをブロックします。待ち受けの必要がない場合、 drain() は即座にリターンします。

is_closing()

ストリームがクローズされたか、またはクローズ処理中の場合に True を返します。

バージョン 3.7 で追加.

coroutine wait_closed()

ストリームがクローズされるまで待機します。

Should be called after close() to wait until the underlying connection is closed, ensuring that all data has been flushed before e.g. exiting the program.

バージョン 3.7 で追加.

使用例

ストリームを使った TCP Echo クライアント

asyncio.open_connection() 関数を使った TCP Echo クライアントです:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

参考

TCP エコークライアントプロトコル の例は低水準の loop.create_connection() メソッドを使っています。

ストリームを使った TCP Echo サーバー

asyncio.start_server() 関数を使った TCP Echo サーバーです:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

参考

TCP エコーサーバープロトコル の例は loop.create_server() メソッドを使っています。

HTTP ヘッダーの取得

コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

使い方:

python example.py http://example.com/path/page.html

または HTTPS を使用:

python example.py https://example.com/path/page.html

ストリームを使ってデータを待つオープンソケットの登録

open_connection() 関数を使ってソケットがデータを受信するまで待つコルーチンです:

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

参考

プロトコルを使ってオープンしたソケットをデータ待ち受けのために登録する 例では、低水準のプロトコルと loop.create_connection() メソッドを使っています。

ファイル記述子の読み出しイベントを監視する 例では、低水準の loop.add_reader() メソッドを使ってファイル記述子を監視しています。