串流¶
串流是支援 async/await (async/await-ready) 的高階原始物件 (high-level primitive),用於處理網路連線。串流不需要使用回呼 (callback) 或低階協定和傳輸 (transport) 就能夠傳送和接收資料。
這是一個使用 asyncio 串流編寫的 TCP echo 客戶端範例:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
另請參閱下方 Examples 段落。
串流函式
下面的高階 asyncio 函式可以用來建立和處理串流:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)¶ 建立網路連線並回傳一對
(reader, writer)
物件。回傳的 reader 和 writer 物件是
StreamReader
和StreamWriter
類別的實例。limit 指定了回傳的
StreamReader
實例所使用的緩衝區 (buffer) 大小限制。limit 預設為 64 KiB。其餘的引數會直接傳遞到
loop.create_connection()
。備註
sock 参数可将套接字的所有权转给所创建的
StreamWriter
。 要关闭该套接字,请调用其close()
方法。3.7 版更變: 新增 ssl_handshake_timeout 參數。
3.8 版新加入: 增加了 happy_eyeballs_delay 和 interleave 形参。
3.10 版更變: 移除 loop 參數。
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ 啟動 socket 伺服器。
當一個新的客戶端連線被建立時,回呼函式 client_connected_cb 就會被呼叫。該函式會接收到一對引數
(reader, writer)
,分別為StreamReader
和StreamWriter
的實例。client_connected_cb 既可以是普通的可呼叫物件 (callable),也可以是一個協程函式;如果它是一個協程函式,它將自動作為
Task
來被排程。limit 指定了回傳的
StreamReader
實例所使用的緩衝區 (buffer) 大小限制。limit 預設為 64 KiB。剩下的引數將會直接傳遞給
loop.create_server()
。備註
sock 参数可将套接字的所有权转给所创建的服务器。 要关闭该套接字,请调用服务器的
close()
方法。3.7 版更變: 新增 ssl_handshake_timeout 與 start_serving 參數。
3.10 版更變: 移除 loop 參數。
Unix Sockets
-
coroutine
asyncio.
open_unix_connection
(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ 建立一個 Unix socket 連線並回傳一對
(reader, writer)
。與
open_connection()
相似,但是是操作 Unix sockets。另請參閱
loop.create_unix_connection()
文件。備註
sock 参数可将套接字的所有权转给所创建的
StreamWriter
。 要关闭该套接字,请调用其close()
方法。適用:Unix。
3.7 版更變: 新增 ssl_handshake_timeout 參數。path 參數現在可以是個 path-like object
3.10 版更變: 移除 loop 參數。
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ 啟動一個 Unix socket 伺服器。
與
start_server()
相似,但會是操作 Unix sockets。另請參閱
loop.create_unix_server()
文件。備註
sock 参数可将套接字的所有权转给所创建的服务器。 要关闭该套接字,请调用服务器的
close()
方法。適用:Unix。
3.7 版更變: 新增 ssl_handshake_timeout 與 start_serving 參數。path 參數現在可以是個 path-like object。
3.10 版更變: 移除 loop 參數。
StreamReader¶
-
class
asyncio.
StreamReader
¶ 代表一个提供从 IO 流读取数据的 API 的读取器。 作为一个 asynchronous iterable,该对象支持
async for
语句。不建議直接實例化 StreamReader 物件;使用
open_connection()
和start_server()
會是較好的做法。-
coroutine
read
(n=- 1)¶ 从流读取至多 n 个字节。
如果未提供 n 或是设为
-1
,则一直读取到 EOF,然后返回所读取的全部bytes
。 如果收到 EOF 并且内部缓冲区为空,则返回一个空bytes
对象。object.如果 n 为
0
,则立即返回一个空bytes
对象。如果 n 为正值,则一旦内部缓冲区有至少 1 个字节可用就返回至多 n 个可用的
bytes
。 如果在读取任何字节之前收到 EOF,则返回一个空bytes
对象。
-
coroutine
readline
()¶ 讀取一行,其中"行"指的是以
\n
結尾的位元組序列。如果讀取到 EOF 而沒有找到
\n
,該方法會回傳部分的已讀取資料。如果讀取到 EOF 且內部緩衝區是空的,則回傳一個空的
bytes
物件。
-
coroutine
readexactly
(n)¶ 讀取剛好 n 個位元組。
如果在讀取完 n 個位元組之前讀取到 EOF,則會引發
IncompleteReadError
。使用IncompleteReadError.partial
屬性來獲取串流結束前已讀取的部分資料。
-
coroutine
readuntil
(separator=b'\n')¶ 從串流中持續讀取資料直到出現 separator。
成功後,資料和 separator(分隔符號)會從內部緩衝區中刪除(或者說是被消費掉 (consumed))。回傳的資料在末尾會有一個 separator。
如果讀取的資料量超過了設定的串流限制,將會引發
LimitOverrunError
例外,資料將被留在內部緩衝區中,並可以再次被讀取。如果在完整的 separator 被找到之前就讀取到 EOF,則會引發
IncompleteReadError
例外,且內部緩衝區會被重置。IncompleteReadError.partial
屬性可能包含一部分的 separator。3.5.2 版新加入.
-
at_eof
()¶ 如果緩衝區是空的且
feed_eof()
曾被呼叫則回傳True
。
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ 表示一個有提供 API 來將資料寫入 IO 串流的 writer 物件。
不建議直接實例化 StreamWriter 物件;使用
open_connection()
和start_server()
會是較好的做法。-
write
(data)¶ 此方法會嘗試立即將 data 寫入到底層的 socket。如果失敗,資料會被放到內部寫入緩衝中排隊等待 (queue),直到它可被發送。
此方法應當與
drain()
方法一起使用:stream.write(data) await stream.drain()
-
writelines
(data)¶ 此方法會立即嘗試將一個位元組 list(或任何可疊代物件 (iterable))寫入到底層的 socket。如果失敗,資料會被放到內部寫入緩衝中排隊等待,直到它可被發送。
此方法應當與
drain()
方法一起使用:stream.writelines(lines) await stream.drain()
-
close
()¶ 此方法會關閉串流以及底層的 socket。
此方法应当于
wait_closed()
方法一起使用,但这并非强制要求:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ 如果底層的傳輸支援
write_eof()
方法就回傳True
,否則回傳False
。
-
write_eof
()¶ 在已緩衝的寫入資料被清理 (flush) 後關閉串流的寫入端。
-
transport
¶ 回傳底層的 asyncio 傳輸。
-
get_extra_info
(name, default=None)¶ 存取可選的傳輸資訊;詳情請見
BaseTransport.get_extra_info()
。
-
coroutine
drain
()¶ 等待直到可以繼續寫入到串流。範例:
writer.write(data) await writer.drain()
這是一個與底層 IO 寫入緩衝區互動的流程控制方法。當緩衝區大小達到最高標記位 (high watermark) 時,drain() 會阻塞直到緩衝區大小減少至最低標記位 (low watermark) 以便繼續寫入。當沒有要等待的資料時,
drain()
會立即回傳。
-
is_closing
()¶ 如果串流已被關閉或正在被關閉則回傳
True
。3.7 版新加入.
-
範例¶
使用串流的 TCP echo 客戶端¶
使用 asyncio.open_connection()
函式的 TCP echo 客戶端:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
也參考
使用低階 loop.create_connection()
方法的 TCP echo 客戶端協定範例。
使用串流的 TCP echo 伺服器¶
TCP echo 伺服器使用 asyncio.start_server()
函式:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
也參考
使用 loop.create_server()
方法的 TCP echo 伺服器協定 範例。
獲取 HTTP 標頭¶
查詢自命令列傳入之 URL 所帶有 HTTP 標頭的簡單範例:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或使用 HTTPS:
python example.py https://example.com/path/page.html
註冊一個使用串流來等待資料的開放 socket¶
等待直到 socket 透過使用 open_connection()
函式接收到資料的協程:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
也參考
在註冊一個開啟的 socket 以等待有使用協定的資料範例中,有使用了低階協定以及 loop.create_connection()
方法。
在監視檔案描述器以讀取事件範例中,有使用低階的 loop.add_reader()
方法來監視檔案描述器。