使用 asyncio 開發

非同步程式設計 (asynchronous programming) 與傳統的"順序"程式設計 (sequential programming) 不同。

本頁列出常見的錯誤和陷阱,並解釋如何避免它們。

除錯模式

在預設情況下 asyncio 以正式生產模式 (production mode) 執行。為了讓開發更輕鬆,asyncio 還有一種除錯模式 (debug mode)

有幾種方法可以啟用 asyncio 除錯模式:

除了啟用除錯模式外,還要考慮:

啟用除錯模式時:

  • asyncio 會檢查未被等待的協程並記錄他們;這會減輕"被遺忘的等待 (forgotten await)" 問題。

  • 許多非執行緒安全 (non-threadsafe) 的 asyncio APIs(例如 loop.call_soon()loop.call_at() 方法),如果從錯誤的執行緒呼叫就會引發例外。

  • 如果執行一個 I/O 操作花費的時間太長,則將 I/O 選擇器 (selector) 的執行時間記錄到日誌中。

  • 執行時間超過 100 毫秒的回呼 (callback) 將會被記錄於日誌。屬性 loop.slow_callback_duration 可用於設定以秒為單位的最小執行持續時間,超過這個值執行時間就會被視為"緩慢"。

並行性和多執行緒 (Concurrency and Multithreading)

事件迴圈在執行緒中運行(通常是主執行緒),並在其執行緒中執行所有回呼和 Tasks(任務)。當一個 Task 在事件迴圈中運行時,沒有其他 Task 可以在同一個執行緒中運行。當一個 Task 執行一個 await 運算式時,正在執行的 Task 會被暫停,而事件迴圈會執行下一個 Task。

要從不同的 OS 執行緒為一個 callback 排程,應該使用 loop.call_soon_threadsafe() 方法。例如:

loop.call_soon_threadsafe(callback, *args)

幾乎所有 asyncio 物件都不支援執行緒安全 (thread safe),這通常不是問題,除非在 Task 或回呼函式之外有程式需要和它們一起運作。如果需要這樣的程式來呼叫低階 asyncio API,應該使用 loop.call_soon_threadsafe() 方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要從不同的 OS 執行緒為一個協程物件排程,應該使用 run_coroutine_threadsafe() 函式。它會回傳一個 concurrent.futures.Future 以存取結果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

為了能夠處理訊號和執行子行程,事件迴圈必須於主執行緒中運行。

loop.run_in_executor() 方法可以和 concurrent.futures.ThreadPoolExecutor 一起使用,這能夠在作業系統上另一個不同的執行緒中執行阻塞程式,且避免阻塞執行事件迴圈的執行緒。

目前沒有什麼辦法能直接從另一個行程(例如透過 multiprocessing 啟動的程序)來為協程或回呼排程。事件循环方法集小節列出了可以從 pipes(管道)讀取並監視 file descriptor(檔案描述器)而不會阻塞事件迴圈的 API。此外,asyncio 的子行程 API 提供了一種啟動行程並從事件迴圈與其通訊的辦法。最後,之前提到的 loop.run_in_executor() 方法也可和 concurrent.futures.ProcessPoolExecutor 搭配使用,以在另一個行程中執行程式。

執行阻塞的程式

不應該直接呼叫阻塞(CPU 密集型)程式。例如一個執行 1 秒 CPU 密集型計算的函式,那麼所有並行非同步 Tasks 和 IO 操作都會被延遲 1 秒。

一個 executor(執行器)可以被用來在不同的執行緒、或甚至不同的行程中執行任務,以避免使用事件迴圈阻塞 OS 執行緒。詳情請見 loop.run_in_executor() 方法。

日誌記錄

asyncio 使用 logging 模組,所有日誌記錄都是透過 "asyncio" logger 執行的。

日誌級別被預設為 logging.INFO,它可以很容易地被調整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

网络日志会阻塞事件循环。 建议使用一个单独进程来处理日志或者使用非阻塞式 IO。 例如,可以参阅 处理日志 handler 的阻塞

偵測從未被等待的 (never-awaited) 協程

當協程函式被呼叫而不是被等待時(即執行 coro() 而不是 await coro())或者協程沒有透過 asyncio.create_task() 被排程,asyncio 將會發出 RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

除錯模式中的輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

常用的修復方法是去等待協程或者呼叫 asyncio.create_task() 函式:

async def main():
    await test()

偵測從未被獲取的 (never-retrieved) 例外

如果呼叫 Future.set_exception(),但 Future 物件從未被等待,例外將無法被傳播 (propagate) 到使用者程式。在這種情況下,當 Future 物件被垃圾回收 (garbage collected) 時,asyncio 將發出一則日誌訊息。

未處理例外的例子:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

啟用除錯模式以取得任務建立處的追蹤資訊 (traceback):

asyncio.run(main(), debug=True)

除錯模式中的輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed