Потоки¶
Вихідний код: Lib/asyncio/streams.py
Потоки — це високорівневі асинхронні/готові до очікування примітиви для роботи з мережевими підключеннями. Потоки дозволяють надсилати й отримувати дані без використання зворотних викликів або низькорівневих протоколів і транспортів.
Ось приклад клієнта відлуння TCP, написаного з використанням асинхронних потоків:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Дивіться також розділ Examples нижче.
Потокові функції
Наступні асинхронні функції верхнього рівня можна використовувати для створення та роботи з потоками:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)¶ Встановіть мережеве з’єднання та поверніть пару об’єктів
(reader, writer)
.Повернені об’єкти reader і writer є екземплярами класів
StreamReader
іStreamWriter
.The loop argument is optional and can always be determined automatically when this function is awaited from a coroutine.
limit визначає обмеження розміру буфера, який використовується повернутим екземпляром
StreamReader
. За замовчуванням обмеження встановлено на 64 КБ.Решта аргументів передаються безпосередньо до
loop.create_connection()
.Нове в версії 3.7: The ssl_handshake_timeout parameter.
Нове в версії 3.8: Added happy_eyeballs_delay and interleave parameters.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустіть сервер сокетів.
Зворотний виклик client_connected_cb викликається щоразу, коли встановлюється нове підключення клієнта. Він отримує пару
(reader, writer)
як два аргументи, екземпляри класівStreamReader
іStreamWriter
.client_connected_cb може бути простим викликом або функцією співпрограми; якщо це функція співпрограми, вона буде автоматично запланована як
Task
.The loop argument is optional and can always be determined automatically when this method is awaited from a coroutine.
limit визначає обмеження розміру буфера, який використовується повернутим екземпляром
StreamReader
. За замовчуванням обмеження встановлено на 64 КБ.Решта аргументів передаються безпосередньо до
loop.create_server()
.Нове в версії 3.7: The ssl_handshake_timeout and start_serving parameters.
Unix-сокети
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Встановіть з’єднання через сокет Unix і поверніть пару
(reader, writer)
.Подібно до
open_connection()
, але працює на сокетах Unix.Дивіться також документацію
loop.create_unix_connection()
.Availability: Unix.
Нове в версії 3.7: The ssl_handshake_timeout parameter.
Змінено в версії 3.7: The path parameter can now be a path-like object
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустіть сокет-сервер Unix.
Подібно до
start_server()
, але працює з сокетами Unix.Дивіться також документацію
loop.create_unix_server()
.Availability: Unix.
Нове в версії 3.7: The ssl_handshake_timeout and start_serving parameters.
Змінено в версії 3.7: The path parameter can now be a path-like object.
StreamReader¶
-
class
asyncio.
StreamReader
¶ Represents a reader object that provides APIs to read data from the IO stream.
Не рекомендується безпосередньо створювати екземпляри об’єктів StreamReader; замість цього використовуйте
open_connection()
іstart_server()
.-
coroutine
read
(n=-1)¶ Read up to n bytes. If n is not provided, or set to
-1
, read until EOF and return all read bytes.If EOF was received and the internal buffer is empty, return an empty
bytes
object.
-
coroutine
readline
()¶ Прочитати один рядок, де «рядок» — це послідовність байтів, що закінчуються на
\n
.Якщо EOF отримано, а
\n
не знайдено, метод повертає частково прочитані дані.Якщо EOF отримано, а внутрішній буфер порожній, поверніть порожній об’єкт
bytes
.
-
coroutine
readexactly
(n)¶ Прочитайте рівно n байт.
Викликати
IncompleteReadError
, якщо EOF досягнуто до того, як n можна буде прочитати. Використовуйте атрибутIncompleteReadError.partial
, щоб отримати частково прочитані дані.
-
coroutine
readuntil
(separator=b'\n')¶ Читати дані з потоку, доки не буде знайдено роздільник.
У разі успіху дані та роздільник буде видалено з внутрішнього буфера (використано). Повернуті дані включатимуть роздільник у кінці.
Якщо обсяг зчитаних даних перевищує налаштований ліміт потоку, виникає виняток
LimitOverrunError
, і дані залишаються у внутрішньому буфері та можуть бути прочитані знову.Якщо EOF досягнуто до того, як знайдено повний роздільник, виникає виняток
IncompleteReadError
, і внутрішній буфер скидається. АтрибутIncompleteReadError.partial
може містити частину роздільника.Нове в версії 3.5.2.
-
at_eof
()¶ Повертає
True
, якщо буфер порожній і було викликаноfeed_eof()
.
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ Представляє об’єкт запису, який надає API для запису даних у потік вводу-виводу.
Не рекомендується безпосередньо створювати екземпляри об’єктів StreamWriter; замість цього використовуйте
open_connection()
іstart_server()
.-
write
(data)¶ Метод намагається негайно записати дані в основний сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.
Цей метод слід використовувати разом із методом
drain()
:stream.write(data) await stream.drain()
-
writelines
(data)¶ Метод негайно записує список (або будь-яку ітерацію) байтів у базовий сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.
Цей метод слід використовувати разом із методом
drain()
:stream.writelines(lines) await stream.drain()
-
close
()¶ Метод закриває потік і базовий сокет.
The method should be used along with the
wait_closed()
method:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ Повертає
True
, якщо основний транспорт підтримує методwrite_eof()
,False
інакше.
-
write_eof
()¶ Закрийте кінець запису потоку після очищення буферизованих даних запису.
-
transport
¶ Повернути базовий асинхронний транспорт.
-
get_extra_info
(name, default=None)¶ Доступ до додаткової транспортної інформації; подробиці див.
BaseTransport.get_extra_info()
.
-
coroutine
drain
()¶ Зачекайте, доки буде прийнятно продовжити запис у потік. Приклад:
writer.write(data) await writer.drain()
Це метод керування потоком, який взаємодіє з базовим буфером запису вводу-виводу. Коли розмір буфера досягає верхнього водяного знака, drain() блокує, доки розмір буфера не зменшиться до низького водяного знака, і запис можна буде відновити. Коли нема чого чекати,
drain()
повертається негайно.
-
is_closing
()¶ Повертає
True
, якщо потік закрито або знаходиться в процесі закриття.Нове в версії 3.7.
-
Приклади¶
TCP echo client використовує потоки¶
TCP-клієнт відлуння за допомогою функції asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
Дивись також
У прикладі TCP echo client protocol використовується метод низького рівня loop.create_connection()
.
Сервер відлуння TCP з використанням потоків¶
Сервер відлуння TCP за допомогою функції asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
Дивись також
У прикладі TCP echo server protocol використовується метод loop.create_server()
.
Отримайте заголовки HTTP¶
Простий приклад запиту HTTP-заголовків URL-адреси, переданої в командному рядку:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Використання:
python example.py http://example.com/path/page.html
або з HTTPS:
python example.py https://example.com/path/page.html
Зареєструйте відкритий сокет для очікування даних за допомогою потоків¶
Співпрограма очікує, доки сокет не отримає дані за допомогою функції open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Дивись також
У прикладі реєструвати відкритий сокет для очікування даних за допомогою протоколу використовується протокол низького рівня та метод loop.create_connection()
.
У прикладі спостерігати за подіями читання файлового дескриптора використовується низькорівневий метод loop.add_reader()
для спостереження за файловим дескриптором.