串流¶
串流是支援 async/await (async/await-ready) 的高階原始物件 (high-level primitive),用於處理網路連線。串流不需要使用回呼 (callback) 或低階協定和傳輸 (transport) 就能夠傳送和接收資料。
這是一個使用 asyncio 串流編寫的 TCP echo 客戶端範例:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
另請參閱下方 Examples 段落。
串流函式
下面的高階 asyncio 函式可以用來建立和處理串流:
- coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
建立網路連線並回傳一對
(reader, writer)
物件。回傳的 reader 和 writer 物件是
StreamReader
和StreamWriter
類別的實例。limit 指定了回傳的
StreamReader
實例所使用的緩衝區 (buffer) 大小限制。limit 預設為 64 KiB。其餘的引數會直接傳遞到
loop.create_connection()
。備註
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.在 3.7 版的變更: 新增 ssl_handshake_timeout 參數。
在 3.8 版的變更: 新增 happy_eyeballs_delay 和 interleave 參數。
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
- coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
啟動 socket 伺服器。
當一個新的客戶端連線被建立時,回呼函式 client_connected_cb 就會被呼叫。該函式會接收到一對引數
(reader, writer)
,分別為StreamReader
和StreamWriter
的實例。client_connected_cb 既可以是普通的可呼叫物件 (callable),也可以是一個協程函式;如果它是一個協程函式,它將自動作為
Task
來被排程。limit 指定了回傳的
StreamReader
實例所使用的緩衝區 (buffer) 大小限制。limit 預設為 64 KiB。剩下的引數將會直接傳遞給
loop.create_server()
。備註
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's
close()
method.在 3.7 版的變更: 新增 ssl_handshake_timeout 與 start_serving 參數。
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
在 3.13 版的變更: Added the keep_alive parameter.
Unix Sockets
- coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
建立一個 Unix socket 連線並回傳一對
(reader, writer)
。與
open_connection()
相似,但是是操作 Unix sockets。另請參閱
loop.create_unix_connection()
文件。備註
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.Availability: Unix.
在 3.7 版的變更: 新增 ssl_handshake_timeout 參數。path 參數現在可以是個 path-like object
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
- coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
啟動一個 Unix socket 伺服器。
與
start_server()
相似,但會是操作 Unix sockets。另請參閱
loop.create_unix_server()
文件。備註
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server's
close()
method.Availability: Unix.
在 3.7 版的變更: 新增 ssl_handshake_timeout 與 start_serving 參數。path 參數現在可以是個 path-like object。
在 3.10 版的變更: 移除 loop 參數。
在 3.11 版的變更: 新增 ssl_shutdown_timeout 參數。
StreamReader¶
- class asyncio.StreamReader¶
表示一個有提供 API 來從 IO 串流中讀取資料的 reader 物件。作為一個 asynchronous iterable,此物件支援
async for
陳述式。不建議直接實例化 StreamReader 物件;使用
open_connection()
和start_server()
會是較好的做法。- feed_eof()¶
Acknowledge the EOF.
- coroutine read(n=-1)¶
從串流中讀取至多 n 個位元組的資料。
如果沒有設定 n 或是被設為
-1
,則會持續讀取直到 EOF,然後回傳所有讀取到的bytes
。讀取到 EOF 且內部緩衝區是空的,則回傳一個空的bytes
物件。如果 n 為
0
,則立即回傳一個空的bytes
物件。If n is positive, return at most n available
bytes
as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an emptybytes
object.
- coroutine readline()¶
讀取一行,其中"行"指的是以
\n
結尾的位元組序列。如果讀取到 EOF 而沒有找到
\n
,該方法會回傳部分的已讀取資料。如果讀取到 EOF 且內部緩衝區是空的,則回傳一個空的
bytes
物件。
- coroutine readexactly(n)¶
讀取剛好 n 個位元組。
如果在讀取完 n 個位元組之前讀取到 EOF,則會引發
IncompleteReadError
。使用IncompleteReadError.partial
屬性來獲取串流結束前已讀取的部分資料。
- coroutine readuntil(separator=b'\n')¶
從串流中持續讀取資料直到出現 separator。
成功後,資料和 separator(分隔符號)會從內部緩衝區中刪除(或者說是被消費掉 (consumed))。回傳的資料在末尾會有一個 separator。
如果讀取的資料量超過了設定的串流限制,將會引發
LimitOverrunError
例外,資料將被留在內部緩衝區中,並可以再次被讀取。如果在完整的 separator 被找到之前就讀取到 EOF,則會引發
IncompleteReadError
例外,且內部緩衝區會被重置。IncompleteReadError.partial
屬性可能包含一部分的 separator。The separator may also be a tuple of separators. In this case the return value will be the shortest possible that has any separator as the suffix. For the purposes of
LimitOverrunError
, the shortest possible separator is considered to be the one that matched.在 3.5.2 版被加入.
在 3.13 版的變更: The separator parameter may now be a
tuple
of separators.
- at_eof()¶
如果緩衝區是空的且
feed_eof()
曾被呼叫則回傳True
。
StreamWriter¶
- class asyncio.StreamWriter¶
表示一個有提供 API 來將資料寫入 IO 串流的 writer 物件。
不建議直接實例化 StreamWriter 物件;使用
open_connection()
和start_server()
會是較好的做法。- write(data)¶
此方法會嘗試立即將 data 寫入到底層的 socket。如果失敗,資料會被放到內部寫入緩衝中排隊等待 (queue),直到它可被發送。
此方法應當與
drain()
方法一起使用:stream.write(data) await stream.drain()
- writelines(data)¶
此方法會立即嘗試將一個位元組 list(或任何可疊代物件 (iterable))寫入到底層的 socket。如果失敗,資料會被放到內部寫入緩衝中排隊等待,直到它可被發送。
此方法應當與
drain()
方法一起使用:stream.writelines(lines) await stream.drain()
- close()¶
此方法會關閉串流以及底層的 socket。
此方法應與
wait_closed()
方法一起使用,但並非強制:stream.close() await stream.wait_closed()
- can_write_eof()¶
如果底層的傳輸支援
write_eof()
方法就回傳True
,否則回傳False
。
- write_eof()¶
在已緩衝的寫入資料被清理 (flush) 後關閉串流的寫入端。
- transport¶
回傳底層的 asyncio 傳輸。
- get_extra_info(name, default=None)¶
存取可選的傳輸資訊;詳情請見
BaseTransport.get_extra_info()
。
- coroutine drain()¶
等待直到可以繼續寫入到串流。範例:
writer.write(data) await writer.drain()
這是一個與底層 IO 寫入緩衝區互動的流程控制方法。當緩衝區大小達到最高標記位 (high watermark) 時,drain() 會阻塞直到緩衝區大小減少至最低標記位 (low watermark) 以便繼續寫入。當沒有要等待的資料時,
drain()
會立即回傳。
- coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Upgrade an existing stream-based connection to TLS.
參數:
sslcontext: a configured instance of
SSLContext
.server_hostname: sets or overrides the host name that the target server's certificate will be matched against.
ssl_handshake_timeout is the time in seconds to wait for the TLS handshake to complete before aborting the connection.
60.0
seconds ifNone
(default).ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0
seconds ifNone
(default).
在 3.11 版被加入.
在 3.12 版的變更: 新增 ssl_shutdown_timeout 參數。
- is_closing()¶
如果串流已被關閉或正在被關閉則回傳
True
。在 3.7 版被加入.
範例¶
使用串流的 TCP echo 客戶端¶
使用 asyncio.open_connection()
函式的 TCP echo 客戶端:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
也參考
使用低階 loop.create_connection()
方法的 TCP echo 客戶端協定範例。
使用串流的 TCP echo 伺服器¶
TCP echo 伺服器使用 asyncio.start_server()
函式:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
也參考
使用 loop.create_server()
方法的 TCP echo 伺服器協定 範例。
獲取 HTTP 標頭¶
查詢自命令列傳入之 URL 所帶有 HTTP 標頭的簡單範例:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
用法:
python example.py http://example.com/path/page.html
或使用 HTTPS:
python example.py https://example.com/path/page.html
註冊一個使用串流來等待資料的開放 socket¶
等待直到 socket 透過使用 open_connection()
函式接收到資料的協程:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
也參考
在註冊一個開啟的 socket 以等待有使用協定的資料範例中,有使用了低階協定以及 loop.create_connection()
方法。
在監視檔案描述器以讀取事件範例中,有使用低階的 loop.add_reader()
方法來監視檔案描述器。