18.5.5. ストリーム (コルーチンベースの API)¶
18.5.5.1. ストリーム関数¶
注釈
このモジュール内のトップレベル関数は、便利なラッパーとしてのみ意図されています。特別なことは何もありませんし、それらが思い通りに動作しない場合は、ご自由にコードをコピーしてください。
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, **kwds)¶ create_connection()
のラッパーで (reader, writer) ペアを返します。返されたリーダーは
StreamReader
のインスタンスで、ライターはStreamWriter
のインスタンスです。引数は protocol_factory を除き、
AbstractEventLoop.create_connection()
の通常の引数です; 最も一般的なものは位置引数のホストとポートで、その後にオプションのキーワード引数が続きます。追加のキーワード引数 loop には使用するイベントループインスタンスを、limit には
StreamReader
に渡すバッファーリミットを設定します。この関数は コルーチン です。
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)¶ 各クライアントが接続するコールバックでソケットサーバーを開始します。戻り値は
create_server()
と同じです。client_connected_cb 引数は client_reader と client_writer という 2 個の引数で呼び出されます。 client_reader は
StreamReader
オブジェクトで、client_writer はStreamWriter
オブジェクトです。 client_connected_cb 引数には単純なコールバック関数か コルーチン関数 のどちらかを指定できます; コルーチン関数の場合、自動的にTask
に変換されます。残りの引数は protocol_factory を除きすべて
create_server()
の通常の引数です; 最も一般的なのは位置引数 host と port で、さまざまなオプションのキーワード引数が続きます。追加のキーワード引数 loop には使用するイベントループインスタンスを、limit には
StreamReader
に渡すバッファーリミットを設定します。この関数は コルーチン です。
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, **kwds)¶ create_unix_connection()
のラッパーで (reader, writer) ペアを返します。戻り値やその他詳細については
open_connection()
を参照してください。この関数は コルーチン です。
利用できる環境: UNIX。
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)¶ 接続された各クライアントごとのコールバックとともに UNIX ドメインソケットサーバーを開始します。
戻り値やその他詳細については
start_server()
を参照してください。この関数は コルーチン です。
利用できる環境: UNIX。
18.5.5.2. StreamReader¶
-
class
asyncio.
StreamReader
(limit=None, loop=None)¶ このクラスは スレッド安全ではありません。
-
exception
()¶ 例外を取得します。
-
feed_eof
()¶ EOF の肯定応答を行います。
-
feed_data
(data)¶ バイト列 data を内部バッファーに取り込みます。データを待っているあらゆる処理が再開されます。
-
set_exception
(exc)¶ 例外を設定します。
-
set_transport
(transport)¶ トランスポートを設定します。
-
coroutine
read
(n=-1)¶ n バイト読み込みます。n が指定されないか
-1
が指定されていた場合 EOF になるまで読み込み、全データを返します。EOF に達しており内部バッファーが空であれば、空の
bytes
オブジェクトを返します。このメソッドは コルーチン です。
-
coroutine
readline
()¶ 1 行読み込みます。 "行" とは、
\n
で終了するバイト列のシーケンスです。EOF を受信し、かつ
\n
が見つからない場合、このメソッドは読み込んだ分の不完全なバイト列を返します。EOF に達しており内部バッファーが空であれば、空の
bytes
オブジェクトを返します。このメソッドは コルーチン です。
-
coroutine
readexactly
(n)¶ 厳密に n バイト読み込みます。n バイト読み込む前にストリームの終端に達したとき、
IncompleteReadError
を送出します。例外のIncompleteReadError.partial
属性に、読み込んだ分の不完全なバイト列が格納されます。このメソッドは コルーチン です。
-
coroutine
readuntil
(separator=b'\n')¶ separator
が見つかるまでストリームからデータを読み込みます。成功時には、データと区切り文字は内部バッファから削除されます (消費されます)。返されるデータの最後には区切り文字が含まれます。
設定したストリームの制限は、結果を検証するために使用されます。制限により、返すことのできるデータの最大長さ (区切り文字を含まず) が設定されます。
EOF が発生し、完全な区切り文字が見つからない場合には、
IncompleteReadError
例外が創出されるとともに、内部バッファはリセットされます。IncompleteReadError.partial
属性に、区切り文字が部分的に含まれる場合があります。制限超過のためにデータを読み取ることができない場合、
LimitOverrunError
例外が送出され、データは内部バッファ内に残ります。そのため、データを再度読み取ることができます。バージョン 3.5.2 で追加.
-
at_eof
()¶ バッファーが空で
feed_eof()
が呼ばれていた場合True
を返します。
-
18.5.5.3. StreamWriter¶
-
class
asyncio.
StreamWriter
(transport, protocol, reader, loop)¶ トランスポートをラップします。
これは
write()
、writelines()
、can_write_eof()
、write_eof()
、get_extra_info()
およびclose()
メソッドを提供します。フロー制御を待機できる任意のFuture
を返すdrain()
メソッドを追加します。また、Transport
を直接参照する transport 属性も追加します。このクラスは スレッド安全ではありません。
-
transport
¶ トランスポートです。
-
can_write_eof
()¶ トランスポートが
write_eof()
をサポートしている場合はTrue
を、していない場合はFalse
を返します。WriteTransport.can_write_eof()
を参照してください。
-
close
()¶ トランスポートを閉じます:
BaseTransport.close()
を参照してください。
-
coroutine
drain
()¶ 下層のトランスポートの書き込みバッファーがフラッシュされる機会を与えます。
意図されている用途は書き込みです:
w.write(data) yield from w.drain()
トランスポートバッファのサイズが最高水位点に達した場合 (プロトコルが一時停止された場合)、バッファサイズが最低水位点まで引き出されて、プロトコルが再開されるまで、ブロックします。待機するものがなくなると、直ちに yield-from を続行します。
drain()
から yield することにより、ループは書き込み操作をスケジュールし、バッファのフラッシュを行うことができます。このことは、トランスポートに大量のデータが書き込まれる可能性がある場合に非常に有用で、コルーチンはwrite()
呼び出しの間に yield-from を行いません。このメソッドは コルーチン です。
-
get_extra_info
(name, default=None)¶ オプションのトランスポート情報を返します:
BaseTransport.get_extra_info()
を参照してください。
-
write
(data)¶ トランスポートにバイト列 data を書き込みます:
WriteTransport.write()
を参照してください。
-
writelines
(data)¶ バイト列のデータのリスト (またはリテラブル) をトランスポートに書き込みます:
WriteTransport.writelines()
を参照してください。
-
write_eof
()¶ バッファーされたデータをフラッシュした後送信側のトランスポートをクローズします:
WriteTransport.write_eof()
を参照してください。
-
18.5.5.4. StreamReaderProtocol¶
-
class
asyncio.
StreamReaderProtocol
(stream_reader, client_connected_cb=None, loop=None)¶ Protocol
とStreamReader
を適合させる些末なヘルパークラスです。Protocol
のサブクラスです。stream_reader は
StreamReader
のインスタンスです。client_connected_cb は接続されたときに (stream_reader, stream_writer) を引数として呼び出されるオプションの関数です。loop は使用するイベントループのインスタンスです。(これは
StreamReader
自身をProtocol
のサブクラスとする代わりのヘルパークラスです。StreamReader
はその他の潜在的な用途を持つため、そしてStreamReader
の利用者が誤って不適切なプロトコルのメソッドを呼び出すことを回避するためこのように実装されています)
18.5.5.5. IncompleteReadError¶
18.5.5.6. LimitOverrunError¶
18.5.5.7. ストリームの例¶
18.5.5.7.1. ストリームを使った TCP Echo クライアント¶
asyncio.open_connection()
関数を使った TCP Echo クライアントです:
import asyncio
@asyncio.coroutine
def tcp_echo_client(message, loop):
reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
loop=loop)
print('Send: %r' % message)
writer.write(message.encode())
data = yield from reader.read(100)
print('Received: %r' % data.decode())
print('Close the socket')
writer.close()
message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()
参考
AbstractEventLoop.create_connection()
メソッドを使った TCP Echo クライアントプロトコル の例
18.5.5.7.2. ストリームを使った TCP Echo サーバー¶
asyncio.start_server()
関数を使った TCP Echo サーバーです:
import asyncio
@asyncio.coroutine
def handle_echo(reader, writer):
data = yield from reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print("Received %r from %r" % (message, addr))
print("Send: %r" % message)
writer.write(data)
yield from writer.drain()
print("Close the client socket")
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
参考
AbstractEventLoop.create_server()
メソッドを使った TCP Echo サーバープロトコル の例
18.5.5.7.3. HTTP ヘッダーの取得¶
コマンドラインから渡された URL の HTTP ヘッダーを問い合わせる簡単な例です:
import asyncio
import urllib.parse
import sys
@asyncio.coroutine
def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
connect = asyncio.open_connection(url.hostname, 443, ssl=True)
else:
connect = asyncio.open_connection(url.hostname, 80)
reader, writer = yield from connect
query = ('HEAD {path} HTTP/1.0\r\n'
'Host: {hostname}\r\n'
'\r\n').format(path=url.path or '/', hostname=url.hostname)
writer.write(query.encode('latin-1'))
while True:
line = yield from reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print('HTTP header> %s' % line)
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()
使い方:
python example.py http://example.com/path/page.html
または HTTPS を使用:
python example.py https://example.com/path/page.html
18.5.5.7.4. ストリームを使ってデータを待つオープンソケットの登録¶
open_connection()
関数を使ってソケットがデータを受信するまで待つコルーチンです:
import asyncio
try:
from socket import socketpair
except ImportError:
from asyncio.windows_utils import socketpair
@asyncio.coroutine
def wait_for_data(loop):
# Create a pair of connected sockets
rsock, wsock = socketpair()
# Register the open socket to wait for data
reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = yield from reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()
参考
プロトコルを使ってデータを待つオープンソケットの登録 の例では AbstractEventLoop.create_connection()
メソッドによって作成された低レベルプロトコルを使用しています。
読み込みイベント用のファイル記述子の監視 の例では、ソケットのファイル記述子を登録するのに低水準の AbstractEventLoop.add_reader()
メソッドを使用しています。