使用 asyncio 開發

非同步程式設計 (asynchronous programming) 與傳統的"順序"程式設計 (sequential programming) 不同。

本頁列出常見的錯誤和陷阱,並解釋如何避免它們。

除錯模式

在預設情況下 asyncio 以正式生產模式 (production mode) 執行。為了讓開發更輕鬆,asyncio 還有一種除錯模式 (debug mode)

有幾種方法可以啟用 asyncio 除錯模式:

除了啟用除錯模式外,還要考慮:

啟用除錯模式時:

  • 許多非執行緒安全 (non-threadsafe) 的 asyncio APIs(例如 loop.call_soon()loop.call_at() 方法),如果從錯誤的執行緒呼叫就會引發例外。

  • 如果執行一個 I/O 操作花費的時間太長,則將 I/O 選擇器 (selector) 的執行時間記錄到日誌中。

  • 執行時間超過 100 毫秒的回呼 (callback) 將會被記錄於日誌。屬性 loop.slow_callback_duration 可用於設定以秒為單位的最小執行持續時間,超過這個值執行時間就會被視為"緩慢"。

並行性和多執行緒 (Concurrency and Multithreading)

事件迴圈在執行緒中運行(通常是主執行緒),並在其執行緒中執行所有回呼和 Tasks(任務)。當一個 Task 在事件迴圈中運行時,沒有其他 Task 可以在同一個執行緒中運行。當一個 Task 執行一個 await 運算式時,正在執行的 Task 會被暫停,而事件迴圈會執行下一個 Task。

要從不同的 OS 執行緒為一個 callback 排程,應該使用 loop.call_soon_threadsafe() 方法。例如:

loop.call_soon_threadsafe(callback, *args)

幾乎所有 asyncio 物件都不支援執行緒安全 (thread safe),這通常不是問題,除非在 Task 或回呼函式之外有程式需要和它們一起運作。如果需要這樣的程式來呼叫低階 asyncio API,應該使用 loop.call_soon_threadsafe() 方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要從不同的 OS 執行緒為一個協程物件排程,應該使用 run_coroutine_threadsafe() 函式。它會回傳一個 concurrent.futures.Future 以存取結果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# 之後在另一個 OS 執行緒中:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# 等待結果:
result = future.result()

為了能夠處理訊號,事件迴圈必須於主執行緒中運行。

loop.run_in_executor() 方法可以和 concurrent.futures.ThreadPoolExecutorInterpreterPoolExecutor 一起使用,這能夠在作業系統上另一個不同的執行緒中執行阻塞程式,且避免阻塞執行事件迴圈的執行緒。

目前沒有什麼辦法能直接從另一個行程(例如透過 multiprocessing 啟動的程序)來為協程或回呼排程。Event loop methods小節列出了可以從 pipes(管道)讀取並監視 file descriptor(檔案描述器)而不會阻塞事件迴圈的 API。此外,asyncio 的子行程 API 提供了一種啟動行程並從事件迴圈與其通訊的辦法。最後,之前提到的 loop.run_in_executor() 方法也可和 concurrent.futures.ProcessPoolExecutor 搭配使用,以在另一個行程中執行程式。

執行阻塞的程式

不應該直接呼叫阻塞(CPU 密集型)程式。例如一個執行 1 秒 CPU 密集型計算的函式,那麼所有並行非同步 Tasks 和 IO 操作都會被延遲 1 秒。

一個 executor(執行器)可以被用來在不同的執行緒(包含不同直譯器)、或甚至不同的行程中執行任務,以避免事件迴圈阻塞了 OS 執行緒。詳情請見 loop.run_in_executor() 方法。

日誌記錄

asyncio 使用 logging 模組,所有日誌記錄都是透過 "asyncio" logger 執行的。

日誌級別被預設為 logging.INFO,它可以很容易地被調整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

網路日誌記錄可能會阻塞事件迴圈。建議使用獨立的執行緒來處理日誌或使用非阻塞 IO,範例請參見 Dealing with handlers that block

偵測從未被等待的 (never-awaited) 協程

當協程函式被呼叫而不是被等待時(即執行 coro() 而不是 await coro())或者協程沒有透過 asyncio.create_task() 被排程,asyncio 將會發出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

除錯模式中的輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

常用的修復方法是去等待協程或者呼叫 asyncio.create_task() 函式:

async def main():
    await test()

偵測從未被取得的 (never-retrieved) 例外

如果呼叫 Future.set_exception(),但 Future 物件從未被等待,例外將無法被傳播 (propagate) 到使用者程式。在這種情況下,當 Future 物件被垃圾回收 (garbage collected) 時,asyncio 將發出一則日誌訊息。

未處理例外的例子:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

啟用除錯模式以取得任務建立處的追蹤資訊 (traceback):

asyncio.run(main(), debug=True)

除錯模式中的輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

Asynchronous generators best practices

Writing correct and efficient asyncio code requires awareness of certain pitfalls. This section outlines essential best practices that can save you hours of debugging.

Close asynchronous generators explicitly

It is recommended to manually close the asynchronous generator. If a generator exits early - for example, due to an exception raised in the body of an async for loop - its asynchronous cleanup code may run in an unexpected context. This can occur after the tasks it depends on have completed, or during the event loop shutdown when the async-generator's garbage collection hook is called.

To avoid this, explicitly close the generator by calling its aclose() method, or use the contextlib.aclosing() context manager:

import asyncio
import contextlib

async def gen():
  yield 1
  yield 2

async def func():
  async with contextlib.aclosing(gen()) as g:
    async for x in g:
      break  # Don't iterate until the end

asyncio.run(func())

As noted above, the cleanup code for these asynchronous generators is deferred. The following example demonstrates that the finalization of an asynchronous generator can occur in an unexpected order:

import asyncio
work_done = False

async def cursor():
    try:
        yield 1
    finally:
        assert work_done

async def rows():
    global work_done
    try:
        yield 2
    finally:
        await asyncio.sleep(0.1) # immitate some async work
        work_done = True


async def main():
    async for c in cursor():
        async for r in rows():
            break
        break

asyncio.run(main())

For this example, we get the following output:

unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
  File "example.py", line 6, in cursor
    yield 1
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "example.py", line 8, in cursor
    assert work_done
           ^^^^^^^^^
AssertionError

The cursor() asynchronous generator was finalized before the rows generator - an unexpected behavior.

The example can be fixed by explicitly closing the cursor and rows async-generators:

async def main():
    async with contextlib.aclosing(cursor()) as cursor_gen:
        async for c in cursor_gen:
            async with contextlib.aclosing(rows()) as rows_gen:
                async for r in rows_gen:
                    break
            break

Create asynchronous generators only when the event loop is running

It is recommended to create asynchronous generators only after the event loop has been created.

To ensure that asynchronous generators close reliably, the event loop uses the sys.set_asyncgen_hooks() function to register callback functions. These callbacks update the list of running asynchronous generators to keep it in a consistent state.

When the loop.shutdown_asyncgens() function is called, the running generators are stopped gracefully and the list is cleared.

The asynchronous generator invokes the corresponding system hook during its first iteration. At the same time, the generator records that the hook has been called and does not call it again.

Therefore, if iteration begins before the event loop is created, the event loop will not be able to add the generator to its list of active generators because the hooks are set after the generator attempts to call them. Consequently, the event loop will not be able to terminate the generator if necessary.

Consider the following example:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)


with asyncio.Runner() as runner:
    agen = agenfn()
    print(runner.run(anext(agen)))
    del agen

輸出:

10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
  File "example.py", line 13, in <module>
    del agen
        ^^^^
RuntimeError: async generator ignored GeneratorExit

This example can be fixed as follows:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)

async def main():
    agen = agenfn()
    print(await anext(agen))
    del agen

asyncio.run(main())

Avoid concurrent iteration and closure of the same generator

Async generators may be reentered while another __anext__() / athrow() / aclose() call is in progress. This may lead to an inconsistent state of the async generator and can cause errors.

Let's consider the following example:

import asyncio

async def consumer():
    for idx in range(100):
        await asyncio.sleep(0)
        message = yield idx
        print('received', message)

async def amain():
    agenerator = consumer()
    await agenerator.asend(None)

    fa = asyncio.create_task(agenerator.asend('A'))
    fb = asyncio.create_task(agenerator.asend('B'))
    await fa
    await fb

asyncio.run(amain())

輸出:

received A
Traceback (most recent call last):
  File "test.py", line 38, in <module>
    asyncio.run(amain())
    ~~~~~~~~~~~^^^^^^^^^
  File "Lib/asyncio/runners.py", line 204, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "Lib/asyncio/runners.py", line 127, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "Lib/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "test.py", line 36, in amain
    await fb
RuntimeError: anext(): asynchronous generator is already running

Therefore, it is recommended to avoid using asynchronous generators in parallel tasks or across multiple event loops.