asyncio での開発

非同期プログラミングは伝統的な "同期的" プログラミングとは異なります。

このページはよくある間違いや落とし穴を列挙し、それらを回避する方法を説明します。

デバッグモード

asyncio はデフォルトで本運用モードで実行されます。いっぽう、開発を容易にするために asyncio は "デバッグモード" を持っています。

asyncio のデバッグモードを有効化する方法はいくつかあります:

デバッグモードを有効化することに加え、以下も検討してください:

  • asyncio ロガー のログレベルを logging.DEBUG に設定します。例えばアプリケーションの起動時に以下を実行します:

    logging.basicConfig(level=logging.DEBUG)
    
  • warnings モジュールが ResourceWarning 警告を表示するように設定します。やり方のひとつは -W default コマンドラインオプションを使うことです。

デバッグモードが有効化されたときの動作:

  • asyncio は 待ち受け処理 (await) を伴わないコルーチン がないかをチェックし、それらを記録します; これにより "待ち受け忘れ" の落とし穴にはまる可能性を軽減します。

  • スレッドセーフでない asyncio APIs の多く (loop.call_soon()loop.call_at() など) は、誤ったスレッドから呼び出されたときに例外を送出します。

  • I/O セレクタが I/O 処理を実行する時間が長すぎる場合、その実行時間が記録されます。

  • 実行時間が100ミリ秒を超えるコールバックは記録されます。 "遅い" の判断基準となる実行時間の最小値は loop.slow_callback_duration 属性で設定できます。

並行処理とマルチスレッド処理

イベントループはスレッド(典型的にはメインスレッド)内で動作し、すべてのコールバックとタスクをそのスレッド内で実行します。ひとつのタスクがイベントループ内で実行される間、他のタスクを同じスレッド内で実行することはできません。タスクが await 式を実行すると、実行中のタスクはサスペンドされ、イベントループは次のタスクを実行します。

別の OS スレッドからのコールバック (callback) をスケジュールする場合、 loop.call_soon_threadsafe() メソッドを使ってください。例:

loop.call_soon_threadsafe(callback, *args)

ほぼ全ての非同期オブジェクトはスレッドセーフではありませんが、タスクやコールバックの外側で非同期オブジェクトを使うコードが存在しない限り、それが問題にはなることはほとんどありません。もしそのような目的で低レベルの asyncio API を呼び出すようなコードを書く必要がある場合、 loop.call_soon_threadsafe() メソッドを使ってください。例:

loop.call_soon_threadsafe(fut.cancel)

別の OS スレッドからコルーチンオブジェクトをスケジュールする場合は、 run_coroutine_threadsafe() メソッドを使ってください。 run_coroutine_threadsafe() は結果にアクセスするための concurrent.futures.Future オブジェクトを返します:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

シグナルの処理やサブプロセスの実行を行うには、イベントループはメインスレッド内で実行しなければなりません。

The loop.run_in_executor() メソッドを concurrent.futures.ThreadPoolExecutor とともに使用することで、イベントループの OS スレッドをブロックすることなく、別の OS スレッド内でブロッキングコードを実行することができます。

現在のところ、 (たとえば multiprocessing で開始したような) 別のプロセスからコルーチンやコールバックを直接スケジュールすることはできません。 イベントループのメソッド 節では、イベントループをブロックすることなくパイプからの読み込みやファイルデスクリプタの監視ができる API のリストを掲載しています。さらに、 asyncio の サブプロセス API はイベントループからプロセスを開始したりプロセスと通信したりする方法を提供します。 最後に、前述の loop.run_in_executor() メソッドは concurrent.futures.ProcessPoolExecutor とともに使用することで、別のプロセス内でコードを実行することもできます。

ブロッキングコードの実行

ブロッキングコード (CPU バウンドなコード) を直接呼び出すべきではありません。たとえば、 CPU 負荷の高い関数を1秒実行したとすると、並行に処理されている全ての非同期タスクと I/O 処理は1秒遅れる可能性があります。

エグゼキューターを使用することにより、イベントループの OS スレッドをブロックすることなく、別のスレッドや別のプロセス上でタスクを実行することができます。詳しくは loop.run_in_executor() メソッドを参照してください。

ログ記録

asyncio は logging モジュールを利用し、 全てのログ記録は "asyncio" ロガーを通じて行われます。

デフォルトのログレベルは logging.INFO ですが、これは簡単に調節できます:

logging.getLogger("asyncio").setLevel(logging.WARNING)

ネットワークログ記録は、イベントループをブロックし得ます。ログ処理のスレッドを分離するか、ノンブロッキング IO を使用することを推奨します。例えば、 ブロックする handler を扱う を見てください。

待ち受け処理を伴わないコルーチンの検出

コルーチンが呼び出されただけで、待ち受け処理がない場合 (たとえば await coro() のかわりに coro() と書いてしまった場合) 、またはコルーチンが asyncio.create_task() を使わずにスケジュールされた場合、 asyncio は RuntimeWarning 警告を送出します:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

出力:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

デバッグモードの出力:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常の修正方法はコルーチンを待ち受ける (await) か、 asyncio.create_task() 関数を呼び出すことです:

async def main():
    await test()

回収されない例外の検出

もし Future.set_exception() メソッドが呼び出されても、その Future オブジェクトを待ち受けていなければ、例外は決してユーザーコードまで伝播しません。この場合 asyncio は、 Future オブジェクトがガベージコレクションの対象となったときにログメッセージを送出することがあります。

処理されない例外の例:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

出力:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

タスクが生成された箇所を特定するには、 デバッグモードを有効化して トレースバックを取得してください:

asyncio.run(main(), debug=True)

デバッグモードの出力:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed