事件迴圈

原始碼: Lib/asyncio/events.pyLib/asyncio/base_events.py


前言

事件迴圈是每個 asyncio 應用程式的核心。事件迴圈執行非同步任務和回呼、執行網路 IO 操作並啟動子行程。

應用程式開發人員通常應使用高階的 asyncio 函式,例如 asyncio.run(),並且很少需要參照事件迴圈物件或呼叫其方法。本節主要針對那些需要更細粒度控制事件迴圈行為的低階程式碼、函式庫和框架的作者。

取得事件迴圈

以下的低階函式可用於取得、設置或建立事件迴圈:

asyncio.get_running_loop()

在當前作業系統執行緒中回傳正在運行的事件迴圈。

如果沒有運行的事件迴圈,則引發 RuntimeError

此函式只能從協程或回呼函式中呼叫。

在 3.7 版被加入.

asyncio.get_event_loop()

取得目前的事件迴圈。

當從協程或回呼函式呼叫此函式(例如,使用 call_soon 或類似的 API 於排程呼叫),此函式將永遠回傳正在運行的事件迴圈。

如果沒有設定正在運行的事件迴圈,該函式將回傳 get_event_loop_policy().get_event_loop() 呼叫的結果。

由於此函式具有相當複雜的行為(尤其是在使用自訂事件迴圈策略時),在協程和回呼函式中,建議使用 get_running_loop() 函式,而不是 get_event_loop()

如上所述,可以考慮使用高階的 asyncio.run() 函式,而不是使用這些較低階的函式手動建立和關閉事件迴圈。

在 3.12 版之後被棄用: 如果沒有當前事件迴圈,則會發出棄用警告。在未來的某個 Python 發行版中,這將變成錯誤。

asyncio.set_event_loop(loop)

loop 設置為當前 OS 執行緒的當前事件迴圈。

asyncio.new_event_loop()

建立並回傳新的事件迴圈物件。

請注意 get_event_loop()set_event_loop()new_event_loop() 函式的行為可以透過設定自訂事件迴圈策略進行調整。

目錄

本頁文件包含以下章節:

事件迴圈方法

事件迴圈提供以下低階 API:

啟動和停止迴圈

loop.run_until_complete(future)

運行直到 future (一個 Future 實例)完成。

如果引數是協程物件,則它將被隱式排程為 asyncio.Task 運行。

回傳 Future 的結果或引發其例外。

loop.run_forever()

運行事件迴圈直到 stop() 被呼叫。

如果在呼叫 run_forever() 之前呼叫 stop(),則迴圈將使用超時為零的方式輪詢 I/O 選擇器,運行所有回應 I/O 事件(以及已經排程的事件)的回呼函數,然後退出。

如果在 run_forever() 運行時呼叫 stop(),則迴圈將運行當前批次的回呼函式,然後退出。請注意,由回呼函式排程的新回呼在此情況下不會運行;而是在下次呼叫 run_forever()run_until_complete() 時運行。

loop.stop()

停止事件迴圈。

loop.is_running()

如果事件迴圈當前正在運行,則回傳 True

loop.is_closed()

如果事件迴圈已關閉,則回傳 True

loop.close()

關閉事件迴圈。

不得於迴圈運行中呼叫此函式。將丟棄任何待處理的回呼。

此方法清除所有佇列並關閉執行器,但不等待執行器完成。

此方法是冪等且不可逆的。在事件迴圈關閉後不應呼叫其他方法。

coroutine loop.shutdown_asyncgens()

排程所有當前打開的非同步產生器物件使用 aclose() 呼叫來關閉。呼叫此方法後,如果疊代新的非同步產生器,事件迴圈將發出警告。應該使用此方法可靠地完成所有已排程的非同步產生器。

請注意,使用 asyncio.run() 時不需要呼叫此函式。

範例:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

在 3.6 版被加入.

coroutine loop.shutdown_default_executor(timeout=None)

排程預設執行器的關閉,並等待它加入 ThreadPoolExecutor 中的所有執行緒。一旦呼叫了此方法,使用預設執行器與 loop.run_in_executor() 將引發 RuntimeError

timeout 參數指定執行器完成加入所需的時間(以 float 秒為單位)。預設情況下為 None,不會限制執行器所花費的時間。

如果達到 timeout,將發出 RuntimeWarning 警告,預設執行器將立即終止,不等待其執行緒完成加入。

備註

使用 asyncio.run() 時請勿呼叫此方法,因為後者會自動處理預設執行器的關閉。

在 3.9 版被加入.

在 3.12 版的變更: 加入 timeout 參數。

排程回呼函式

loop.call_soon(callback, *args, context=None)

在事件迴圈的下一次疊代中排程以 args 引數呼叫 callback callback

回傳 asyncio.Handle 的實例,稍後可以用於取消回呼函式。

回呼函式按照其註冊的順序呼叫。每個回呼函式將被呼叫恰好一次。

選用的僅限關鍵字引數 context 指定了要給 callback 執行的自定義 contextvars.Context。當未提供 context 時,回呼函式使用當前情境。

call_soon_threadsafe() 不同,此方法不是執行緒安全的。

loop.call_soon_threadsafe(callback, *args, context=None)

這是 call_soon() 的執行緒安全變體。當從另一個執行緒排程回呼函式時,必須使用此函式,因為 call_soon() 不是執行緒安全的。

如果在已關閉的迴圈上呼叫,則引發 RuntimeError。在主應用程式關閉時,這可能發生在次要執行緒上。

請參閱文件的並行和多執行緒部分。

在 3.7 版的變更: 新增了 context 僅限關鍵字參數。詳細資訊請參閱 PEP 567

備註

大多數 asyncio 排程函式不允許傳遞關鍵字引數。要傳遞關鍵字引數,請使用 functools.partial()

# will schedule "print("Hello", flush=True)"
loop.call_soon(
    functools.partial(print, "Hello", flush=True))

通常使用 partial 物件比使用 lambda 更方便,因為 asyncio 可以在除錯和錯誤訊息中更好地呈現 partial 物件。

排程延遲的回呼函式

事件迴圈提供為回呼函式排程在將來某個時間點才呼叫的機制。事件迴圈使用了單調時鐘來追蹤時間。

loop.call_later(delay, callback, *args, context=None)

排程 callback 在給定的 delay 秒數後呼叫(可以是整數或浮點數)。

回傳 asyncio.TimerHandle 的實例,可用於取消回呼函式。

callback 將只被呼叫恰好一次。如果有兩個回呼函式被排程在完全相同的時間,則其呼叫順序是不定的。

可選的位置引數 args 將在呼叫回呼函式時傳遞。如果要使用關鍵字引數呼叫回呼函數,請使用 functools.partial()

可選的僅限關鍵字 context 引數允許為 callback 指定自定義的 contextvars.Context 以提供運行。當未提供 context 時,將使用當前情境。

在 3.7 版的變更: 新增了 context 僅限關鍵字參數。詳細資訊請參閱 PEP 567

在 3.8 版的變更: 在 Python 3.7 及更早版本中,使用預設事件迴圈實作時,delay 不能超過一天。這在 Python 3.8 中已經修復。

loop.call_at(when, callback, *args, context=None)

排程 callback 在給定的絕對時間戳 when (整數或浮點數)處呼叫,使用與 loop.time() 相同的時間參照。

此方法的行為與 call_later() 相同。

回傳 asyncio.TimerHandle 的實例,可用於取消回呼函式。

在 3.7 版的變更: 新增了 context 僅限關鍵字參數。詳細資訊請參閱 PEP 567

在 3.8 版的變更: 在 Python 3.7 及更早版本中,使用預設事件迴圈實作時,when 和當前時間之間的差值不能超過一天。這在 Python 3.8 中已經修復。

loop.time()

根據事件迴圈的內部單調時鐘,回傳當前時間,以 float 值表示。

備註

在 3.8 版的變更: 在 Python 3.7 及更早版本中,超時(相對 delay 或絕對 when)不應超過一天。這在 Python 3.8 中已經修復。

也參考

函式 asyncio.sleep()

建立 Futures 和 Tasks

loop.create_future()

建立附加到事件迴圈的 asyncio.Future 物件。

這是在 asyncio 中建立 Futures 的首選方式。這允許第三方事件迴圈提供 Future 物件的替代實作(具有更好的性能或儀器計測表現)。

在 3.5.2 版被加入.

loop.create_task(coro, *, name=None, context=None)

排程執行協程 coro。回傳 Task 物件。

第三方事件迴圈可以使用其自己的 Task 子類別以實現互操作性(interoperability)。在這種情況下,結果類型是 Task 的子類別。

如果提供了 name 引數且不為 None,則將其設置為任務的名稱,使用 Task.set_name()

可選的僅限關鍵字 context 引數允許為 coro 指定自定義的 contextvars.Context 以提供運行。當未提供 context 時,將建立當前情境的副本。

在 3.8 版的變更: 加入 name 參數。

在 3.11 版的變更: 加入 context 參數。

loop.set_task_factory(factory)

設置將由 loop.create_task() 使用的任務工廠。

如果 factoryNone,將設置預設的任務工廠。否則,factory 必須是一個具有匹配簽名 (loop, coro, context=None)callable,其中 loop 是有效事件迴圈的參照,coro 是一個協程物件。該可呼叫物件必須回傳一個與 asyncio.Future 相容的物件。

loop.get_task_factory()

回傳任務工廠,如果使用預設任務工廠則回傳 None

打開網路連線

coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)

打開以 hostport 指定之給定地址的串流傳輸連線。

根據 host*(或提供的 *family 引數)的情況,socket 家族可以是 AF_INETAF_INET6

Socket 類型將為 SOCK_STREAM

protocol_factory 必須是一個回傳 asyncio protocol 實作的可呼叫函式。

此方法將嘗試在背景建立連線。成功時,它將回傳一對 (transport, protocol)

底層操作的時間軸簡介如下:

  1. 建立連線並為其建立傳輸

  2. protocol_factory 在無引數的情況下被呼叫,並且預計回傳一個 協定 實例。

  3. 通過呼叫其 connection_made() 方法,將協議實例與傳輸連線在一起。

  4. 成功時回傳一個 (transport, protocol) 元組。

建立的傳輸是一個依賴實作的雙向串流。

其他引數:

  • 若有給定 ssl 且非 false,將建立 SSL/TLS 傳輸(預設建立普通 TCP 傳輸)。如果 sslssl.SSLContext 物件,則使用該情境來建立傳輸;如果 sslTrue,則使用 ssl.create_default_context() 回傳的預設情境。

  • server_hostname 設置或覆蓋目標伺服器憑證將匹配的主機名稱。僅在 ssl 不為 None 時傳遞。預設情況下,將使用 host 引數的值。如果 host 為空,則沒有預設值,必須傳遞 server_hostname 的值。若 server_hostname 為空字串,將停用主機名稱匹配(這是一個嚴重的安全風險,可能導致中間人攻擊)。

  • familyprotoflags 是可選的位址家族、協議和旗標,用於傳遞至 getaddrinfo() 進行 host 解析。若有給定這些應該都是相應 socket 模組常數的整數。

  • 若有給定,happy_eyeballs_delay 會啟用此連線的 Happy Eyeballs。它應該是一個浮點數,表示等待連線嘗試完成的秒數,然後在並行啟動下一次嘗試。這是 RFC 8305 中定義的「連線嘗試延遲」。RFC 建議的合理預設值為 0.25 秒(250 毫秒)。

  • interleave 控制主機名稱解析為多個 IP 位址時的地址重新排序。若為 0 或未指定,將不執行重排序,並按 getaddrinfo() 回傳的順序嘗試位址。如果指定正整數,則按地址家族交錯排列,給定的整數直譯為 RFC 8305 中定義的「首個地址家族計數」。如果未指定 happy_eyeballs_delay,則預設值為 0,如果指定則為 1

  • 若有給定 sock 則其應為已存在且已連線的 socket.socket 物件,可供傳輸使用。如果提供了 sock,則不應指定 hostportfamilyprotoflagshappy_eyeballs_delayinterleavelocal_addr 中的任何一項。

    備註

    引數 sock 將 socket 所有權轉移給所建立的傳輸 socket,請呼叫傳輸的 close() 方法。

  • 若有給定 local_addr 則其為一個 (local_host, local_port) 元組,用於在本地綁定 socket。將使用 getaddrinfo() 查找 local_hostlocal_port,方式類似於 hostport

  • ssl_handshake_timeout (對於 TLS 連線)是等待 TLS 交握的時間,以秒為單位,在那之前若未完成則會中斷連線。如果為 None (預設值),則會等待 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成以前中斷連線的時間,以秒為單位。如果為 None (預設值),則會等待 30.0 秒。

  • all_errors 決定在無法建立連線時會引發哪些例外。預設情況下,只會引發單一 Exception:如果只有一個例外或所有錯誤訊息相同,則引發第一個例外,否則引發包含所有錯誤訊息的單一 OSError。當 all_errorsTrue 時,將引發包含所有例外的 ExceptionGroup (即使只有一個例外)。

在 3.5 版的變更: 新增 ProactorEventLoop 中的 SSL/TLS 支援。

在 3.6 版的變更: 所有 TCP 連線都預設有 socket.TCP_NODELAY socket 選項。

在 3.7 版的變更: 增加 ssl_handshake_timeout 參數。

在 3.8 版的變更: 加入 happy_eyeballs_delayinterleave 參數。

Happy Eyeballs 演算法:雙協定堆疊主機 (Dual-Stack Hosts) 的成功。當伺服器的 IPv4 路徑和協議運作正常,但伺服器的 IPv6 路徑和協議不運作時,雙棧用戶端應用程式會比僅具 IPv4 的用戶端體驗到顯著的連線延遲。這是不希望的,因為這會導致雙棧用戶端的使用者體驗變差。本文件具體說明了減少此用戶可見延遲的演算法要求並提供了一種演算法。

更多資訊請見: https://datatracker.ietf.org/doc/html/rfc6555

在 3.11 版的變更: 增加 ssl_shutdown_timeout 參數。

在 3.12 版的變更: 已新增 all_errors

也參考

函式 open_connection() 是高階的替代 API。它回傳一對 (StreamReader, StreamWriter) 可直接在 async/await 程式碼中使用。

coroutine loop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_port=None, allow_broadcast=None, sock=None)

建立一個資料報連線。

Socket 家族可以是 AF_INETAF_INET6AF_UNIX,視乎 host(或提供的 family 引數)而定。

Socket 類型將為 SOCK_DGRAM

protocol_factory 必須是可呼叫的函式,回傳 protocol 實作。

成功時回傳 (transport, protocol) 元組。

其他引數:

  • local_addr,如果提供,是一個 (local_host, local_port) 元組,用於在本地綁定 socket。local_hostlocal_port 使用 getaddrinfo() 來查找。

  • remote_addr,如果提供,是一個 (remote_host, remote_port) 元組,用於將 socket 連線到遠端位址。 remote_hostremote_port 使用 getaddrinfo() 來查找。

  • familyprotoflags 是用於傳遞給 getaddrinfo() 以解析 host 的可選地址家族、協定和旗標。如果提供,這些應該都是來自相應的 socket 模組常數的整數。

  • reuse_port 告訴核心允許將此端點綁定到與其他現有端點相同的埠,只要它們在建立時都設定了此旗標。此選項不受 Windows 和某些 Unix 系統支援。如果未定義 SO_REUSEPORT 常數,則不支援此功能。

  • allow_broadcast 告訴核心允許此端點向廣播位址發送訊息。

  • sock 可以選擇性地指定,以使用預先存在且已連線的 socket.socket 物件供傳輸使用。如果指定,local_addrremote_addr 應省略(必須是 None)。

    備註

    引數 sock 將 socket 所有權轉移給所建立的傳輸 socket,請呼叫傳輸的 close() 方法。

請參閱 UDP 回應用戶端協議UDP 回應伺服器協議 範例。

在 3.4.4 版的變更: 新增 familyprotoflagsreuse_addressreuse_portallow_broadcastsock 參數。

在 3.8 版的變更: 新增對於 Windows 的支援。

在 3.8.1 版的變更: 不再支援 reuse_address 參數,因為使用 SO_REUSEADDR 對於 UDP 存有重大的安全疑慮。明確傳遞 reuse_address=True 將引發例外。

當具有不同 UID 的多個行程使用 SO_REUSEADDR 將 socket 分配給相同的 UDP socket 地址時,傳入的封包可能會在 socket 之間隨機分佈。

對於有支援的平台,reuse_port 可以用作類似功能的替代方案。使用 reuse_port,將改為使用 SO_REUSEPORT,該選項明確禁止具有不同 UID 的行程將 socket 分配給相同的 socket 地址。

在 3.11 版的變更: 自 Python 3.9.0、3.8.1、3.7.6 和 3.6.10 起,已完全移除 reuse_address 參數。

coroutine loop.create_unix_connection(protocol_factory, path=None, *, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

建立一個 Unix 連線。

Socket 家族將為 AF_UNIX;socket 類型將為 SOCK_STREAM

成功時回傳 (transport, protocol) 元組。

path 是 Unix 域 socket 的名稱,除非指定 sock 參數,否則為必填。支援抽象 Unix sockets、strbytesPath 路徑。

有關此方法的引數資訊,請參閱 loop.create_connection() 方法的文件。

Availability: Unix.

在 3.7 版的變更: 新增 ssl_handshake_timeout 參數。path 參數現在可以是 path-like object

在 3.11 版的變更: 增加 ssl_shutdown_timeout 參數。

建立網路伺服器

coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

建立一個 TCP 伺服器(socket 類型 SOCK_STREAM),監聽 host 位址的 port

回傳一個 Server 物件。

引數:

  • protocol_factory 必須是可呼叫的函式,回傳 protocol 實作。

  • 可以將 host 參數設為幾種類型,以確定伺服器將監聽的位置:

    • 如果 host 是字串,則 TCP 伺服器綁定到由 host 指定的單個網路介面。

    • 如果 host 是字串序列,則 TCP 伺服器綁定到序列指定的所有網路介面。

    • host 是空字串或 None,則所有介面都被假定並回傳多個 socket 的清單(可能一個用於 IPv4,另一個用於 IPv6)。

  • 可以設定 port 參數以指定伺服器應該監聽的埠。如果是 0None(預設值),將隨機選擇一個未使用的埠(請注意,如果 host 解析為多個網路介面,將為每個介面隨機選擇不同的隨機埠)。

  • family 可以設定為 socket.AF_INETAF_INET6 以強制使用 IPv4 或 IPv6。如果未設定,family 將從主機名稱決定(預設為 AF_UNSPEC)。

  • flagsgetaddrinfo() 的位元遮罩。

  • 可以可選地指定 sock 以使用現有的 socket 物件。如果指定了,hostport 不能指定。

    備註

    sock 引數將 socket 的所有權轉移給建立的伺服器。要關閉 socket,請呼叫伺服器的 close() 方法。

  • backlog 是傳遞給 listen() 的最大佇列連線數(預設為 100)。

  • ssl 可以設定為 SSLContext 實例以在接受的連線上啟用 TLS。

  • reuse_address 告訴內核重用 TIME_WAIT 狀態下的本地 socket,而不等待其自然超時過期。如果未指定,在 Unix 上將自動設置為 True

  • reuse_port 告訴內核允許此端點繫結到與其他現有端點繫結的相同埠,只要它們在建立時都設置了此旗標。此選項在旗標 Windows 上不受支援。

  • (對於 TLS 伺服器)ssl_handshake_timeout 是在中斷連線之前等待 TLS 握手完成的時間(以秒為單位)。如果為 None(預設),則為 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成以前中斷連線的時間,以秒為單位。如果為 None (預設值),則會等待 30.0 秒。

  • start_serving 設置為 True(預設)將使建立的伺服器立即開始接受連接。當設置為 False 時,用戶應該等待 Server.start_serving()Server.serve_forever() 來使伺服器開始接受連線。

在 3.5 版的變更: 新增 ProactorEventLoop 中的 SSL/TLS 支援。

在 3.5.1 版的變更: host 參數可以是字串序列。

在 3.6 版的變更: 新增 ssl_handshake_timeoutstart_serving 參數。所有 TCP 連線都預設有 socket.TCP_NODELAY socket 選項。

在 3.11 版的變更: 增加 ssl_shutdown_timeout 參數。

也參考

start_server() 函式是一個更高階的替代 API,它回傳一對 StreamReaderStreamWriter,可以在 async/await 程式碼中使用。

coroutine loop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

類似 loop.create_server(),但適用於 AF_UNIX socket 家族。

path 是 Unix 域 socket 的名稱,除非提供了 sock 引數,否則必須給定。支援抽象 Unix sockets、strbytesPath 路徑。

有關此方法的引數資訊,請參閱 loop.create_server() 方法的文件。

Availability: Unix.

在 3.7 版的變更: 新增 ssl_handshake_timeoutstart_serving 參數。path 參數現在可為一個 Path 物件。

在 3.11 版的變更: 增加 ssl_shutdown_timeout 參數。

coroutine loop.connect_accepted_socket(protocol_factory, sock, *, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

將已接受的連線包裝成傳輸層/協議對。

此方法可以由在 asyncio 外接受連線但使用 asyncio 處理連線的伺服器使用。

參數:

  • protocol_factory 必須是可呼叫的函式,回傳 protocol 實作。

  • sock 是從 socket.accept 回傳的預先存在的 socket 物件。

    備註

    引數 sock 將 socket 所有權轉移給所建立的傳輸 socket,請呼叫傳輸的 close() 方法。

  • ssl 可以設置為 SSLContext 以在已接受的連線上啟用 SSL。

  • (對於 SSL 連線)ssl_handshake_timeout 是在中斷連線之前等待 SSL 握手完成的時間(以秒為單位)。如果為 None(預設),則為 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成以前中斷連線的時間,以秒為單位。如果為 None (預設值),則會等待 30.0 秒。

回傳 (transport, protocol) 對。

在 3.5.3 版被加入.

在 3.7 版的變更: 增加 ssl_handshake_timeout 參數。

在 3.11 版的變更: 增加 ssl_shutdown_timeout 參數。

傳輸檔案

coroutine loop.sendfile(transport, file, offset=0, count=None, *, fallback=True)

通過 transport 發送 file。回傳發送的總位元組數。

如果可用,該方法使用高性能 os.sendfile()

file 必須是以二進位模式打開的常規檔案物件。

offset 告訴從哪裡開始讀取檔案。如果指定了,count 是要傳輸的總位元組數,而不是發送檔案直到達到 EOF。即使此方法引發錯誤時,檔案位置也始終更新,可以使用 file.tell() 取得實際發送的位元組數。

fallback 設置為 True 會使 asyncio 在平台不支援 sendfile 系統呼叫時(例如 Windows 或 Unix 上的 SSL socket)手動讀取和發送檔案。

如果系統不支援 sendfile 系統呼叫且 fallbackFalse,則引發 SendfileNotAvailableError

在 3.7 版被加入.

TLS 升級

coroutine loop.start_tls(transport, protocol, sslcontext, *, server_side=False, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

將基於傳輸的現有連線升級到 TLS。

建立 TLS 編解碼器實例並在 transportprotocol 之間插入它。編解碼器既實作了對於 transport 的協議,也實作了對於 protocol 的傳輸。

回傳建立的雙介面實例。在 await 後,protocol 必須停止使用原始的 transport,僅與回傳的物件通信,因為編碼器快取了 protocol 端的資料,並且與 transport 間歇性地交換額外的 TLS session 封包。

在某些情況下(例如傳入的傳輸已經關閉),此函式可能回傳 None

參數:

  • transportprotocol 實例,由像 create_server()create_connection() 等方法回傳。

  • sslcontext:配置好的 SSLContext 實例。

  • 當升級伺服器端連線時(像由 create_server() 建立的那樣)傳遞 True

  • server_hostname:設置或覆蓋將用於匹配目標伺服器憑證的主機名。

  • ssl_handshake_timeout (對於 TLS 連線)是等待 TLS 交握的時間,以秒為單位,在那之前若未完成則會中斷連線。如果為 None (預設值),則會等待 60.0 秒。

  • ssl_shutdown_timeout 是等待 SSL 關閉完成以前中斷連線的時間,以秒為單位。如果為 None (預設值),則會等待 30.0 秒。

在 3.7 版被加入.

在 3.11 版的變更: 增加 ssl_shutdown_timeout 參數。

監視檔案描述器

loop.add_reader(fd, callback, *args)

開始監視 fd 檔案描述器的讀取可用性,一但 fd 可讀取,使用指定引數呼叫 callback

loop.remove_reader(fd)

停止監視 fd 檔案描述器的讀取可用性。如果 fd 之前正在監視讀取,則回傳 True

loop.add_writer(fd, callback, *args)

開始監視 fd 檔案描述器的寫入可用性,一旦 fd 可寫入,使用指定引數呼叫 callback

使用 functools.partial()callback 傳送關鍵字引數

loop.remove_writer(fd)

停止監視 fd 檔案描述器的寫入可用性。如果 fd 之前正在監視寫入,則回傳 True

另請參閱平台支援部分以了解這些方法的一些限制。

直接使用 socket 物件

一般情況下,使用基於傳輸的 API(如 loop.create_connection()loop.create_server())的協議實作比直接使用 socket 的實作更快。然而在某些情況下性能不是關鍵,直接使用 socket 物件更方便。

coroutine loop.sock_recv(sock, nbytes)

sock 接收最多 nbytessocket.recv() 的非同步版本。

將接收到的資料作為 bytes 物件回傳。

sock 必須是非阻塞 socket。

在 3.7 版的變更: 儘管此方法一直記錄為協程方法,但 Python 3.7 之前的版本回傳 Future。自 Python 3.7 起,這是 async def 方法。

coroutine loop.sock_recv_into(sock, buf)

sock 接收資料到 buf 緩衝區。仿照阻塞 socket.recv_into() 方法。

回傳寫入緩衝區位元組的數目。

sock 必須是非阻塞 socket。

在 3.7 版被加入.

coroutine loop.sock_recvfrom(sock, bufsize)

sock 接收最多 bufsize 大小的資料單元。socket.recvfrom() 的非同步版本。

回傳一個元組 (received data, remote address)。

sock 必須是非阻塞 socket。

在 3.11 版被加入.

coroutine loop.sock_recvfrom_into(sock, buf, nbytes=0)

sock 接收最多 nbytes 大小的資料單元到 bufsocket.recvfrom_into() 的非同步版本。

回傳一個元組 (number of bytes received, remote address)。

sock 必須是非阻塞 socket。

在 3.11 版被加入.

coroutine loop.sock_sendall(sock, data)

data 發送到 sock socket。socket.sendall() 的非同步版本。

此方法將繼續發送到 socket,直到 data 中的所有資料都已發送或發生錯誤。成功時回傳 None。錯誤時引發例外。此外,沒有辦法確定接收端成功處理了多少資料(如果有的話)。

sock 必須是非阻塞 socket。

在 3.7 版的變更: 儘管該方法一直被記錄為協程方法,但在 Python 3.7 之前它回傳 Future。從 Python 3.7 開始,這是一個 async def 方法。

coroutine loop.sock_sendto(sock, data, address)

sockaddress 發送一個資料單元。socket.sendto() 的非同步版本。

回傳發送的位元組數。

sock 必須是非阻塞 socket。

在 3.11 版被加入.

coroutine loop.sock_connect(sock, address)

sock 連線到位於 address 的遠端 socket。

socket.connect() 的非同步版本。

sock 必須是非阻塞 socket。

在 3.5.2 版的變更: 不再需要解析 addresssock_connect 將嘗試透過呼叫 socket.inet_pton() 檢查 address 是否已解析。如果沒有,將使用 loop.getaddrinfo() 解析 address

coroutine loop.sock_accept(sock)

接受一個連線。模擬阻塞的 socket.accept() 方法。

Socket 必須繫結到一個地址並偵聽連線。回傳值是一個 (conn, address) 對,其中 conn 是一個 socket 物件,可在連線上發送和接收資料,address 是連接另一端對應的 socket 地址。

sock 必須是非阻塞 socket。

在 3.7 版的變更: 儘管該方法一直被記錄為協程方法,但在 Python 3.7 之前它回傳 Future。從 Python 3.7 開始,這是一個 async def 方法。

coroutine loop.sock_sendfile(sock, file, offset=0, count=None, *, fallback=True)

如果可行,使用高性能 os.sendfile 發送檔案。回傳發送的總位元組數。

socket.sendfile() 的非同步版本。

sock 必須是非阻塞的 socket.SOCK_STREAM socket

file 必須是以二進位模式打開的常規檔案物件。

offset 告訴從哪裡開始讀取檔案。如果指定了,count 是要傳輸的總位元組數,而不是發送檔案直到達到 EOF。即使此方法引發錯誤時,檔案位置也始終更新,可以使用 file.tell() 取得實際發送的位元組數。

當設置為 True 時,fallback 使 asyncio 在平台不支援 sendfile 系統呼叫時(例如 Windows 或 Unix 上的 SSL socket)手動讀取和發送檔案。

如果系統不支援 sendfile 系統呼叫且 fallbackFalse,引發 SendfileNotAvailableError

sock 必須是非阻塞 socket。

在 3.7 版被加入.

DNS

coroutine loop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)

socket.getaddrinfo() 的非同步版本。

coroutine loop.getnameinfo(sockaddr, flags=0)

socket.getnameinfo() 的非同步版本。

備註

Both getaddrinfo and getnameinfo internally utilize their synchronous versions through the loop's default thread pool executor. When this executor is saturated, these methods may experience delays, which higher-level networking libraries may report as increased timeouts. To mitigate this, consider using a custom executor for other user tasks, or setting a default executor with a larger number of workers.

在 3.7 版的變更: getaddrinfogetnameinfo 方法一直被記錄為回傳協程,但在 Python 3.7 之前它們實際上回傳 asyncio.Future 物件。從 Python 3.7 開始,兩個方法都是協程。

使用管道

coroutine loop.connect_read_pipe(protocol_factory, pipe)

在事件迴圈中註冊 pipe 的讀取端。

protocol_factory 必須是一個回傳 asyncio protocol 實作的可呼叫函式。

pipe類檔案物件

回傳 (transport, protocol) 對,其中 transport 支援 ReadTransport 介面,protocol 是由 protocol_factory 實例化的物件。

使用 SelectorEventLoop 事件迴圈時,pipe 設置為非阻塞模式。

coroutine loop.connect_write_pipe(protocol_factory, pipe)

在事件迴圈中註冊 pipe 的寫入端。

protocol_factory 必須是一個回傳 asyncio protocol 實作的可呼叫函式。

pipefile-like object

回傳 (transport, protocol) 對,其中 transport 支援 WriteTransport 介面,protocol 是由 protocol_factory 實例化的物件。

使用 SelectorEventLoop 事件迴圈時,pipe 設置為非阻塞模式。

備註

SelectorEventLoop 在 Windows 上不支援上述方法。對於 Windows 請使用 ProactorEventLoop

Unix 訊號

loop.add_signal_handler(signum, callback, *args)

callback 設置為 signum 訊號的處理程式。

該回呼將由 loop 呼叫,與該事件迴圈的其他排隊回呼和可運行的協程一起。與使用 signal.signal() 註冊的訊號處理程式不同,使用此函式註冊的回呼允許與事件迴圈進行互動。

如果訊號號無效或不可捕獲,引發 ValueError。如果設定處理程序有問題,拋出 RuntimeError

使用 functools.partial()callback 傳送關鍵字引數

signal.signal() 一樣,此函式必須在主執行緒中呼叫。

loop.remove_signal_handler(sig)

移除 sig 訊號的處理程式。

如果訊號處理程式被移除,回傳 True;如果給定訊號沒有設置處理程式,回傳 False

Availability: Unix.

也參考

signal 模組。

在執行緒池或行程池中執行程式碼

awaitable loop.run_in_executor(executor, func, *args)

安排在指定的執行器中呼叫 func

The executor argument should be an concurrent.futures.Executor instance. The default executor is used if executor is None. The default executor can be set by loop.set_default_executor(), otherwise, a concurrent.futures.ThreadPoolExecutor will be lazy-initialized and used by run_in_executor() if needed.

範例:

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

if __name__ == '__main__':
    asyncio.run(main())

請注意,由於 multiprocessing(由 ProcessPoolExecutor 使用)的特殊性,選項 3 需要進入點保護(if __name__ == '__main__')。請參閱主模組的安全引入

此方法回傳 asyncio.Future 物件。

使用 functools.partial() 將來關鍵字引數傳遞func

在 3.5.3 版的變更: loop.run_in_executor() 不再配置它建立的執行緒池執行器的 max_workers,而是讓執行緒池執行器(ThreadPoolExecutor)設定預設值。

loop.set_default_executor(executor)

executor 設置為 run_in_executor() 使用的預設執行器。executor 必須是 ThreadPoolExecutor 的實例。

在 3.11 版的變更: executor 必須是 ThreadPoolExecutor 的實例。

錯誤處理 API

允許自定義事件迴圈中的例外處理方式。

loop.set_exception_handler(handler)

handler 設定為新的事件迴圈例外處理程式。

如果 handlerNone,則將設置預設例外處理程式。否則,handler 必須是一個可呼叫物件,簽名匹配 (loop, context),其中 loop 是參照活躍事件迴圈的,context 是包含例外詳細資訊的 dict 物件(有關情境的詳細資訊,請參閱 call_exception_handler() 文件)。

如果代表 TaskHandle 呼叫處理程式,它將在該任務或回呼處理程式的 contextvars.Context 中運行。

在 3.12 版的變更: 處理程式可能在引發例外的任務或處理程式的 Context 中被呼叫。

loop.get_exception_handler()

回傳當前的例外處理程式,如果未設置自定義例外處理程式,則回傳 None

在 3.5.2 版被加入.

loop.default_exception_handler(context)

預設例外處理程式。

當發生例外且未設置例外處理程式時呼叫此函式。自定義例外處理程式可以呼叫此函式以轉由預設處理程式處理。

context 參數與 call_exception_handler() 中的意思相同。

loop.call_exception_handler(context)

呼叫當前事件迴圈例外處理程式。

context 是一個包含以下鍵的 dict 物件(未來的 Python 版本中可能會引入新的鍵):

  • 'message':錯誤訊息;

  • 'exception'(可選):例外物件;

  • 'future'(可選): asyncio.Future 實例;

  • 'task'(可選): asyncio.Task 實例;

  • 'handle'(可選): asyncio.Handle 實例;

  • 'protocol'(可選): Protocol 實例;

  • 'transport'(可選): Transport 實例;

  • 'socket'(可選): socket.socket 實例;

  • 'asyncgen'(可選): 非同步產生器引發

    例外。

備註

此方法不應在子類別事件迴圈中被覆寫。為了自定義例外處理,請使用 set_exception_handler() 方法。

啟用除錯模式

loop.get_debug()

取得事件迴圈的除錯模式(bool)。

如果環境變數 PYTHONASYNCIODEBUG 被設定為非空字串,則預設值為 True,否則為 False

loop.set_debug(enabled: bool)

設定事件迴圈的除錯模式。

在 3.7 版的變更: 現在也可以使用新的 Python 開發模式 啟用除錯模式。

loop.slow_callback_duration

此屬性可用於設定被視為"慢"的最短執行時間(以秒為單位)。啟用偵錯模式後,"慢"回呼將被記錄。

預設值為 100 毫秒

運行子行程

本小節描述的方法是低階的。在常規的 async/await 程式碼中,請考慮使用高階 asyncio.create_subprocess_shell()asyncio.create_subprocess_exec() 輔助功能而不是。

備註

在 Windows 上,預設事件迴圈 ProactorEventLoop 支援子行程,而 SelectorEventLoop 不支援。詳細資訊請參見 Windows 上對於子行程的支援

coroutine loop.subprocess_exec(protocol_factory, *args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

args 指定的一個或多個字串引數建立子行程。

args 必須是由以下項表示的字串串列:

第一個字串指定程序可執行檔案,其餘字串指定引數。字串引數一起組成程序的 argv

這與標準函式庫 subprocess.Popen 類似,使用 shell=False 呼叫並將字串串列作為第一個引數傳遞;然而,Popen 接受單個字串串列引數,subprocess_exec 接受多個字串引數。

protocol_factory 必須是回傳 asyncio.SubprocessProtocol 子類別的可呼叫物件。

其他參數:

  • stdin 可以是以下任意一個:

    • 類檔案物件

    • 現有的檔案描述器(正整數),例如用 os.pipe() 建立的

    • subprocess.PIPE 常數(預設),它將建立一個新的管道並連線,

    • None 將使子行程從此行程繼承檔案描述器

    • subprocess.DEVNULL 常數,表示將使用特殊的 os.devnull 檔案

  • stdout 可以是以下任意一個:

    • 類檔案物件

    • subprocess.PIPE 常數(預設),它將建立一個新的管道並連線,

    • None 將使子行程從此行程繼承檔案描述器

    • subprocess.DEVNULL 常數,表示將使用特殊的 os.devnull 檔案

  • stderr 可以是以下任意一個:

    • 類檔案物件

    • subprocess.PIPE 常數(預設),它將建立一個新的管道並連線,

    • None 將使子行程從此行程繼承檔案描述器

    • subprocess.DEVNULL 常數,表示將使用特殊的 os.devnull 檔案

    • subprocess.STDOUT 常數,它將標準錯誤串流連線到行程的標準輸出串流

  • 所有其他關鍵字引數都會傳遞給 subprocess.Popen 而不進行直譯,但 bufsizeuniversal_newlinesshelltextencodingerrors 除外,這些不應該指定。

    asyncio 子行程 API 不支援將串流解碼為文本。可以使用 bytes.decode() 將從串流回傳的位元組轉換為文本。

如果傳遞給 stdinstdoutstderr 的類檔案物件表示管道,則該管道的另一端應該使用 connect_write_pipe()connect_read_pipe() 註冊到事件迴圈中。

有關其他引數的文件,請參閱 subprocess.Popen 類別的建構函式。

回傳 (transport, protocol) 對,其中 transport 符合 asyncio.SubprocessTransport 基底類別,protocol 是由 protocol_factory 實例化的物件。

coroutine loop.subprocess_shell(protocol_factory, cmd, *, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)

使用平台的 “shell” 語法從 cmd 建立子行程,cmd 可以是 str 或編碼為 檔案系統編碼bytes 字串。

這類似於標準函式庫中的 subprocess.Popen 類別,使用 shell=True 呼叫。

protocol_factory 必須是回傳 SubprocessProtocol 子類別的可呼叫物件。

有關其餘引數的更多詳細資訊,請參閱 subprocess_exec()

回傳一對 (transport, protocol),其中 transport 符合 SubprocessTransport 基底類別,而 protocol 是由 protocol_factory 實例化的物件。

備註

由應用程式負責確保適當引用所有空白和特殊字元,以避免 shell 注入風險。可以使用 shlex.quote() 函式來正確跳脫用於構建 shell 命令的字串中的空白和特殊字元。

回呼處理

class asyncio.Handle

loop.call_soon()loop.call_soon_threadsafe() 回傳的回呼包裝器。

get_context()

回傳與處理相關聯的 contextvars.Context 物件。

在 3.12 版被加入.

cancel()

取消回呼。如果回呼已被取消或執行,此方法將不起作用。

cancelled()

如果回呼已被取消,回傳 True

在 3.7 版被加入.

class asyncio.TimerHandle

loop.call_later()loop.call_at() 回傳的回呼包裝器。

這個類別是 Handle 的子類別。

when()

回傳預定的回呼時間,以 float 秒為單位。

時間是一個絕對的時間戳,使用與 loop.time() 相同的時間參照。

在 3.7 版被加入.

Server 物件

Server 物件是由 loop.create_server()loop.create_unix_server()start_server()start_unix_server() 函式所建立。

請勿直接實例化 Server 類別。

class asyncio.Server

Server 物件是非同步情境管理器。當在 async with 陳述中使用時,可以保證在完成 async with 陳述時,Server 物件將會關閉並停止接受新的連線:

srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

在 3.7 版的變更: 自 Python 3.7 起,Server 物件是非同步情境管理器。

在 3.11 版的變更: 此類別在 Python 3.9.11、3.10.3 和 3.11 中以 asyncio.Server 的形式被公開。

close()

停止服務:關閉監聽的 sockets 並將 sockets 屬性設為 None

代表現有傳入用戶端連線的 sockets 仍然保持開啟。

伺服器以非同步方式關閉;使用 wait_close() 協程等待伺服器關閉(不再有活躍連線)。

get_loop()

回傳與伺服器物件關聯的事件迴圈。

在 3.7 版被加入.

coroutine start_serving()

開始接受連線。

此方法是幂等的,因此可以在伺服器已經運行時呼叫。

start_serving 僅限關鍵字參數只能在 loop.create_server()asyncio.start_server() 中使用,允許建立一個最初不接受連線的 Server 物件。在這種情況下,可以使用 Server.start_serving()Server.serve_forever() 來使 Server 開始接受連線。

在 3.7 版被加入.

coroutine serve_forever()

開始接受連線,直到協程被取消。取消 serve_forever 任務會導致伺服器關閉。

如果伺服器已經接受連線,則可以呼叫此方法。每個 Server 物件只能存在一個 serve_forever 任務。

範例:

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

在 3.7 版被加入.

is_serving()

如果伺服器正在接受新連線,則回傳 True

在 3.7 版被加入.

coroutine wait_closed()

等待 close() 方法完成且所有活動連線都已結束。

sockets

伺服器正在監聽的類似 socket 的物件串列,asyncio.trsock.TransportSocket

在 3.7 版的變更: 在 Python 3.7 之前,Server.sockets 曾經直接回傳內部伺服器 sockets 的串列。在 3.7 中回傳了該串列的副本。

事件迴圈實作

asyncio 內附兩個不同的事件迴圈實作:SelectorEventLoopProactorEventLoop

預設情況下,asyncio 配置為在 Unix 上使用 SelectorEventLoop,在 Windows 上使用 ProactorEventLoop

class asyncio.SelectorEventLoop

基於 selectors 模組的事件迴圈。

使用特定平台上最有效的 selector。也可以手動配置要使用的確切 selector 實作:

import asyncio
import selectors

class MyPolicy(asyncio.DefaultEventLoopPolicy):
   def new_event_loop(self):
      selector = selectors.SelectSelector()
      return asyncio.SelectorEventLoop(selector)

asyncio.set_event_loop_policy(MyPolicy())

Availability: Unix, Windows.

class asyncio.ProactorEventLoop

用於 Windows 的事件迴圈,使用"I/O 完成埠"(IOCP)。

Availability: Windows.

class asyncio.AbstractEventLoop

為符合 asyncio 標準的事件迴圈的抽象基礎類別。

事件迴圈方法 部分列出了替代 AbstractEventLoop 實作應該定義的所有方法。

範例

請注意,本節中的所有範例都 故意 展示如何使用低階事件迴圈 API,如 loop.run_forever()loop.call_soon()。現代 asyncio 應用程式很少需要這種方式撰寫;請考慮使用高階的函式,如 asyncio.run()

使用 call_soon() 的 Hello World 範例

使用 loop.call_soon() 方法排程回呼的範例。回呼會顯示 "Hello World",然後停止事件迴圈:

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.new_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

也參考

使用協程和 run() 函式建立的類似 Hello World 範例。

使用 call_later() 顯示目前日期

一個回呼的範例,每秒顯示目前日期。回呼使用 loop.call_later() 方法在 5 秒後重新排程自己,然後停止事件迴圈:

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.new_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

也參考

使用協程和 run() 函式建立的類似 current date 範例。

監聽檔案描述器以進行讀取事件

使用 loop.add_reader() 方法等待檔案描述器接收到某些資料,然後關閉事件迴圈:

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.new_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

也參考

設定 SIGINT 和 SIGTERM 的訊號處理程式

(此 signals 範例僅在 Unix 上運作。)

使用 loop.add_signal_handler() 方法註冊訊號 SIGINTSIGTERM 的處理程式:

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())