コルーチンと Task

この節では、コルーチンと Task を利用する高レベルの asyncio の API の概略を解説します。

コルーチン

async/await 構文で宣言された コルーチン は、 asyncio を使ったアプリケーションを書くのに推奨される方法です。例えば、次のコードスニペット (Python 3.7 以降が必要) は "hello" を出力し、そこから 1 秒待って "world" を出力します:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

単にコルーチンを呼び出しただけでは、コルーチンの実行スケジュールは予約されていないことに注意してください:

>>> main()
<coroutine object main at 0x1053bb7c8>

実際にコルーチンを走らせるために、 asyncio は3つの機構を提供しています:

  • 最上位のエントリーポイントである "main()" 関数を実行する asyncio.run() 関数 (上の例を参照してください。)

  • コルーチンを await すること。次のコード片は 1 秒間待機した後に "hello" と出力し、 更に 2 秒間待機してから "world" と出力します:

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    予想される出力:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • asyncio の Tasks としてコルーチンを並行して走らせる asyncio.create_task() 関数。

    上のコード例を編集して、ふたつの say_after コルーチンを 並行して 走らせてみましょう:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    予想される出力が、スニペットの実行が前回よりも 1 秒早いことを示していることに注意してください:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

Awaitable

あるオブジェクトを await 式の中で使うことができる場合、そのオブジェクトを awaitable オブジェクトと言います。多くの asyncio API は awaitable を受け取るように設計されています。

awaitable オブジェクトには主に3つの種類があります: コルーチン, Task, そして Future です

コルーチン

Python のコルーチンは awaitable であり、そのため他のコルーチンを待機させられます:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

このドキュメントにおいて「コルーチン」という用語は以下2つの密接に関連した概念に対して使用できます:

  • コルーチン関数: async def 関数;

  • コルーチンオブジェクト: コルーチン関数 を呼び出すと返ってくるオブジェクト.

asyncio は、古くからある ジェネレータベース のコルーチンもサポートしています。

Task

Task は、コルーチンを 並行に スケジュールするのに使います。

asyncio.create_task() のような関数で、コルーチンが Task にラップされているとき、自動的にコルーチンは即時実行されるようにスケジュールされます:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Future

Future は、非同期処理の 最終結果 を表現する特別な 低レベルの awaitable オブジェクトです。

Future オブジェクトが他の awaitable を 待機させている と言うときは、ある場所で Future が解決されるまでコルーチンが待機するということです。

asyncioのFutureオブジェクトを使うと、async/awaitとコールバック形式のコードを併用できます。

通常、アプリケーション水準のコードで Future オブジェクトを作る 必要はありません

Future オブジェクトはライブラリや asyncio の API で表に出ることもあり、他の awaitable を待機させられます:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

Future オブジェクトを返す低レベル関数の良い例は loop.run_in_executor() です。

非同期プログラムの実行

asyncio.run(coro, *, debug=False)

coroutine coro を実行し、結果を返します。

この関数は、非同期イベントループの管理と 非同期ジェネレータの終了処理 を行いながら、渡されたコルーチンを実行します。

この関数は、同じスレッドで他の非同期イベントループが実行中のときは呼び出せません。

debugTrue の場合、イベントループはデバッグモードで実行されます。

この関数は常に新しいイベントループを作成し、終了したらそのイベントループを閉じます。 この関数は非同期プログラムのメインのエントリーポイントとして使われるべきで、理想的には 1 回だけ呼び出されるべきです。

以下はプログラム例です:

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

バージョン 3.7 で追加.

注釈

asyncio.run() のソースコードは Lib/asyncio/runners.py にあります。

Task の作成

asyncio.create_task(coro, *, name=None)

coro coroutineTask でラップし、その実行をスケジュールします。 Task オブジェクトを返します。

もし nameNone でない場合、Task.set_name() を使用し、name がタスクの名前として設定されます。

その Task オブジェクトは get_running_loop() から返されたループの中で実行されます。現在のスレッドに実行中のループが無い場合は、 RuntimeError が送出されます。

この関数は Python 3.7 で追加 されました。 Python 3.7 より前では、代わりに低レベルの asyncio.ensure_future() 関数が使えます:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

バージョン 3.7 で追加.

バージョン 3.8 で変更: name パラメータが追加されました。

スリープ

coroutine asyncio.sleep(delay, result=None, *, loop=None)

delay 秒だけ停止します。

result が提供されている場合は、コルーチン完了時にそれが呼び出し元に返されます。

sleep() は常に現在の Task を一時中断し、他の Task が実行されるのを許可します。

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

現在の時刻を5秒間、毎秒表示するコルーチンの例:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

並行な Task 実行

awaitable asyncio.gather(*aws, loop=None, return_exceptions=False)

aws シーケンスにある awaitable オブジェクト並行 実行します。

aws にある awaitable がコルーチンである場合、自動的に Task としてスケジュールされます。

全ての awaitable が正常終了した場合、その結果は返り値を集めたリストになります。 返り値の順序は、 aws での awaitable の順序に相当します。

return_exceptionsFalse である場合(デフォルト)、gather() で await しているタスクに対して、最初の例外が直接伝えられます。aws に並んでいる他の awaitable は、キャンセルされずに 引き続いて実行されます。

return_exceptionsTrue だった場合、例外は成功した結果と同じように取り扱われ、結果リストに集められます。

gather()キャンセル された場合、起動された全ての (未完了の) awaitable も キャンセル されます。

aws シーケンスにある Task あるいは Future が キャンセル された場合、 CancelledError を送出したかのうように扱われます。つまり、この場合 gather() 呼び出しはキャンセル されません。 これは、起動された 1 つの Task あるいは Future のキャンセルが、他の Task あるいは Future のキャンセルを引き起こすのを避けるためです。

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

以下はプログラム例です:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

注釈

return_exceptions が False の場合、いったん完了状態となった gather() をキャンセルしても起動された awaitables がキャンセルされないことがあります。例えば、 gather は例外を呼び出し元に送出したあと完了状態になることがあるため、 (起動した awaitable のいずれかから送出された) gather からの例外をキャッチした後で gather.cancel() を呼び出しても、他の awaitable がキャンセルされない可能性があります。

バージョン 3.7 で変更: gather 自身がキャンセルされた場合は、 return_exceptions の値に関わらずキャンセルが伝搬されます。

キャンセルからの保護

awaitable asyncio.shield(aw, *, loop=None)

キャンセル から awaitable オブジェクト を保護します。

aw がコルーチンだった場合、自動的に Task としてスケジュールされます。

文:

res = await shield(something())

は、以下と同じです

res = await something()

それを含むコルーチンがキャンセルされた場合を 除きsomething() 内で動作している Task はキャンセルされません。 something() 側から見るとキャンセルは発生しません。 呼び出し元がキャンセルされた場合でも、 "await" 式は CancelledError を送出します。

注意: something() が他の理由 (例えば、原因が自分自身) でキャンセルされた場合は shield() でも保護できません。

完全にキャンセルを無視したい場合 (推奨はしません) は、 shield() 関数は次のように try/except 節と組み合わせることになるでしょう:

try:
    res = await shield(something())
except CancelledError:
    res = None

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

タイムアウト

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

aw awaitable が、完了するかタイムアウトになるのを待ちます。

aw がコルーチンだった場合、自動的に Task としてスケジュールされます。

timeout には None もしくは待つ秒数の浮動小数点数か整数を指定できます。 timeoutNone の場合、 Future が完了するまで待ちます。

タイムアウトが起きた場合は、 Task をキャンセルし asyncio.TimeoutError を送出します。

Task の キャンセル を避けるためには、 shield() の中にラップしてください。

この関数は、 Future が実際にキャンセルされるまで待つので、全体の待ち時間は timeout を超えることもあります。

待機が中止された場合 aw も中止されます。

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

以下はプログラム例です:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

バージョン 3.7 で変更: aw がタイムアウトでキャンセルされたとき、 wait_foraw がキャンセルされるまで待ちます。 以前は、すぐに asyncio.TimeoutError を送出していました。

要素の終了待機

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

Run awaitable objects in the aws iterable concurrently and block until the condition specified by return_when.

Task と Future の 2 つ (done, pending) を返します。

使い方:

done, pending = await asyncio.wait(aws)

timeout (浮動小数点数または整数) が指定されていたら、処理を返すのを待つ最大秒数を制御するのに使われます。

この関数は asyncio.TimeoutError を送出しないことに注意してください。 タイムアウトが起きたときに完了していなかった Future や Task は、2 つ目の集合の要素として返されるだけです。

return_when でこの関数がいつ結果を返すか指定します。指定できる値は以下の 定数のどれか一つです:

定数

説明

FIRST_COMPLETED

いずれかの Future が終了したかキャンセルされたときに返します。

FIRST_EXCEPTION

いずれかの Future が例外の送出で終了した場合に返します。 例外を送出したフューチャがない場合は、ALL_COMPLETED と等価になります。

ALL_COMPLETED

すべての Future が終了したかキャンセルされたときに返します。

wait_for() と異なり、 wait() はタイムアウトが起きたときに Future をキャンセルしません。

バージョン 3.8 で非推奨: aws にある awaitable のどれかがコルーチンの場合、自動的に Task としてスケジュールされます。 コルーチンオブジェクトを wait() に直接渡すのは 紛らわしい振る舞い を引き起こすため非推奨です。

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

注釈

wait() は自動的にコルーチンを Task としてスケジュールし、その後、暗黙的に作成された Task オブジェクトを組になった集合 (done, pending) に入れて返します。 従って、次のコードは予想した通りには動作しません:

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

上のスクリプト片は次のように修正できます:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

バージョン 3.8 で非推奨: wait() にコルーチンオブジェクトを直接渡すのは非推奨です。

asyncio.as_completed(aws, *, loop=None, timeout=None)

イテラブル aws 内の awaitable オブジェクト を並列実行します。コルーチンのイテレータを返します。戻り値の各コルーチンは、残りの awaitable のうちで最も早く得られた結果を待ち受けることができます。

全フューチャが終了する前にタイムアウトが発生した場合 asyncio.TimeoutError を送出します。

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

以下はプログラム例です:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

外部スレッドからのスケジュール

asyncio.run_coroutine_threadsafe(coro, loop)

与えられたイベントループにコルーチンを送ります。 この処理は、スレッドセーフです。

他の OS スレッドから結果を待つための concurrent.futures.Future を返します。

この関数は、イベントループが動作しているスレッドとは異なる OS スレッドから呼び出すためのものです。 例えば次のように使います:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

コルーチンから例外が送出された場合、返された Future に通知されます。 これはイベントループの Task をキャンセルするのにも使えます:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

このドキュメントの asyncio-multithreading 節を参照してください。

他の asyncio 関数とは異なり、この関数は明示的に渡される loop 引数を必要とします。

バージョン 3.5.1 で追加.

イントロスペクション

asyncio.current_task(loop=None)

現在実行中の Task インスタンスを返します。実行中の Task が無い場合は None を返します。

loopNone の場合、 get_running_loop() が現在のループを取得するのに使われます。

バージョン 3.7 で追加.

asyncio.all_tasks(loop=None)

ループで実行された Task オブジェクトでまだ完了していないものの集合を返します。

loopNone の場合、 get_running_loop() は現在のループを取得するのに使われます。

バージョン 3.7 で追加.

Task オブジェクト

class asyncio.Task(coro, *, loop=None, name=None)

Python コルーチン を実行する Future オブジェクトです。 スレッドセーフではありません。

Task はイベントループのコルーチンを実行するのに使われます。 Future でコルーチンが待機している場合、 Task は自身のコルーチンの実行を一時停止させ、 Future の完了を待ちます。 Future が 完了 したら、 Task が内包しているコルーチンの実行を再開します。

イベントループは協調スケジューリングを使用します。つまり、イベントループは同時に 1 つの Task のみ実行します。 Task が Future の完了を待っているときは、イベントループは他の Task やコールバックを動作させるか、 IO 処理を実行します。

Task を作成するには高レベルの asyncio.create_task() 関数、あるいは低レベルの loop.create_task() 関数や ensure_future() 関数を使用してください。 手作業での Task の実装は推奨されません。

実行中のタスクをキャンセルするためには、cancel() メソッドを使用します。このメソッドを呼ぶと、タスクはそれを内包するコルーチンに対して CancelledError 例外を送出します。キャンセルの際にコルーチンが Future オブジェクトを待っていた場合、その Future オブジェクトはキャンセルされます。

cancelled() は、タスクがキャンセルされたかを調べるのに使用できます。タスクを内包するコルーチンで CancelledError 例外が抑制されておらず、かつタスクが実際にキャンセルされている場合に、このメソッドは True を変えます。

asyncio.Task は、Future.set_result()Future.set_exception() を除いて、Future の API をすべて継承しています。

Task は contextvars モジュールをサポートします。Task が作られたときに現在のコンテキストがコピーされ、のちに Task のコルーチンを実行する際に、コピーされたコンテキストが使用されます。

バージョン 3.7 で変更: contextvars モジュールのサポートを追加。

バージョン 3.8 で変更: name パラメータが追加されました。

Deprecated since version 3.8, will be removed in version 3.10: loop パラメータ。

cancel()

このタスクに、自身のキャンセルを要求します。

このメソッドは、イベントループの次のステップにおいて、タスクがラップしているコルーチン内で CancelledError 例外が送出されるように準備します。

コルーチン側では try ... ... except CancelledError ... finally ブロックで例外を処理することにより、クリーンアップ処理を行なったり、リクエストを拒否したりする機会が与えられます。この特性を使ってキャンセル処理を完全に抑え込むことも可能であることから、 Future.cancel() と異なり、 Task.cancel() は Task が実際にキャンセルされることを保証しません。ただしそのような処理は一般的ではありませんし、そのような処理をしないことが望ましいです。

以下の例は、コルーチンがどのようにしてキャンセルのリクエストを阻止するかを示しています:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()

Task が キャンセルされた 場合に True を返します。

cancel() メソッドによりキャンセルがリクエストされ、かつ Task がラップしているコルーチンが内部で送出された CancelledError 例外を伝達したとき、 Task は実際に キャンセル されます。

done()

Task が 完了 しているなら True を返します。

Task がラップしているコルーチンが値を返すか、例外を送出するか、または Task がキャンセルされたとき、 Task は 完了 します。

result()

Task の結果を返します。

Task が 完了 している場合、ラップしているコルーチンの結果が返されます (コルーチンが例外を送出された場合、その例外が例外が再送出されます)

Task が キャンセル されている場合、このメソッドは CancelledError 例外を送出します。

Task の結果がまだ未設定の場合、このメソッドは InvalidStateError 例外を送出します。

exception()

Task の例外を返します。

ラップされたコルーチンが例外を送出した場合、その例外が返されます。ラップされたコルーチンが正常終了した場合、このメソッドは None を返します。

Task が キャンセル されている場合、このメソッドは CancelledError 例外を送出します。

Task がまだ 完了 していない場合、このメソッドは InvalidStateError 例外を送出します。

add_done_callback(callback, *, context=None)

Task が 完了 したときに実行されるコールバックを追加します。

このメソッドは低水準のコールバックベースのコードでのみ使うべきです。

詳細については Future.add_done_callback() のドキュメントを参照してください。

remove_done_callback(callback)

コールバックリストから callback を削除します。

このメソッドは低水準のコールバックベースのコードでのみ使うべきです。

詳細については Future.remove_done_callback() のドキュメントを参照してください。

get_stack(*, limit=None)

このタスクのスタックフレームのリストを返します。

コルーチンが完了していない場合、これはサスペンドされた時点でのスタックを返します。コルーチンが正常に処理を完了したか、キャンセルされていた場合は空のリストを返します。コルーチンが例外で終了した場合はトレースバックフレームのリストを返します。

フレームは常に古いものから新しい物へ並んでいます。

サスペンドされているコルーチンの場合スタックフレームが 1 個だけ返されます。

オプション引数 limit は返すフレームの最大数を指定します; デフォルトでは取得可能な全てのフレームを返します。返されるリストの順番は、スタックが返されるか、トレースバックが返されるかによって変わります: スタックでは新しい順に並んだリストが返されますが、トレースバックでは古い順に並んだリストが返されます(これは traceback モジュールの振る舞いと一致します)。

print_stack(*, limit=None, file=None)

このタスクのスタックまたはトレースバックを出力します。

このメソッドは get_stack() によって取得されるフレームに対し、 traceback モジュールと同じような出力を生成します。

引数 limitget_stack() にそのまま渡されます。

引数 file は出力を書き込む I/O ストリームを指定します; デフォルトでは出力は標準エラー出力 sys.stderr に書き込まれます。

get_coro()

Task がラップしているコルーチンオブジェクトを返します。

バージョン 3.8 で追加.

get_name()

Task の名前を返します。

Task に対して明示的に名前が設定されていない場合, デフォルトの asyncio Task 実装はタスクをインスタンス化する際にデフォルトの名前を生成します。

バージョン 3.8 で追加.

set_name(value)

Task に名前を設定します。

引数 value は文字列に変換可能なオブジェクトであれば何でもかまいません。

Task のデフォルト実装では、名前はオブジェクトの repr() メソッドの出力で確認できます。

バージョン 3.8 で追加.

classmethod all_tasks(loop=None)

イベントループのすべての Task の集合を返します。

By default all tasks for the current event loop are returned. If loop is None, the get_event_loop() function is used to get the current loop.

Deprecated since version 3.7, will be removed in version 3.9: Do not call this as a task method. Use the asyncio.all_tasks() function instead.

classmethod current_task(loop=None)

Return the currently running task or None.

If loop is None, the get_event_loop() function is used to get the current loop.

Deprecated since version 3.7, will be removed in version 3.9: Do not call this as a task method. Use the asyncio.current_task() function instead.

Generator-based Coroutines

注釈

Support for generator-based coroutines is deprecated and is scheduled for removal in Python 3.10.

Generator-based coroutines predate async/await syntax. They are Python generators that use yield from expressions to await on Futures and other coroutines.

Generator-based coroutines should be decorated with @asyncio.coroutine, although this is not enforced.

@asyncio.coroutine

Decorator to mark generator-based coroutines.

This decorator enables legacy generator-based coroutines to be compatible with async/await code:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

This decorator should not be used for async def coroutines.

Deprecated since version 3.8, will be removed in version 3.10: Use async def instead.

asyncio.iscoroutine(obj)

objコルーチンオブジェクト であれば True を返します。

This method is different from inspect.iscoroutine() because it returns True for generator-based coroutines.

asyncio.iscoroutinefunction(func)

Return True if func is a coroutine function.

This method is different from inspect.iscoroutinefunction() because it returns True for generator-based coroutine functions decorated with @coroutine.