イベントループ

まえがき

イベントループは全ての asyncio アプリケーションの中核をなす存在です。イベントループは非同期タスクやコールバックを実行し、ネットワーク I/O を処理し、サブプロセスを実行します。

アプリケーション開発者は通常 asyncio.run() のような高水準の ayncio 関数だけを利用し、ループオブジェクトを参照したり、ループオブジェクトのメソッドを呼び出したりすることはほとんどありません。この節は、イベントループの振る舞いに対して細かい調整が必要な、低水準のコード、ライブラリ、フレームワークの開発者向けです。

イベントループの取得

以下の低水準関数はイベントループの取得、設定、生成するために使います:

asyncio.get_running_loop()

現在の OS スレッドで実行中のイベントループを取得します。

実行中のイベントループがない場合は RuntimeError 例外を送出します。この関数はコルーチンまたはコールバックからのみ呼び出し可能です。

バージョン 3.7 で追加.

asyncio.get_event_loop()

現在のイベントループを取得します。

OS スレッドに現在のイベントループが未設定で、 OS スレッドがメインスレッドであり、かつ set_event_loop() がまだ呼び出されていない場合、 asyncio は新しいイベントループを生成し、それを現在のイベントループに設定します。

この関数の振る舞いは (特にイベントループポリシーをカスタマイズした場合) 複雑なため、コルーチンやコールバックでは get_event_loop() よりも get_running_loop() を使うほうが好ましいと考えられます。

また、低水準の関数を使って手作業でイベントループの管理をするかわりに、 asyncio.run() を使うことを検討してください。

asyncio.set_event_loop(loop)

loop を OS スレッドの現在のイベントループに設定します。

asyncio.new_event_loop()

Create a new event loop object.

get_event_loop(), set_event_loop(), および new_event_loop() 関数の振る舞いは、 カスタムイベントループポリシーを設定する ことにより変更することができます。

内容

このページは以下の節から構成されます:

イベントループのメソッド

イベントループは以下の 低水準な API を持っています:

ループの開始と停止

loop.run_until_complete(future)

フューチャー (Future インスタンス) が完了するまで実行します。

引数が コルーチンオブジェクト の場合、暗黙のうちに asyncio.Task として実行されるようにスケジュールされます。

Future の結果を返すか、例外を送出します。

loop.run_forever()

stop() が呼び出されるまでイベントループを実行します。

run_forever() メソッドが呼ばれるより前に stop() メソッドが呼ばれた場合、イベントループはタイムアウトをゼロにして一度だけ I/O セレクタの問い合わせ処理を行い、 I/O イベントに対してスケジュールされた全てのコールバック (および既にスケジュール済みのコールバック) を実行したのち、終了します。

run_forever() メソッドを実行中に stop() メソッドが呼び出された場合、イベントループは現在処理されているすべてのコールバックを実行してから終了します。 この場合、コールバックにより新たにスケジュールされるコールバックは実行されないことに注意してください; これら新たにスケジュールされたコールバックは、次に run_forever() または run_until_complete() が呼び出されたときに実行されます。

loop.stop()

イベントループを停止します。

loop.is_running()

イベントループが現在実行中の場合 True を返します。

loop.is_closed()

イベントループが閉じられていた場合 True を返します。

loop.close()

イベントループをクローズします。

この関数が呼び出される時点で、イベントループが実行中であってはいけません。保留中のコールバックはすべて破棄されます。

このメソッドは全てのキューをクリアし、エグゼキューターが実行完了するのを待たずにシャットダウンします。

このメソッドはべき等 (何回実行しても結果は同じ) であり取り消せません。イベントループがクローズされた後、他のいかなるメソッドも呼び出すべきではありません。

coroutine loop.shutdown_asyncgens()

現在オープンになっているすべての asynchronous generator (非同期ジェネレータ) オブジェクトをスケジュールし、 aclose() メソッドを呼び出すことでそれらをクローズします。 このメソッドの呼び出し後に新しい非同期ジェネレータがイテレートされると、イベントループは警告を発します。このメソッドはスケジュールされたすべての非同期ジェネレータの終了処理を確実に行うために使用すべきです。

asyncio.run() を使った場合はこの関数を呼び出す必要はありません。

以下はプログラム例です:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

バージョン 3.6 で追加.

コールバックのスケジューリング

loop.call_soon(callback, *args, context=None)

Schedule a callback to be called with args arguments at the next iteration of the event loop.

コールバックは登録された順に呼び出されます。各コールバックは厳密に1回だけ呼び出されます。

オプションのキーワード引数 context を使って、コールバック*callback* を実行する際のコンテキスト contextvars.Context を設定することができます。コンテキスト context が指定されない場合は現在のコンテキストが使われます。

asyncio.Handle のインスタンスを返します。このインスタンスを使ってスケジュールしたコールバックをキャンセルすることができます。

このメソッドはスレッドセーフではありません。

loop.call_soon_threadsafe(callback, *args, context=None)

call_soon() のスレッドセーフ版です。必ず 別のスレッドから コールバックをスケジュールする際に使ってください。

このドキュメントの asyncio-multithreading 節を参照してください。

バージョン 3.7 で変更: キーワード引数 context が追加されました。詳細は PEP 567 を参照してください。

注釈

ほとんどの asyncio モジュールのスケジューリング関数は、キーワード引数をコールバックに渡すことを許していません。キーワード引数を渡すためには functools.partial() を使ってください:

# will schedule "print("Hello", flush=True)"
loop.call_soon(
    functools.partial(print, "Hello", flush=True))

asyncio は partial オブジェクトのデバッグメッセージやエラーメッセージをよりよく可視化することができるため、通常はラムダ式よりも partial オブジェクトを使う方が便利です。

遅延コールバックのスケジューリング

イベントループは、コールバック関数を未来のある時点で呼び出されるようにスケジュールする仕組みを提供します。イベントループは時刻が戻らない単調な時計 (monotonic clock) を使って時刻を追跡します。

loop.call_later(delay, callback, *args, context=None)

delay 秒経過後にコールバック関数 callback を呼び出すようにスケジュールします。 delay には整数または浮動小数点数を指定します。

asyncio.TimerHandle のインスタンスを返します。このインスタンスを使ってスケジュールしたコールバックをキャンセルすることができます。

callback は厳密に一度だけ呼び出されます。2つのコールバックが完全に同じ時間にスケジュールされた場合、呼び出しの順序は未定義です。

オプションの位置引数 args はコールバックが呼び出されるときに位置引数として渡されます。キーワード引数を指定してコールバックを呼び出したい場合は functools.partial() を使用してください。

オプションのキーワード引数 context を使って、コールバック*callback* を実行する際のコンテキスト contextvars.Context を設定することができます。コンテキスト context が指定されない場合は現在のコンテキストが使われます。

バージョン 3.7 で変更: キーワード引数 context が追加されました。詳細は PEP 567 を参照してください。

バージョン 3.7.1 で変更: In Python 3.7.0 and earlier with the default event loop implementation, the delay could not exceed one day. This has been fixed in Python 3.7.1.

loop.call_at(when, callback, *args, context=None)

絶対値の時刻 when (整数または浮動小数点数) にコールバックを呼び出すようにスケジュールします。 loop.time() と同じ参照時刻を使用します。

このメソッドの振る舞いは call_later() と同じです。

asyncio.TimerHandle のインスタンスを返します。このインスタンスを使ってスケジュールしたコールバックをキャンセルすることができます。

バージョン 3.7 で変更: キーワード引数 context が追加されました。詳細は PEP 567 を参照してください。

バージョン 3.7.1 で変更: In Python 3.7.0 and earlier with the default event loop implementation, the difference between when and the current time could not exceed one day. This has been fixed in Python 3.7.1.

loop.time()

現在の時刻を float 値で返します。時刻はイベントループが内部で参照している時刻が戻らない単調な時計 (monotonic clock) に従います。

注釈

バージョン 3.8 で変更: Python 3.7 またはそれ以前のバージョンでは、タイムアウト (相対値 delay もしくは絶対値 when) は1日を超えることができませんでした。この問題は Python 3.8 で修正されました。

参考

関数 asyncio.sleep()

フューチャーとタスクの生成

loop.create_future()

イベントループに接続した asyncio.Future オブジェクトを生成します。

asyncio でフューチャーオブジェクトを作成するために推奨される方法です。このメソッドにより、サードパーティ製のイベントループがFutures クラスの(パフォーマンスや計測方法が優れた) 代替実装を提供することを可能にします。

バージョン 3.5.2 で追加.

loop.create_task(coro)

コルーチン の実行をスケジュールします。 Task オブジェクトを返します。

サードパーティのイベントループは相互運用のための自身の Task のサブクラスを使用できます。この場合、結果は Task のサブクラスになります。

loop.set_task_factory(factory)

loop.create_task() が使用するタスクファクトリーを設定します。

factoryNone の場合、デフォルトのタスクファクトリーが設定されます。そうでなければ、 factory(loop, coro) に一致する関数シグネチャを持った 呼び出し可能オブジェクト でなければなりません。ここで loop はアクティブなイベントループへの参照であり、 coro はコルーチンオブジェクトです。呼び出し可能オブジェクトは asyncio.Future と互換性のあるオブジェクトを返さなければなりません。

loop.get_task_factory()

タスクファクトリを返します。デフォルトのタスクファクトリを使用中の場合は None を返します。

ネットワーク接続の確立

coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

hostport で指定されたアドレスとのストリーミングトランスポート接続をオープンします。

ソケットファミリーは host (または family 引数が与えられた場合は family) に依存し、 AF_INETAF_INET6 のいずれかを指定します。

ソケットタイプは SOCK_STREAM になります。

protocol_factoryasyncio プロトコル の実装を返す呼び出し可能オブジェクトでなければなりません。

このメソッドはバックグラウンドで接続の確立を試みます。成功した場合、メソッドは (transport, protocol) のペアを返します。

時系列での下層処理の概要は以下のとおりです:

  1. 接続を確立し、その接続に対する トランスポート が生成されます。

  2. protocol_factory が引数なしで呼び出され、ファクトリが プロトコル インスタンスを返すよう要求します。

  3. プロトコルインスタンスが connection_made() メソッドを呼び出すことにより、トランスポートと紐付けられます。

  4. 成功すると (transport, protocol) タプルが返されます。

作成されたトランスポートは実装依存の双方向ストリームです。

その他の引数:

  • ssl: 偽値以外が与えられた場合、SSL/TLS トランスポートが作成されます (デフォルトでは暗号化なしの TCP トランスポートが作成されます)。 sslssl.SSLContext オブジェクトの場合、このコンテキストがトランスポートを作成するために使用されます; sslTrue の場合、 ssl.create_default_context() が返すデフォルトのコンテキストが使われます。

  • server_hostname は対象サーバーの証明書との一致を確認するためのホスト名を設定または上書きします。この引数は sslNone でない場合のみ設定すべきです。デフォルトでは host に指定したサーバー名が使用されます。 host が空の文字列の場合のデフォルト値は設定されていません。その場合、 server_hostname を必ず指定してください。 server_hostname も空の文字列の場合は、ホスト名の一致確認は行われません (これは深刻なセキュリティリスクであり、中間者攻撃を受ける可能性があります)。

  • family, proto, flags は任意のアドレスファミリであり、host 解決のための getaddrinfo() 経由で渡されるプロトコルおよびフラグになります。このオプションが与えられた場合、これらはすべて socket モジュール定数に従った整数でなければなりません。

  • sock を与える場合、トランスポートに使用される、既存の、すでに接続済の socket.socket オブジェクトを指定します。sock を指定する場合、hostportfamilyprotoflags および local_addr を指定してはなりません。

  • local_addr, if given, is a (local_host, local_port) tuple used to bind the socket to locally. The local_host and local_port are looked up using getaddrinfo(), similarly to host and port.

  • ssl_handshake_timeout は TLS ハンドシェイクが完了するまでの (TLS 接続のための) 待ち時間を秒単位で指定します。指定した待ち時間を超えると接続は中断します。 None が与えられた場合はデフォルト値 60.0 が使われます。

バージョン 3.7 で追加: The ssl_handshake_timeout parameter.

バージョン 3.6 で変更: 全ての TCP 接続に対してデフォルトでソケットオプション TCP_NODELAY が設定されるようになりました。

バージョン 3.5 で変更: ProactorEventLoop において SSL/TLS のサポートが追加されました。

参考

open_connection() 関数は高水準の代替 API です。この関数は(StreamReader, StreamWriter) のペアを返し、 async/await コードから直接使うことができます。

coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)

注釈

SO_REUSEADDR の利用が UDP に対して重大なセキュリティ上の懸念をもたらすため、 reuse_address パラメータはサポートされなくなりました。明示的に reuse_address=True を設定すると例外を送出します。

When multiple processes with differing UIDs assign sockets to an indentical UDP socket address with SO_REUSEADDR, incoming packets can become randomly distributed among the sockets.

サポートされているプラットフォームでは、 reuse_port が同様の機能に対する代用品として利用できます。 reuse_port は代替機能として SO_REUSEPORT を使っており、複数のプロセスが異なる UID で同一のソケットに対して割り当てられるのを明確に禁止します。

データグラム接続 (UDP) を生成します。

ソケットファミリーは host (または family 引数が与えられた場合は family) に依存し、 AF_INETAF_INET6AF_UNIX のいずれかを指定します。

ソケットタイプは SOCK_DGRAM になります。

protocol_factoryasyncio プロトコル の実装を返す呼び出し可能オブジェクトでなければなりません。

成功すると (transport, protocol) タプルが返されます。

その他の引数:

  • local_addr が指定される場合、(local_host, local_port) のタプルで、ソケットをローカルで束縛するために使用されます。local_hostlocal_portgetaddrinfo() を使用して検索されます。

  • remote_addr が指定される場合、(remote_host, remote_por) のタプルで、ソケットをリモートアドレスに束縛するために使用されます。remote_hostremote_portgetaddrinfo() を使用して検索されます。

  • family, proto, flags は任意のアドレスファミリです。これらのファミリ、プロトコル、フラグは、host 解決のため getaddrinfo() 経由でオプションで渡されます。これらのオプションを指定する場合、すべて socket モジュール定数に従った整数でなければなりません。

  • reuse_port は、同じポートにバインドされた既存の端点すべてがこのフラグを設定して生成されている場合に限り、この端点を既存の端点と同じポートにバインドすることを許可するよう、カーネルに指示します(訳註: ソケットのオプション SO_REUSEPORT を使用します)。このオプションは、Windows やいくつかの UNIX システムではサポートされていません。SO_REUSEPORT 定数が定義されていなければ、この機能はサポートされません。

  • allow_broadcast は、カーネルに、このエンドポイントがブロードキャストアドレスにメッセージを送信することを許可するように指示します。

  • オプションの sock を指定することで、既存の、すでに接続されている socket.socket をトランスポートで使用することができます。このオプションを使用する場合、local_addrremote_addr は省略してください (None でなければなりません)。

On Windows, with ProactorEventLoop, this method is not supported.

UDP echo クライアントプロトコル および UDP echo サーバープロトコル の例を参照してください。

バージョン 3.4.4 で変更: family, proto, flags, reuse_address, reuse_port, *allow_broadcast, sock パラメータが追加されました。

バージョン 3.7.6 で変更: セキュリティ上の懸念により、 reuse_address パラメータはサポートされなくなりました。

coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Unix 接続を生成します。

ソケットファミリーは AF_UNIX になります; また、ソケットタイプは SOCK_STREAM になります。

成功すると (transport, protocol) タプルが返されます。

path は Unix ドメインソケット名で、 sock パラメータが指定されない場合は必須です。 抽象 Unix ソケット、 strbytes、 and Path 形式でのパスがサポートされています。

このメソッドの引数についての詳細は loop.create_connection() メソッドのドキュメントを参照してください。

利用可能な環境: Unix。

バージョン 3.7 で追加: The ssl_handshake_timeout parameter.

バージョン 3.7 で変更: The path parameter can now be a path-like object.

ネットワークサーバの生成

coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

アドレス host のポート port をリッスンする (ソケットタイプが SOCK_STREAM である) TCP サーバーを生成します。

Server オブジェクトを返します。

引数:

  • protocol_factoryasyncio プロトコル の実装を返す呼び出し可能オブジェクトでなければなりません。

  • host パラメータはいくつかの方法で指定することができ、その値によってサーバーがどこをリッスンするかが決まります。

    • host が文字列の場合、 TCP サーバーは host で指定した単一のネットワークインターフェースに束縛されます。

    • host が文字列のシーケンスである場合、 TCP サーバーはそのシーケンスで指定された全てのネットワークインターフェースに束縛されます。

    • host が空の文字列か None の場合、すべてのインターフェースが想定され、複合的なソケットのリスト (通常は一つが IPv4、もう一つが IPv6) が返されます。

  • familysocket.AF_INET または AF_INET6 を指定することにより、ソケットでそれぞれ IPv4 または IPv6 の使用を強制することができます。設定されない場合、 family はホスト名から決定されます (socket.AF_UNSPEC がデフォルトになります)。

  • flagsgetaddrinfo() のためのビットマスクになります。

  • サーバーで既存のソケットオブジェクトを使用するために、オプションの引数 sock にソケットオブジェクトを設定することができます。指定した場合、 hostport を指定してはいけません。

  • backloglisten() に渡される、キューに入るコネクションの最大数になります (デフォルトは 100)。

  • 確立した接続の上で TLS を有効化するために、 sslSSLContext のインスタンスを指定することができます。

  • reuse_address は、TIME_WAIT 状態にあるローカルソケットを、その状態が自然にタイムアウトするのを待つことなく再利用するようカーネルに指示します(訳註: ソケットのオプション SO_REUSEADDR を使用します)。指定しない場合、UNIX では自動的に True が設定されます。

  • reuse_port は、同じポートにバインドされた既存の端点すべてがこのフラグを設定して生成されている場合に限り、この端点を既存の端点と同じポートにバインドすることを許可するよう、カーネルに指示します(訳註: ソケットのオプション SO_REUSEPORT を使用します)。このオプションは、Windows ではサポートされていません。

  • ssl_handshake_timeout は TLS ハンドシェイクが完了するまでの (TLS サーバーのための) 待ち時間を秒単位で指定します。指定した待ち時間を超えると接続は中断します。 None が与えられた場合はデフォルト値 60.0 が使われます。

  • start_servingTrue に設定された場合 (これがデフォルトです)、 生成されたサーバーは即座に接続の受け付けを開始します。 False が指定された場合、ユーザーは接続の受け付けを開始するために Server.start_serving() または Server.serve_forever() を待ち受け (await) る必要があります。

バージョン 3.7 で追加: Added ssl_handshake_timeout and start_serving parameters.

バージョン 3.6 で変更: 全ての TCP 接続に対してデフォルトでソケットオプション TCP_NODELAY が設定されるようになりました。

バージョン 3.5 で変更: ProactorEventLoop において SSL/TLS のサポートが追加されました。

バージョン 3.5.1 で変更: host パラメータに文字列のシーケンスを指定できるようになりました。

参考

start_server() 関数は高水準の代替 API です。この関数は StreamReaderStreamWriter のペアを返し、async/await コードから使うことができます。

coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

loop.create_server() と似ていますが、 AF_UNIX ソケットファミリーとともに動作します。

path は Unix ドメインソケット名で、 sock パラメータが指定されない場合は必須です。 抽象 Unix ソケット、 strbytes、 and Path 形式でのパスがサポートされています。

このメソッドの引数についての詳細は loop.create_server() メソッドのドキュメントを参照してください。

利用可能な環境: Unix。

バージョン 3.7 で追加: The ssl_handshake_timeout and start_serving parameters.

バージョン 3.7 で変更: The path parameter can now be a Path object.

coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None)

すでに確立した接続を transport と protocol のペアでラップします。

このメソッドは asyncio の範囲外で確立された接続を使うサーバーに対しても使えますが、その場合でも接続は asyncio を使って処理されます。

引数:

  • protocol_factoryasyncio プロトコル の実装を返す呼び出し可能オブジェクトでなければなりません。

  • socksocket.accept メソッドが返す既存のソケットオブジェクトです。

  • ssl には SSLContext を指定できます。指定すると、受け付けたコネクション上での SSL を有効にします。

  • ssl_handshake_timeout は SSL ハンドシェイクが完了するまでの (SSL 接続のための) 待ち時間を秒単位で指定します。 None が与えられた場合はデフォルト値 60.0 が使われます。

(transport, protocol) のペアを返します。

バージョン 3.7 で追加: The ssl_handshake_timeout parameter.

バージョン 3.5.3 で追加.

ファイルの転送

coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)

transport を通じて file を送信します。送信したデータの総バイト数を返します。

このメソッドは、もし利用可能であれば高性能な os.sendfile() を利用します。

file はバイナリモードでオープンされた通常のファイルオブジェクトでなければなりません。

offset はファイルの読み込み開始位置を指定します。 count が指定された場合、ファイルの EOF までファイルを送信する代わりに、 count で指定された総バイト数の分だけ送信します。ファイルオブジェクトが指し示す位置は、メソッドがエラーを送出した場合でも更新されます。この場合実際に送信されたバイト数は file.tell() メソッドで取得することができます。

fallbackTrue に指定することで、 asyncio がプラットフォームが sendfile システムコールをサポートしていない場合 (たとえば Windows や Unix の SSL ソケットなど) に別の方法でファイルの読み込みと送信を行うようにすることができます。

システムが sendfile システムコールをサポートしておらず、かつ fallbackFalse の場合、 SendfileNotAvailableError 例外を送出します。

バージョン 3.7 で追加.

TLS へのアップグレード

coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None)

既存のトランスポートベースの接続を TLS にアップグレードします。

新しいトランスポートのインスタンスを返します。 protocol は必ず await 直後に利用を開始しなければなりません。 start_tls メソッドに渡した transport は、このメソッドの呼び出し以後決して使ってはいけません。

引数:

  • transportprotocol には、 create_server()create_connection() が返すものと同等のインスタンスを指定します。

  • sslcontext: 構成済みの SSLContext インスタンスです。

  • (create_server() で生成されたような) サーバーサイドの接続をアップグレードする場合は server_sideTrue を渡します。

  • server_hostname: 対象のサーバーの証明書との照合に使われるホスト名を設定または上書きします。

  • ssl_handshake_timeout は TLS ハンドシェイクが完了するまでの (TLS 接続のための) 待ち時間を秒単位で指定します。指定した待ち時間を超えると接続は中断します。 None が与えられた場合はデフォルト値 60.0 が使われます。

バージョン 3.7 で追加.

ファイル記述子の監視

loop.add_reader(fd, callback, *args)

ファイル記述子 fd に対する読み込みが可能かどうかの監視を開始し、 fd が読み込み可能になると、指定した引数でコールバック callback を呼び出します。

loop.remove_reader(fd)

ファイル記述子 fd に対する読み込みが可能かどうかの監視を停止します。

loop.add_writer(fd, callback, *args)

ファイル記述子 fd に対する書き込みが可能かどうかの監視を開始し、 fd が書き込み可能になると、指定した引数でコールバック callback を呼び出します。

コールバック callbackキーワード引数を渡す 場合は functools.partial() を使ってください。

loop.remove_writer(fd)

ファイル記述子 fd に対する書き込みが可能かどうかの監視を停止します。

これらのメソッドに対する制限事項については プラットフォームのサポート状況 節も参照してください。

ソケットオブジェクトと直接やりとりする

一般に、 loop.create_connection()loop.create_server() のようなトランスポートベースの API を使ったプロトコルの実装はソケットと直接やり取りする実装に比べて高速です。しかしながら、パフォーマンスが重要でなく、直接 socket オブジェクトとやりとりした方が便利なユースケースがいくつかあります。

coroutine loop.sock_recv(sock, nbytes)

nbytes で指定したバイト数までのデータをソケット sock から受信します。 このメソッドは socket.recv() の非同期版です。

受信したデータをバイトオブジェクトとして返します。

sock はノンブロッキングソケットでなければなりません。

バージョン 3.7 で変更: このメソッドは常にコルーチンメソッドとしてドキュメントに記載されてきましたが、 Python 3.7 以前のリリースでは Future オブジェクトを返していました。 Python 3.7 からは async def メソッドになりました。

coroutine loop.sock_recv_into(sock, buf)

ソケット sock からデータを受信してバッファ buf に格納します。ブロッキングコードの socket.recv_into() メソッドをモデルとしています。

バッファに書き込んだデータのバイト数を返します。

sock はノンブロッキングソケットでなければなりません。

バージョン 3.7 で追加.

coroutine loop.sock_sendall(sock, data)

データ data をソケット sock に送信します。 socket.sendall() メソッドの非同期版です。

このメソッドは data をすべて送信し終えるか、またはエラーが起きるまでデータをソケットに送信し続けます。送信に成功した場合 None を返します。エラーの場合は例外が送出されます。エラーとなった場合、接続の受信側で正しく処理されたデータの総量を特定する方法はありません。

sock はノンブロッキングソケットでなければなりません。

バージョン 3.7 で変更: このメソッドは常にコルーチンメソッドとしてドキュメントに記載されてきましたが、 Python 3.7 以前のリリースでは Future オブジェクトを返していました。 Python 3.7 からは async def メソッドになりました。

coroutine loop.sock_connect(sock, address)

ソケット sock をアドレス address のリモートソケットに接続します。

socket.connect() の非同期版です。

sock はノンブロッキングソケットでなければなりません。

バージョン 3.5.2 で変更: address を名前解決する必要はなくなりました。 sock_connectsocket.inet_pton() を呼び出して address が解決済みかどうかを確認します。未解決の場合、 address の名前解決には loop.getaddrinfo() メソッドが使われます。

coroutine loop.sock_accept(sock)

接続を受け付けます。ブロッキングコールの socket.accept() メソッドをモデルとしています。

ソケットはアドレスに束縛済みで、接続を listen 中である必要があります。戻り値は (conn, address) のペアで、conn は接続を通じてデータの送受信を行うための 新しい ソケットオブジェクト、address は接続先の端点でソケットに束縛されているアドレスを示します。

sock はノンブロッキングソケットでなければなりません。

バージョン 3.7 で変更: このメソッドは常にコルーチンメソッドとしてドキュメントに記載されてきましたが、 Python 3.7 以前のリリースでは Future オブジェクトを返していました。 Python 3.7 からは async def メソッドになりました。

参考

loop.create_server() および start_server()

coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)

ファイルを送信します。利用可能なら高性能な os.sendfile を使います。送信したデータの総バイト数を返します。

socket.sendfile() メソッドの非同期版です。

socksocket.SOCK_STREAM タイプのノンブロッキングな socket でなければなりません。

file はバイナリモードでオープンされた通常のファイルオブジェクトでなければなりません。

offset はファイルの読み込み開始位置を指定します。 count が指定された場合、ファイルの EOF までファイルを送信する代わりに、 count で指定された総バイト数の分だけ送信します。ファイルオブジェクトが指し示す位置は、メソッドがエラーを送出した場合でも更新されます。この場合実際に送信されたバイト数は file.tell() メソッドで取得することができます。

fallbackTrue に設定された場合、 プラットフォームが sendfile システムコールをサポートしていない場合 (たとえば Windows や Unix の SSL ソケットなど) に asyncio が別の方法でファイルの読み込みと送信を行うようにすることができます。

システムが sendfile システムコールをサポートしておらず、かつ fallbackFalse の場合、 SendfileNotAvailableError 例外を送出します。

sock はノンブロッキングソケットでなければなりません。

バージョン 3.7 で追加.

DNS

coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)

socket.getaddrinfo() の非同期版です。

coroutine loop.getnameinfo(sockaddr, flags=0)

socket.getnameinfo() の非同期版です。

バージョン 3.7 で変更: getaddrinfogetnameinfo の2つのメソッドは、いずれも常にコルーチンメソッドとしてドキュメントに記載されてきましたが、 Python 3.7 以前のリリースでは、実際には asyncio.Future オブジェクトを返していました。 Python 3.7 からはどちらのメソッドもコルーチンになりました。

パイプとやりとりする

coroutine loop.connect_read_pipe(protocol_factory, pipe)

イベントループの読み込み側終端に pipe を登録します。

protocol_factoryasyncio プロトコル の実装を返す呼び出し可能オブジェクトでなければなりません。

pipe には file-like オブジェクト を指定します。

(transport, protocol) のペアを返します。ここで transportReadTransport のインターフェースをサポートし、 protocolprotocol_factory ファクトリでインスタンス化されたオブジェクトです。

SelectorEventLoop イベントループの場合、pipe は非ブロックモードに設定されていなければなりません。

coroutine loop.connect_write_pipe(protocol_factory, pipe)

pipe の書き込み側終端をイベントループに登録します。

protocol_factoryasyncio プロトコル の実装を返す呼び出し可能オブジェクトでなければなりません。

pipefile-like オブジェクト です。

(transport, protocol) のペアを返します。ここで transportWriteTransport のインスタンスであり、 protocolprotocol_factory ファクトリでインスタンス化されたオブジェクトです。

SelectorEventLoop イベントループの場合、pipe は非ブロックモードに設定されていなければなりません。

注釈

SelectorEventLoop は Windows 上で上記のメソッドをサポートしていません。 Windowsでは代わりに ProactorEventLoop を使ってください。

参考

loop.subprocess_exec() および loop.subprocess_shell() メソッド。

Unix シグナル

loop.add_signal_handler(signum, callback, *args)

コールバック callback をシグナル signum に対するハンドラに設定します。

コールバックは loop、登録された他のコールバック、およびイベントループの実行可能なコルーチンから呼び出されます。 signal.signal() を使って登録されたシグナルハンドラと異なり、この関数で登録されたコールバックはイベントループと相互作用することが可能です。

シグナルナンバーが誤っているか捕捉不可能な場合 ValueError が送出されます。ハンドラーの設定に問題があった場合 RuntimeError が送出されます。

コールバック callbackキーワード引数を渡す 場合は functools.partial() を使ってください。

signal.signal() と同じく、この関数はメインスレッドから呼び出されなければなりません。

loop.remove_signal_handler(sig)

シグナル sig に対するハンドラを削除します。

シグナルハンドラが削除された場合 True を返します。シグナルに対してハンドラが設定されていない場合には False を返します。

利用可能な環境: Unix。

参考

signal モジュール。

スレッドまたはプロセスプールでコードを実行する

awaitable loop.run_in_executor(executor, func, *args)

指定したエグゼキュータで関数 func が実行されるように準備します。

引数 executorconcurrent.futures.Executor のインスタンスでなければなりません。 executorNone の場合はデフォルトのエグゼキュータが使われます。

以下はプログラム例です:

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

このメソッドは asyncio.Future オブジェクトを返します。

関数 funcキーワード引数を渡す 場合は functools.partial() を使ってください。

バージョン 3.5.3 で変更: loop.run_in_executor() は内部で生成するスレッドプールエグゼキュータの max_workers を設定せず、代わりにスレッドプールエグゼキュータ (ThreadPoolExecutor) にデフォルト値を設定させるようになりました。

loop.set_default_executor(executor)

executorrun_in_executor() が使うデフォルトのエグゼキュータに設定します。 executorThreadPoolExecutor のインスタンスでなければなりません。

バージョン 3.7 で非推奨: ThreadPoolExecutor のインスタンスでないエグゼキュータの使用は非推奨となり、 Python 3.9 ではエラーになります。

executorconcurrent.futures.ThreadPoolExecutor のインスタンスでなければなりません。

エラーハンドリング API

イベントループ内での例外の扱い方をカスタマイズできます。

loop.set_exception_handler(handler)

handler を新しいイベントループ例外ハンドラーとして設定します。

handlerNone の場合、デフォルトの例外ハンドラが設定されます。そうでなければ、 handler(loop, context) に一致する関数シグネチャを持った呼び出し可能オブジェクトでなければなりません。ここで loop はアクティブなイベントループへの参照であり、 context は例外の詳細な記述からなる dict オブジェクトです (context についての詳細は call_exception_handler() メソッドのドキュメントを参照してください)。

loop.get_exception_handler()

現在の例外ハンドラを返します。カスタム例外ハンドラが設定されていない場合は None を返します。

バージョン 3.5.2 で追加.

loop.default_exception_handler(context)

デフォルトの例外ハンドラーです。

デフォルト例外ハンドラは、例外ハンドラが未設定の場合、例外が発生した時に呼び出されます。デフォルト例外ハンドラの挙動を受け入れるために、カスタム例外ハンドラから呼び出すことも可能です。

引数 context の意味は call_exception_handler() と同じです。

loop.call_exception_handler(context)

現在のイベントループ例外ハンドラーを呼び出します。

context は以下のキーを含む dict オブジェクトです (将来の Python バージョンで新しいキーが追加される可能性があります):

注釈

このメソッドはイベントループの派生クラスでオーバーロードされてはいけません。カスタム例外ハンドラの設定には set_exception_handler() メソッドを使ってください。

デバッグモードの有効化

loop.get_debug()

イベントループのデバッグモード (bool) を取得します。

環境変数 PYTHONASYNCIODEBUG に空でない文字列が設定されている場合のデフォルト値は True、そうでない場合は False になります。

loop.set_debug(enabled: bool)

イベントループのデバッグモードを設定します。

バージョン 3.7 で変更: The new -X dev command line option can now also be used to enable the debug mode.

サブプロセスの実行

この節で解説しているのは低水準のメソッドです。通常の async/await コードでは、高水準の関数である asyncio.create_subprocess_shell()asyncio.create_subprocess_exec() を代わりに使うことを検討してください。

注釈

The default asyncio event loop on Windows does not support subprocesses. See Subprocess Support on Windows for details.

coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

args で指定されたひとつの、または複数の文字列引数からサブプロセスを生成します。

args は下記のいずれかに当てはまる文字列のリストでなければなりません:

引数の最初の文字列はプログラムの実行ファイルを指定します。それに続く残りの文字列は引数を指定し、そのプログラムに対する argv を構成します。

このメソッドは標準ライブラリの subprocess.Popen クラスを、 shell=False かつ最初の引数に文字列のリストを渡して呼び出した場合に似ています。しかし、 Popen クラスは文字列のリストを引数としてひとつだけ取るのに対して、 subprocess_exec は複数の文字列引数をとることができます。

protocol_factoryasyncio.SubprocessProtocol クラスの派生クラスを返す呼び出し可能オブジェクトでなければなりません。

その他の引数:

  • stdin: either a file-like object representing a pipe to be connected to the subprocess's standard input stream using connect_write_pipe(), or the subprocess.PIPE constant (default). By default a new pipe will be created and connected.

  • stdout: either a file-like object representing the pipe to be connected to the subprocess's standard output stream using connect_read_pipe(), or the subprocess.PIPE constant (default). By default a new pipe will be created and connected.

  • stderr: either a file-like object representing the pipe to be connected to the subprocess's standard error stream using connect_read_pipe(), or one of subprocess.PIPE (default) or subprocess.STDOUT constants.

    By default a new pipe will be created and connected. When subprocess.STDOUT is specified, the subprocess' standard error stream will be connected to the same pipe as the standard output stream.

  • その他のキーワード引数は、指定してはならない bufsizeuniversal_newlines および shell を除き、すべて解釈されずに subprocess.Popen に渡されます。

他の引数についての詳細は subprocess.Popen クラスのコンストラクタを参照してください。

(transport, protocol) のペアを返します。ここで transportasyncio.SubprocessTransport 基底クラスに適合するオブジェクトで、 protocolprotocol_factory によりインスタンス化されたオブジェクトです。

coroutine loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

コマンド cmd からプラットフォームの "シェル" シンタックスを使ってサブプロセスを生成します。 cmdstr 文字列もしくは ファイルシステムのエンコーディング でエンコードされた bytes 文字列です。

これは標準ライブラリの subprocess.Popen クラスを shell=True で呼び出した場合と似ています。

protocol_factorySubprocessProtocol の派生クラスを返す呼び出し可能オブジェクトでなければなりません。

その他の引数についての詳細は subprocess_exec() メソッドを参照してください。

``(transport, protocol)``のペアを返します。ここで transportSubprocessTransport 基底クラスに適合するオブジェクトで、 protocolprotocol_factory によりインスタンス化されたオブジェクトです。

注釈

シェルインジェクション の脆弱性を回避するために全ての空白文字および特殊文字を適切にクオートすることは、アプリケーション側の責任で確実に行ってください。シェルコマンドを構成する文字列内の空白文字と特殊文字のエスケープは、 shlex.quote() 関数を使うと適切に行うことができます。

コールバックのハンドル

class asyncio.Handle

loop.call_soon()loop.call_soon_threadsafe() が返すコールバックのラッパーです。

cancel()

コールバックをキャンセルします。コールバックがキャンセル済みまたは実行済みの場合、このメソッドは何の影響もありません。

cancelled()

コールバックがキャンセルされた場合 True を返します。

バージョン 3.7 で追加.

class asyncio.TimerHandle

A callback wrapper object returned by loop.call_later(), and loop.call_at().

このクラスは Handle の派生クラスです。

when()

コールバックのスケジュール時刻を秒単位の float で返します。

戻り値の時刻は絶対値で、 loop.time() と同じ参照時刻を使って定義されています。

バージョン 3.7 で追加.

Serverオブジェクト

Server オブジェクトは loop.create_server()loop.create_unix_server()start_server() および start_unix_server() 関数により生成されます。

クラスを直接インスタンス化しないでください。

class asyncio.Server

Server オブジェクトは非同期のコンテキストマネージャです。 async with 文の中で使われた場合、 async with 文が完了した時に Server オブジェクトがクローズされること、およびそれ以降に接続を受け付けないことが保証されます。

srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

バージョン 3.7 で変更: Python 3.7 から、 Server オブジェクトは非同期のコンテキストマネージャになりました。

close()

サーバーを停止します: 待機しているソケットをクローズし sockets 属性に None を設定します。

既存の受信中のクライアントとの接続を表すソケットはオープンのままです。

サーバーは非同期に停止されます。サーバーの停止を待ちたい場合は wait_closed() コルーチンを使用します。

get_loop()

サーバオブジェクトに付随するイベントループを返します。

バージョン 3.7 で追加.

coroutine start_serving()

接続の受け付けを開始します。

このメソッドはべき等です。すなわちサーバがすでにサービスを開始した後でも呼び出すことができます。

キーワード専用のパラメータ start_servingloop.create_server()asyncio.start_server() メソッドに対して使用することにより、初期に接続を受け付けない Server オブジェクトを生成することができます。この場合 Server.start_serving() または Server.serve_forever() メソッドを使ってオブジェクトが接続の受け付けを開始するようにすることができます。

バージョン 3.7 で追加.

coroutine serve_forever()

接続の受け入れを開始し、コルーチンがキャンセルされるまで継続します。 serve_forever タスクのキャンセルによりサーバーもクローズされます。

このメソッドはサーバーがすでに接続の受け入れを開始していても呼び出し可能です。ひとつの Server オブジェクトにつき serve_forever タスクはひとつだけ存在できます。

以下はプログラム例です:

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

バージョン 3.7 で追加.

is_serving()

サーバーが新規に接続の受け入れを開始した場合 True を返します。

バージョン 3.7 で追加.

coroutine wait_closed()

close() メソッドが完了するまで待ちます。

sockets

List of socket.socket objects the server is listening on, or None if the server is closed.

バージョン 3.7 で変更: Python 3.7 より前のバージョンでは、 Server.sockets は内部に持っているサーバーソケットのリストを直接返していました。 Python 3.7 ではリストのコピーが返されるようになりました。

イベントループの実装

asyncio は2つの異なるイベントループの実装、 class:SelectorEventLoopProactorEventLoop、 を提供します:

By default asyncio is configured to use SelectorEventLoop on all platforms.

class asyncio.SelectorEventLoop

selectors に基づくイベントループです。

プラットフォーム上で利用可能な最も効率の良い selector を使います。特定のセレクタ実装を使うように手動で構成することも可能です:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

Availability: Unix, Windows。

class asyncio.ProactorEventLoop

"I/O 完了ポート" (IOCP) を使った Windows 向けのイベントループです。

利用可能な環境: Windows 。

An example how to use ProactorEventLoop on Windows:

import asyncio
import sys

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
class asyncio.AbstractEventLoop

asyncio に適合するイベントループの抽象基底クラスです。

イベントループのメソッド 節は、 AbstractEventLoop の代替実装が定義すべき全てのメソッドを列挙しています。

使用例

この節の全ての使用例は 意図的に loop.run_forever()loop.call_soon() のような 低水準のイベントループ API の使用法を示しています。一方で現代的な asyncio アプリケーションはここに示すような方法をほとんど必要としません。 asyncio.run() のような高水準の関数の使用を検討してください。

call_soon() を使った Hello World

loop.call_soon() メソッドを使ってコールバックをスケジュールする例です。コールバックは "Hello World" を出力しイベントループを停止します:

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

参考

コルーチンと run() 関数を使用した同じような Hello World の例。

call_later() で現在の日時を表示する

毎秒現在時刻を表示するコールバックの例です。コールバックは loop.call_later() メソッドを使って自身を5秒後に実行するよう再スケジュールし、イベントループを停止します:

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

参考

コルーチンと run() 関数を使用した同じような 現在時刻出力 の例。

読み込みイベント用ファイル記述子の監視

ファイル記述子が loop.add_reader() メソッドを使って何らかのデータを受信するまで待機し、その後イベントループをクローズします:

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.get_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

参考

SIGINT および SIGTERM 用のシグナルハンドラーの設定

(ここに挙げる signals の例は Unix でのみ動きます。)

loop.add_signal_handler() メソッドを使用して SIGINTSIGTERM の2つのシグナルに対するハンドラを登録します:

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())