18.5.5. ストリーム (コルーチンベースの API)

ソースコード: Lib/asyncio/streams.py

18.5.5.1. ストリーム関数

注釈

このモジュール内のトップレベル関数は、便利なラッパーとしてのみ意図されています。特別なことは何もありませんし、それらが思い通りに動作しない場合は、ご自由にコードをコピーしてください。

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)

create_connection() のラッパーで (reader, writer) ペアを返します。

返されたリーダーは StreamReader のインスタンスで、ライターは StreamWriter のインスタンスです。

引数は protocol_factory を除き、 AbstractEventLoop.create_connection() の通常の引数です; 最も一般的なものは位置引数のホストとポートで、その後にオプションのキーワード引数が続きます。

追加のキーワード引数 loop には使用するイベントループインスタンスを、limit には StreamReader に渡すバッファーリミットを設定します。

この関数は コルーチン です。

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)

各クライアントが接続するコールバックでソケットサーバーを開始します。戻り値は create_server() と同じです。

client_connected_cb 引数は client_readerclient_writer という 2 個の引数で呼び出されます。 client_readerStreamReader オブジェクトで、client_writerStreamWriter オブジェクトです。 client_connected_cb 引数には単純なコールバック関数か コルーチン関数 のどちらかを指定できます; コルーチン関数の場合、自動的に Task に変換されます。

残りの引数は protocol_factory を除きすべて create_server() の通常の引数です; 最も一般的なのは位置引数 hostport で、さまざまなオプションのキーワード引数が続きます。

追加のキーワード引数 loop には使用するイベントループインスタンスを、limit には StreamReader に渡すバッファーリミットを設定します。

この関数は コルーチン です。

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, **kwds)

create_unix_connection() のラッパーで (reader, writer) ペアを返します。

戻り値やその他詳細については open_connection() を参照してください。

この関数は コルーチン です。

利用できる環境: UNIX。

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)

接続された各クライアントごとのコールバックとともに UNIX ドメインソケットサーバーを開始します。

戻り値やその他詳細については start_server() を参照してください。

この関数は コルーチン です。

利用できる環境: UNIX。

18.5.5.2. StreamReader

class asyncio.StreamReader(limit=_DEFAULT_LIMIT, loop=None)

このクラスは スレッド安全ではありません

The limit argument's default value is set to _DEFAULT_LIMIT which is 2**16 (64 KiB)

exception()

例外を取得します。

feed_eof()

EOF の肯定応答を行います。

feed_data(data)

バイト列 data を内部バッファーに取り込みます。データを待っているあらゆる処理が再開されます。

set_exception(exc)

例外を設定します。

set_transport(transport)

トランスポートを設定します。

coroutine read(n=-1)

n バイト読み込みます。n が指定されないか -1 が指定されていた場合 EOF になるまで読み込み、全データを返します。

EOF に達しており内部バッファーが空であれば、空の bytes オブジェクトを返します。

このメソッドは コルーチン です。

coroutine readline()

1 行読み込みます。 "行" とは、\n で終了するバイト列のシーケンスです。

EOF を受信し、かつ \n が見つからない場合、このメソッドは読み込んだ分の不完全なバイト列を返します。

EOF に達しており内部バッファーが空であれば、空の bytes オブジェクトを返します。

このメソッドは コルーチン です。

coroutine readexactly(n)

厳密に n バイト読み込みます。n バイト読み込む前にストリームの終端に達したとき、IncompleteReadError を送出します。例外の IncompleteReadError.partial 属性に、読み込んだ分の不完全なバイト列が格納されます。

このメソッドは コルーチン です。

coroutine readuntil(separator=b'\n')

separator が見つかるまでストリームからデータを読み込みます。

成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。

設定したストリームの制限は、結果を検証するために使用されます。制限により、返すことのできるデータの最大長さ (区切り文字を含まず) が設定されます。

EOF が発生し、完全な区切り文字が見つからない場合には、IncompleteReadError 例外が創出されるとともに、内部バッファはリセットされます。 IncompleteReadError.partial 属性に、区切り文字が部分的に含まれる場合があります。

制限超過のためにデータを読み取ることができない場合、LimitOverrunError 例外が送出され、データは内部バッファ内に残ります。そのため、データを再度読み取ることができます。

バージョン 3.5.2 で追加.

at_eof()

バッファーが空で feed_eof() が呼ばれていた場合 True を返します。

18.5.5.3. StreamWriter

class asyncio.StreamWriter(transport, protocol, reader, loop)

トランスポートをラップします。

これは write()writelines()can_write_eof()write_eof()get_extra_info() および close() メソッドを提供します。フロー制御を待機できる任意の Future を返す drain() メソッドを追加します。また、Transport を直接参照する transport 属性も追加します。

このクラスは スレッド安全ではありません

transport

トランスポートです。

can_write_eof()

トランスポートが write_eof() をサポートしている場合は True を、していない場合は False を返します。WriteTransport.can_write_eof() を参照してください。

close()

トランスポートを閉じます: BaseTransport.close() を参照してください。

coroutine drain()

下層のトランスポートの書き込みバッファーがフラッシュされる機会を与えます。

意図されている用途は書き込みです:

w.write(data)
yield from w.drain()

トランスポートバッファのサイズが最高水位点に達した場合 (プロトコルが一時停止された場合)、バッファサイズが最低水位点まで引き出されて、プロトコルが再開されるまで、ブロックします。待機するものがなくなると、直ちに yield-from を続行します。

drain() から yield することにより、ループは書き込み操作をスケジュールし、バッファのフラッシュを行うことができます。このことは、トランスポートに大量のデータが書き込まれる可能性がある場合に非常に有用で、コルーチンは write() 呼び出しの間に yield-from を行いません。

このメソッドは コルーチン です。

get_extra_info(name, default=None)

オプションのトランスポート情報を返します: BaseTransport.get_extra_info() を参照してください。

write(data)

トランスポートにバイト列 data を書き込みます: WriteTransport.write() を参照してください。

writelines(data)

バイト列のデータのリスト (またはリテラブル) をトランスポートに書き込みます: WriteTransport.writelines() を参照してください。

write_eof()

バッファーされたデータをフラッシュした後送信側のトランスポートをクローズします: WriteTransport.write_eof() を参照してください。

18.5.5.4. StreamReaderProtocol

class asyncio.StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)

ProtocolStreamReader を適合させる些末なヘルパークラスです。Protocol のサブクラスです。

stream_readerStreamReader のインスタンスです。client_connected_cb は接続されたときに (stream_reader, stream_writer) を引数として呼び出されるオプションの関数です。loop は使用するイベントループのインスタンスです。

(これは StreamReader 自身を Protocol のサブクラスとする代わりのヘルパークラスです。StreamReader はその他の潜在的な用途を持つため、そして StreamReader の利用者が誤って不適切なプロトコルのメソッドを呼び出すことを回避するためこのように実装されています)

18.5.5.5. IncompleteReadError

exception asyncio.IncompleteReadError

不完全な読み込みエラーです。EOFError のサブクラスです。

expected

想定されていたバイト数 (int) です。

partial

ストリームの終端に達する前に読み込んだバイト文字列 (bytes) です。

18.5.5.6. LimitOverrunError

exception asyncio.LimitOverrunError

区切り文字を探している間にバッファリミットに到達しました。

consumed

未消費のバイトの合計数。

18.5.5.7. ストリームの例

18.5.5.7.1. ストリームを使った TCP Echo クライアント

asyncio.open_connection() 関数を使った TCP Echo クライアントです:

import asyncio

@asyncio.coroutine
def tcp_echo_client(message, loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
                                                        loop=loop)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = yield from reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()

message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()

18.5.5.7.2. ストリームを使った TCP Echo サーバー

asyncio.start_server() 関数を使った TCP Echo サーバーです:

import asyncio

@asyncio.coroutine
def handle_echo(reader, writer):
    data = yield from reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    yield from writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

18.5.5.7.3. HTTP ヘッダーの取得

コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:

import asyncio
import urllib.parse
import sys

@asyncio.coroutine
def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        connect = asyncio.open_connection(url.hostname, 443, ssl=True)
    else:
        connect = asyncio.open_connection(url.hostname, 80)
    reader, writer = yield from connect
    query = ('HEAD {path} HTTP/1.0\r\n'
             'Host: {hostname}\r\n'
             '\r\n').format(path=url.path or '/', hostname=url.hostname)
    writer.write(query.encode('latin-1'))
    while True:
        line = yield from reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print('HTTP header> %s' % line)

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()

使い方:

python example.py http://example.com/path/page.html

または HTTPS を使用:

python example.py https://example.com/path/page.html

18.5.5.7.4. ストリームを使ってデータを待つオープンソケットの登録

open_connection() 関数を使ってソケットがデータを受信するまで待つコルーチンです:

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

@asyncio.coroutine
def wait_for_data(loop):
    # Create a pair of connected sockets
    rsock, wsock = socketpair()

    # Register the open socket to wait for data
    reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = yield from reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()

参考

プロトコルを使ってデータを待つオープンソケットの登録 の例では AbstractEventLoop.create_connection() メソッドによって作成された低レベルプロトコルを使用しています。

読み込みイベント用のファイル記述子の監視 の例では、ソケットのファイル記述子を登録するのに低水準の AbstractEventLoop.add_reader() メソッドを使用しています。