使用 asyncio 開發¶
非同步程式設計 (asynchronous programming) 與傳統的"順序"程式設計 (sequential programming) 不同。
本頁列出常見的錯誤和陷阱,並解釋如何避免它們。
除錯模式¶
在預設情況下 asyncio 以正式生產模式 (production mode) 執行。為了讓開發更輕鬆,asyncio 還有一種除錯模式 (debug mode)。
有幾種方法可以啟用 asyncio 除錯模式:
將
PYTHONASYNCIODEBUG環境變數設定為1。將
debug=True傳遞給asyncio.run()。呼叫
loop.set_debug()。
除了啟用除錯模式外,還要考慮:
將 asyncio logger(日誌記錄器)的日誌級别設置為
logging.DEBUG,例如下面的程式片段可以在應用程式啟動時運行:logging.basicConfig(level=logging.DEBUG)
配置
warnings模組以顯示ResourceWarning警告。一種方法是使用-Wdefault命令列選項。
啟用除錯模式時:
許多非執行緒安全 (non-threadsafe) 的 asyncio APIs(例如
loop.call_soon()和loop.call_at()方法),如果從錯誤的執行緒呼叫就會引發例外。如果執行一個 I/O 操作花費的時間太長,則將 I/O 選擇器 (selector) 的執行時間記錄到日誌中。
執行時間超過 100 毫秒的回呼 (callback) 將會被記錄於日誌。屬性
loop.slow_callback_duration可用於設定以秒為單位的最小執行持續時間,超過這個值執行時間就會被視為"緩慢"。
並行性和多執行緒 (Concurrency and Multithreading)¶
事件迴圈在執行緒中運行(通常是主執行緒),並在其執行緒中執行所有回呼和 Tasks(任務)。當一個 Task 在事件迴圈中運行時,沒有其他 Task 可以在同一個執行緒中運行。當一個 Task 執行一個 await 運算式時,正在執行的 Task 會被暫停,而事件迴圈會執行下一個 Task。
要從不同的 OS 執行緒為一個 callback 排程,應該使用 loop.call_soon_threadsafe() 方法。例如:
loop.call_soon_threadsafe(callback, *args)
幾乎所有 asyncio 物件都不支援執行緒安全 (thread safe),這通常不是問題,除非在 Task 或回呼函式之外有程式需要和它們一起運作。如果需要這樣的程式來呼叫低階 asyncio API,應該使用 loop.call_soon_threadsafe() 方法,例如:
loop.call_soon_threadsafe(fut.cancel)
要從不同的 OS 執行緒為一個協程物件排程,應該使用 run_coroutine_threadsafe() 函式。它會回傳一個 concurrent.futures.Future 以存取結果:
async def coro_func():
return await asyncio.sleep(1, 42)
# 之後在另一個 OS 執行緒中:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# 等待結果:
result = future.result()
為了能夠處理訊號,事件迴圈必須於主執行緒中運行。
loop.run_in_executor() 方法可以和 concurrent.futures.ThreadPoolExecutor 或 InterpreterPoolExecutor 一起使用,這能夠在作業系統上另一個不同的執行緒中執行阻塞程式,且避免阻塞執行事件迴圈的執行緒。
目前沒有什麼辦法能直接從另一個行程(例如透過 multiprocessing 啟動的程序)來為協程或回呼排程。Event loop methods小節列出了可以從 pipes(管道)讀取並監視 file descriptor(檔案描述器)而不會阻塞事件迴圈的 API。此外,asyncio 的子行程 API 提供了一種啟動行程並從事件迴圈與其通訊的辦法。最後,之前提到的 loop.run_in_executor() 方法也可和 concurrent.futures.ProcessPoolExecutor 搭配使用,以在另一個行程中執行程式。
執行阻塞的程式¶
不應該直接呼叫阻塞(CPU 密集型)程式。例如一個執行 1 秒 CPU 密集型計算的函式,那麼所有並行非同步 Tasks 和 IO 操作都會被延遲 1 秒。
一個 executor(執行器)可以被用來在不同的執行緒(包含不同直譯器)、或甚至不同的行程中執行任務,以避免事件迴圈阻塞了 OS 執行緒。詳情請見 loop.run_in_executor() 方法。
日誌記錄¶
asyncio 使用 logging 模組,所有日誌記錄都是透過 "asyncio" logger 執行的。
日誌級別被預設為 logging.INFO,它可以很容易地被調整:
logging.getLogger("asyncio").setLevel(logging.WARNING)
網路日誌記錄可能會阻塞事件迴圈。建議使用獨立的執行緒來處理日誌或使用非阻塞 IO,範例請參見 Dealing with handlers that block。
偵測從未被等待的 (never-awaited) 協程¶
當協程函式被呼叫而不是被等待時(即執行 coro() 而不是 await coro())或者協程沒有透過 asyncio.create_task() 被排程,asyncio 將會發出 RuntimeWarning:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
輸出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
除錯模式中的輸出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
常用的修復方法是去等待協程或者呼叫 asyncio.create_task() 函式:
async def main():
await test()
偵測從未被取得的 (never-retrieved) 例外¶
如果呼叫 Future.set_exception(),但 Future 物件從未被等待,例外將無法被傳播 (propagate) 到使用者程式。在這種情況下,當 Future 物件被垃圾回收 (garbage collected) 時,asyncio 將發出一則日誌訊息。
未處理例外的例子:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
輸出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
啟用除錯模式以取得任務建立處的追蹤資訊 (traceback):
asyncio.run(main(), debug=True)
除錯模式中的輸出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
Asynchronous generators best practices¶
Writing correct and efficient asyncio code requires awareness of certain pitfalls. This section outlines essential best practices that can save you hours of debugging.
Close asynchronous generators explicitly¶
It is recommended to manually close the
asynchronous generator. If a generator
exits early - for example, due to an exception raised in the body of
an async for loop - its asynchronous cleanup code may run in an
unexpected context. This can occur after the tasks it depends on have completed,
or during the event loop shutdown when the async-generator's garbage collection
hook is called.
To avoid this, explicitly close the generator by calling its
aclose() method, or use the contextlib.aclosing()
context manager:
import asyncio
import contextlib
async def gen():
yield 1
yield 2
async def func():
async with contextlib.aclosing(gen()) as g:
async for x in g:
break # Don't iterate until the end
asyncio.run(func())
As noted above, the cleanup code for these asynchronous generators is deferred. The following example demonstrates that the finalization of an asynchronous generator can occur in an unexpected order:
import asyncio
work_done = False
async def cursor():
try:
yield 1
finally:
assert work_done
async def rows():
global work_done
try:
yield 2
finally:
await asyncio.sleep(0.1) # immitate some async work
work_done = True
async def main():
async for c in cursor():
async for r in rows():
break
break
asyncio.run(main())
For this example, we get the following output:
unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
File "example.py", line 6, in cursor
yield 1
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "example.py", line 8, in cursor
assert work_done
^^^^^^^^^
AssertionError
The cursor() asynchronous generator was finalized before the rows
generator - an unexpected behavior.
The example can be fixed by explicitly closing the
cursor and rows async-generators:
async def main():
async with contextlib.aclosing(cursor()) as cursor_gen:
async for c in cursor_gen:
async with contextlib.aclosing(rows()) as rows_gen:
async for r in rows_gen:
break
break
Create asynchronous generators only when the event loop is running¶
It is recommended to create asynchronous generators only after the event loop has been created.
To ensure that asynchronous generators close reliably, the event loop uses the
sys.set_asyncgen_hooks() function to register callback functions. These
callbacks update the list of running asynchronous generators to keep it in a
consistent state.
When the loop.shutdown_asyncgens()
function is called, the running generators are stopped gracefully and the
list is cleared.
The asynchronous generator invokes the corresponding system hook during its first iteration. At the same time, the generator records that the hook has been called and does not call it again.
Therefore, if iteration begins before the event loop is created, the event loop will not be able to add the generator to its list of active generators because the hooks are set after the generator attempts to call them. Consequently, the event loop will not be able to terminate the generator if necessary.
Consider the following example:
import asyncio
async def agenfn():
try:
yield 10
finally:
await asyncio.sleep(0)
with asyncio.Runner() as runner:
agen = agenfn()
print(runner.run(anext(agen)))
del agen
輸出:
10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
File "example.py", line 13, in <module>
del agen
^^^^
RuntimeError: async generator ignored GeneratorExit
This example can be fixed as follows:
import asyncio
async def agenfn():
try:
yield 10
finally:
await asyncio.sleep(0)
async def main():
agen = agenfn()
print(await anext(agen))
del agen
asyncio.run(main())
Avoid concurrent iteration and closure of the same generator¶
Async generators may be reentered while another
__anext__() / athrow() / aclose() call is in
progress. This may lead to an inconsistent state of the async generator and can
cause errors.
Let's consider the following example:
import asyncio
async def consumer():
for idx in range(100):
await asyncio.sleep(0)
message = yield idx
print('received', message)
async def amain():
agenerator = consumer()
await agenerator.asend(None)
fa = asyncio.create_task(agenerator.asend('A'))
fb = asyncio.create_task(agenerator.asend('B'))
await fa
await fb
asyncio.run(amain())
輸出:
received A
Traceback (most recent call last):
File "test.py", line 38, in <module>
asyncio.run(amain())
~~~~~~~~~~~^^^^^^^^^
File "Lib/asyncio/runners.py", line 204, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "Lib/asyncio/runners.py", line 127, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "Lib/asyncio/base_events.py", line 719, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "test.py", line 36, in amain
await fb
RuntimeError: anext(): asynchronous generator is already running
Therefore, it is recommended to avoid using asynchronous generators in parallel tasks or across multiple event loops.