Потоки¶
Вихідний код: Lib/asyncio/streams.py
Потоки — це високорівневі асинхронні/готові до очікування примітиви для роботи з мережевими підключеннями. Потоки дозволяють надсилати й отримувати дані без використання зворотних викликів або низькорівневих протоколів і транспортів.
Ось приклад клієнта відлуння TCP, написаного з використанням асинхронних потоків:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Дивіться також розділ Examples нижче.
Потокові функції
Наступні асинхронні функції верхнього рівня можна використовувати для створення та роботи з потоками:
- coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
Встановіть мережеве з’єднання та поверніть пару об’єктів
(reader, writer)
.Повернені об’єкти reader і writer є екземплярами класів
StreamReader
іStreamWriter
.limit визначає обмеження розміру буфера, який використовується повернутим екземпляром
StreamReader
. За замовчуванням обмеження встановлено на 64 КБ.Решта аргументів передаються безпосередньо до
loop.create_connection()
.Примітка
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.Змінено в версії 3.7: Додано параметр ssl_handshake_timeout.
Змінено в версії 3.8: Додано параметри happy_eyeballs_delay і interleave.
Змінено в версії 3.10: Видалено параметр loop.
Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.
- coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Запустіть сервер сокетів.
Зворотний виклик client_connected_cb викликається щоразу, коли встановлюється нове підключення клієнта. Він отримує пару
(reader, writer)
як два аргументи, екземпляри класівStreamReader
іStreamWriter
.client_connected_cb може бути простим викликом або функцією співпрограми; якщо це функція співпрограми, вона буде автоматично запланована як
Task
.limit визначає обмеження розміру буфера, який використовується повернутим екземпляром
StreamReader
. За замовчуванням обмеження встановлено на 64 КБ.Решта аргументів передаються безпосередньо до
loop.create_server()
.Примітка
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s
close()
method.Змінено в версії 3.7: Додано параметри ssl_handshake_timeout і start_serving.
Змінено в версії 3.10: Видалено параметр loop.
Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.
Unix-сокети
- coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Встановіть з’єднання через сокет Unix і поверніть пару
(reader, writer)
.Подібно до
open_connection()
, але працює на сокетах Unix.Дивіться також документацію
loop.create_unix_connection()
.Примітка
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.Availability: Unix.
Змінено в версії 3.7: Додано параметр ssl_handshake_timeout. Параметр path тепер може бути path-like object
Змінено в версії 3.10: Видалено параметр loop.
Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.
- coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Запустіть сокет-сервер Unix.
Подібно до
start_server()
, але працює з сокетами Unix.Дивіться також документацію
loop.create_unix_server()
.Примітка
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s
close()
method.Availability: Unix.
Змінено в версії 3.7: Додано параметри ssl_handshake_timeout і start_serving. Параметр path тепер може бути path-like object.
Змінено в версії 3.10: Видалено параметр loop.
Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.
StreamReader¶
- class asyncio.StreamReader¶
Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the
async for
statement.Не рекомендується безпосередньо створювати екземпляри об’єктів StreamReader; замість цього використовуйте
open_connection()
іstart_server()
.- feed_eof()¶
Acknowledge the EOF.
- coroutine read(n=-1)¶
Read up to n bytes from the stream.
If n is not provided or set to
-1
, read until EOF, then return all readbytes
. If EOF was received and the internal buffer is empty, return an emptybytes
object.If n is
0
, return an emptybytes
object immediately.If n is positive, return at most n available
bytes
as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an emptybytes
object.
- coroutine readline()¶
Прочитати один рядок, де «рядок» — це послідовність байтів, що закінчуються на
\n
.Якщо EOF отримано, а
\n
не знайдено, метод повертає частково прочитані дані.Якщо EOF отримано, а внутрішній буфер порожній, поверніть порожній об’єкт
bytes
.
- coroutine readexactly(n)¶
Прочитайте рівно n байт.
Викликати
IncompleteReadError
, якщо EOF досягнуто до того, як n можна буде прочитати. Використовуйте атрибутIncompleteReadError.partial
, щоб отримати частково прочитані дані.
- coroutine readuntil(separator=b'\n')¶
Читати дані з потоку, доки не буде знайдено роздільник.
У разі успіху дані та роздільник буде видалено з внутрішнього буфера (використано). Повернуті дані включатимуть роздільник у кінці.
Якщо обсяг зчитаних даних перевищує налаштований ліміт потоку, виникає виняток
LimitOverrunError
, і дані залишаються у внутрішньому буфері та можуть бути прочитані знову.Якщо EOF досягнуто до того, як знайдено повний роздільник, виникає виняток
IncompleteReadError
, і внутрішній буфер скидається. АтрибутIncompleteReadError.partial
може містити частину роздільника.Added in version 3.5.2.
- at_eof()¶
Повертає
True
, якщо буфер порожній і було викликаноfeed_eof()
.
StreamWriter¶
- class asyncio.StreamWriter¶
Представляє об’єкт запису, який надає API для запису даних у потік вводу-виводу.
Не рекомендується безпосередньо створювати екземпляри об’єктів StreamWriter; замість цього використовуйте
open_connection()
іstart_server()
.- write(data)¶
Метод намагається негайно записати дані в основний сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.
Цей метод слід використовувати разом із методом
drain()
:stream.write(data) await stream.drain()
- writelines(data)¶
Метод негайно записує список (або будь-яку ітерацію) байтів у базовий сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.
Цей метод слід використовувати разом із методом
drain()
:stream.writelines(lines) await stream.drain()
- close()¶
Метод закриває потік і базовий сокет.
The method should be used, though not mandatory, along with the
wait_closed()
method:stream.close() await stream.wait_closed()
- can_write_eof()¶
Повертає
True
, якщо основний транспорт підтримує методwrite_eof()
,False
інакше.
- write_eof()¶
Закрийте кінець запису потоку після очищення буферизованих даних запису.
- transport¶
Повернути базовий асинхронний транспорт.
- get_extra_info(name, default=None)¶
Доступ до додаткової транспортної інформації; подробиці див.
BaseTransport.get_extra_info()
.
- coroutine drain()¶
Зачекайте, доки буде прийнятно продовжити запис у потік. Приклад:
writer.write(data) await writer.drain()
Це метод керування потоком, який взаємодіє з базовим буфером запису вводу-виводу. Коли розмір буфера досягає верхнього водяного знака, drain() блокує, доки розмір буфера не зменшиться до низького водяного знака, і запис можна буде відновити. Коли нема чого чекати,
drain()
повертається негайно.
- coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Upgrade an existing stream-based connection to TLS.
Параметри:
sslcontext: налаштований екземпляр
SSLContext
.server_hostname: встановлює або замінює ім’я хоста, з яким буде зіставлятися сертифікат цільового сервера.
ssl_handshake_timeout is the time in seconds to wait for the TLS handshake to complete before aborting the connection.
60.0
seconds ifNone
(default).ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0
seconds ifNone
(default).
Added in version 3.11.
Змінено в версії 3.12: Added the ssl_shutdown_timeout parameter.
- is_closing()¶
Повертає
True
, якщо потік закрито або знаходиться в процесі закриття.Added in version 3.7.
Приклади¶
TCP echo client використовує потоки¶
TCP-клієнт відлуння за допомогою функції asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Дивись також
У прикладі TCP echo client protocol використовується метод низького рівня loop.create_connection()
.
Сервер відлуння TCP з використанням потоків¶
Сервер відлуння TCP за допомогою функції asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
Дивись також
У прикладі TCP echo server protocol використовується метод loop.create_server()
.
Отримайте заголовки HTTP¶
Простий приклад запиту HTTP-заголовків URL-адреси, переданої в командному рядку:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Використання:
python example.py http://example.com/path/page.html
або з HTTPS:
python example.py https://example.com/path/page.html
Зареєструйте відкритий сокет для очікування даних за допомогою потоків¶
Співпрограма очікує, доки сокет не отримає дані за допомогою функції open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Дивись також
У прикладі реєструвати відкритий сокет для очікування даних за допомогою протоколу використовується протокол низького рівня та метод loop.create_connection()
.
У прикладі спостерігати за подіями читання файлового дескриптора використовується низькорівневий метод loop.add_reader()
для спостереження за файловим дескриптором.