用 asyncio 开发

异步编程与传统的“顺序”编程不同。

本页列出常见的错误和陷阱,并解释如何避免它们。

调试模式

默认情况下,asyncio 以生产模式运行。为了简化开发,asyncio 还有一种*调试模式* 。

有几种方法可以启用异步调试模式:

除了启用调试模式外,还要考虑:

  • asyncio logger 的级别设为 logging.DEBUG,例如下面的代码片段可以在应用程序启动时运行:

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模块以显示 ResourceWarning 警告。一种方法是使用 -W default 命令行选项。

启用调试模式时:

  • 许多非线程安全的异步 APIs (例如 loop.call_soon()loop.call_at() 方法),如果从错误的线程调用,则会引发异常。

  • 如果执行 I/O 操作花费的时间太长,则记录 I/O 选择器的执行时间。

  • 执行时间超过 100 毫秒的回调会被写入日志。 loop.slow_callback_duration 属性可用于设置以秒为单位的将被视为“缓慢”的最小执行持续时间。

并发性和多线程

事件循环在线程中运行 (通常是主线程),并在其线程中执行所有回调和任务。当一个任务在事件循环中运行时,没有其他任务可以在同一个线程中运行。当一个任务执行一个 await 表达式时,正在运行的任务被挂起,事件循环执行下一个任务。

要调度来自另一 OS 线程的 callback,应该使用 loop.call_soon_threadsafe() 方法。 例如:

loop.call_soon_threadsafe(callback, *args)

几乎所有异步对象都不是线程安全的,这通常不是问题,除非在任务或回调函数之外有代码可以使用它们。如果需要这样的代码来调用低级异步 API,应该使用 loop.call_soon_threadsafe() 方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要从不同的 OS 线程调度一个协程对象,应该使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future。查询结果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# 随后在另一个 OS 线程中:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# 等待结果:
result = future.result()

为了能处理信号,事件循环必须在主线程中运行。

loop.run_in_executor() 方法可以配合 concurrent.futures.ThreadPoolExecutorInterpreterPoolExecutor 使用以在不同的 OS 线程中执行阻塞型的代码而不会阻塞事件循环运行所在的 OS 线程。

目前没有办法直接从另一个进程(如使用 multiprocessing 启动的进程)安排协程或回调。 事件循环方法集 小节列出了一些可以从管道读取并监视文件描述符而不会阻塞事件循环的 API。此外,asyncio 的 子进程 API 提供了一种启动进程并在事件循环中与其通信的办法。最后,之前提到的 loop.run_in_executor() 方法也可以配合 concurrent.futures.ProcessPoolExecutor 使用以在另一个进程中执行代码。

运行阻塞的代码

不应该直接调用阻塞 ( CPU 绑定) 代码。例如,如果一个函数执行 1 秒的 CPU 密集型计算,那么所有并发异步任务和 IO 操作都将延迟 1 秒。

可以使用执行器让任务运行在不同的线程中,包括不同的解释器中,或者甚至在不同的进程中以避免阻塞事件循环所在的 OS 线程。请参阅 loop.run_in_executor() 方法了解详情。

日志记录

asyncio 使用 logging 模块,所有日志记录都是通过 "asyncio" logger 执行的。

默认的日志级别是 logging.INFO,它可以被方便地调整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

网络日志会阻塞事件循环。建议使用一个单独线程来处理日志或者使用非阻塞式 IO。例如,可以参阅 处理日志 handler 的阻塞

检测从未被等待的协程

当协程函数被调用而不是被等待时 (即执行 coro() 而不是 await coro()) 或者协程没有通过 asyncio.create_task() 被排入计划日程,asyncio 将会发出一条 RuntimeWarning:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

调试模式的输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的修复方法是等待协程或者调用 asyncio.create_task() 函数:

async def main():
    await test()

检测从未被获取的异常

如果调用了 Future.set_exception(),但 Future 对象从未被等待,异常将永远不会传播到用户代码。在这种情况下,当 Future 对象被垃圾收集时,asyncio 将发出一条日志消息。

未处理异常的例子:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

激活调试模式 以获取任务创建处的跟踪信息:

asyncio.run(main(), debug=True)

调试模式的输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

异步生成器最佳实践

要编写正确且高效的 asyncio 代码就必须注意某些陷阱。 本节概述了可为您节省大量调试时间的关键最佳实践。

显式地关闭异步生成器

建议手动关闭 异步生成器。 如果一个生成器过早关闭 —— 例如,由于在 async for 循环体中引发了异常 —— 它的异步清理代码可能会在非预期的上下文中运行。 这可能发生在它所依赖的任务完成之后,或是在异步生成器的垃圾收集钩子被调用时事件循环关闭期间。

要避免此问题,请通过调用生成器的 aclose() 方法显式地关闭它,或是使用 contextlib.aclosing() 上下文管理器:

import asyncio
import contextlib

async def gen():
  yield 1
  yield 2

async def func():
  async with contextlib.aclosing(gen()) as g:
    async for x in g:
      break  # 不迭代一直到结束

asyncio.run(func())

如上所述,这些异步生成器的清理代码会延迟执行。 下面的例子显示异步生成器的最终化可能会以非预期的顺序发生:

import asyncio
work_done = False

async def cursor():
    try:
        yield 1
    finally:
        assert work_done

async def rows():
    global work_done
    try:
        yield 2
    finally:
        await asyncio.sleep(0.1) # immitate some async work
        work_done = True


async def main():
    async for c in cursor():
        async for r in rows():
            break
        break

asyncio.run(main())

对于这个例子,我们将得到以下输出:

unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-3' coro=<<async_generator_athrow without __name__>()> exception=AssertionError()>
Traceback (most recent call last):
  File "example.py", line 6, in cursor
    yield 1
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "example.py", line 8, in cursor
    assert work_done
           ^^^^^^^^^
AssertionError

cursor() 异步生成器会在 rows 生成器之前执行最终化 —— 这是非预期的行为。

这个例子可通过显式地关闭 cursorrows 异步生成器来修正问题:

async def main():
    async with contextlib.aclosing(cursor()) as cursor_gen:
        async for c in cursor_gen:
            async with contextlib.aclosing(rows()) as rows_gen:
                async for r in rows_gen:
                    break
            break

仅在事件循环正在运行时创建异步生成器

推荐做法是只在事件循环已被创建之后才创建 异步生成器

To ensure that asynchronous generators close reliably, the event loop uses the sys.set_asyncgen_hooks() function to register callback functions. These callbacks update the list of running asynchronous generators to keep it in a consistent state.

When the loop.shutdown_asyncgens() function is called, the running generators are stopped gracefully and the list is cleared.

The asynchronous generator invokes the corresponding system hook during its first iteration. At the same time, the generator records that the hook has been called and does not call it again.

Therefore, if iteration begins before the event loop is created, the event loop will not be able to add the generator to its list of active generators because the hooks are set after the generator attempts to call them. Consequently, the event loop will not be able to terminate the generator if necessary.

比如下面的例子:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)


with asyncio.Runner() as runner:
    agen = agenfn()
    print(runner.run(anext(agen)))
    del agen

输出:

10
Exception ignored while closing generator <async_generator object agenfn at 0x000002F71CD10D70>:
Traceback (most recent call last):
  File "example.py", line 13, in <module>
    del agen
        ^^^^
RuntimeError: async generator ignored GeneratorExit

该示例可用以下方式修复:

import asyncio

async def agenfn():
    try:
        yield 10
    finally:
        await asyncio.sleep(0)

async def main():
    agen = agenfn()
    print(await anext(agen))
    del agen

asyncio.run(main())

避免同一迭代器的并发迭代和闭包

异步生成器可以在其他 __anext__() / athrow() / aclose() 调用正在进行时重新进入。 这可能导致异步生成器状态不一致并造成错误。

Let's consider the following example:

import asyncio

async def consumer():
    for idx in range(100):
        await asyncio.sleep(0)
        message = yield idx
        print('received', message)

async def amain():
    agenerator = consumer()
    await agenerator.asend(None)

    fa = asyncio.create_task(agenerator.asend('A'))
    fb = asyncio.create_task(agenerator.asend('B'))
    await fa
    await fb

asyncio.run(amain())

输出:

received A
Traceback (most recent call last):
  File "test.py", line 38, in <module>
    asyncio.run(amain())
    ~~~~~~~~~~~^^^^^^^^^
  File "Lib/asyncio/runners.py", line 204, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "Lib/asyncio/runners.py", line 127, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "Lib/asyncio/base_events.py", line 719, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "test.py", line 36, in amain
    await fb
RuntimeError: anext(): asynchronous generator is already running

Therefore, it is recommended to avoid using asynchronous generators in parallel tasks or across multiple event loops.