Потоки¶
Вихідний код: Lib/asyncio/streams.py
Потоки — це високорівневі асинхронні/готові до очікування примітиви для роботи з мережевими підключеннями. Потоки дозволяють надсилати й отримувати дані без використання зворотних викликів або низькорівневих протоколів і транспортів.
Ось приклад клієнта відлуння TCP, написаного з використанням асинхронних потоків:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Дивіться також розділ Examples нижче.
Потокові функції
Наступні асинхронні функції верхнього рівня можна використовувати для створення та роботи з потоками:
-
coroutine
asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)¶ Встановіть мережеве з’єднання та поверніть пару об’єктів
(reader, writer).Повернені об’єкти reader і writer є екземплярами класів
StreamReaderіStreamWriter.The loop argument is optional and can always be determined automatically when this function is awaited from a coroutine.
limit визначає обмеження розміру буфера, який використовується повернутим екземпляром
StreamReader. За замовчуванням обмеження встановлено на 64 КБ.Решта аргументів передаються безпосередньо до
loop.create_connection().Нове в версії 3.7: The ssl_handshake_timeout parameter.
Нове в версії 3.8: Added happy_eyeballs_delay and interleave parameters.
-
coroutine
asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустіть сервер сокетів.
Зворотний виклик client_connected_cb викликається щоразу, коли встановлюється нове підключення клієнта. Він отримує пару
(reader, writer)як два аргументи, екземпляри класівStreamReaderіStreamWriter.client_connected_cb може бути простим викликом або функцією співпрограми; якщо це функція співпрограми, вона буде автоматично запланована як
Task.The loop argument is optional and can always be determined automatically when this method is awaited from a coroutine.
limit визначає обмеження розміру буфера, який використовується повернутим екземпляром
StreamReader. За замовчуванням обмеження встановлено на 64 КБ.Решта аргументів передаються безпосередньо до
loop.create_server().Нове в версії 3.7: The ssl_handshake_timeout and start_serving parameters.
Unix-сокети
-
coroutine
asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Встановіть з’єднання через сокет Unix і поверніть пару
(reader, writer).Подібно до
open_connection(), але працює на сокетах Unix.Дивіться також документацію
loop.create_unix_connection().Availability: Unix.
Нове в версії 3.7: The ssl_handshake_timeout parameter.
Змінено в версії 3.7: The path parameter can now be a path-like object
-
coroutine
asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустіть сокет-сервер Unix.
Подібно до
start_server(), але працює з сокетами Unix.Дивіться також документацію
loop.create_unix_server().Availability: Unix.
Нове в версії 3.7: The ssl_handshake_timeout and start_serving parameters.
Змінено в версії 3.7: The path parameter can now be a path-like object.
StreamReader¶
-
class
asyncio.StreamReader¶ Represents a reader object that provides APIs to read data from the IO stream.
Не рекомендується безпосередньо створювати екземпляри об’єктів StreamReader; замість цього використовуйте
open_connection()іstart_server().-
coroutine
read(n=-1)¶ Read up to n bytes. If n is not provided, or set to
-1, read until EOF and return all read bytes.If EOF was received and the internal buffer is empty, return an empty
bytesobject.
-
coroutine
readline()¶ Прочитати один рядок, де «рядок» — це послідовність байтів, що закінчуються на
\n.Якщо EOF отримано, а
\nне знайдено, метод повертає частково прочитані дані.Якщо EOF отримано, а внутрішній буфер порожній, поверніть порожній об’єкт
bytes.
-
coroutine
readexactly(n)¶ Прочитайте рівно n байт.
Викликати
IncompleteReadError, якщо EOF досягнуто до того, як n можна буде прочитати. Використовуйте атрибутIncompleteReadError.partial, щоб отримати частково прочитані дані.
-
coroutine
readuntil(separator=b'\n')¶ Читати дані з потоку, доки не буде знайдено роздільник.
У разі успіху дані та роздільник буде видалено з внутрішнього буфера (використано). Повернуті дані включатимуть роздільник у кінці.
Якщо обсяг зчитаних даних перевищує налаштований ліміт потоку, виникає виняток
LimitOverrunError, і дані залишаються у внутрішньому буфері та можуть бути прочитані знову.Якщо EOF досягнуто до того, як знайдено повний роздільник, виникає виняток
IncompleteReadError, і внутрішній буфер скидається. АтрибутIncompleteReadError.partialможе містити частину роздільника.Нове в версії 3.5.2.
-
at_eof()¶ Повертає
True, якщо буфер порожній і було викликаноfeed_eof().
-
coroutine
StreamWriter¶
-
class
asyncio.StreamWriter¶ Представляє об’єкт запису, який надає API для запису даних у потік вводу-виводу.
Не рекомендується безпосередньо створювати екземпляри об’єктів StreamWriter; замість цього використовуйте
open_connection()іstart_server().-
write(data)¶ Метод намагається негайно записати дані в основний сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.
Цей метод слід використовувати разом із методом
drain():stream.write(data) await stream.drain()
-
writelines(data)¶ Метод негайно записує список (або будь-яку ітерацію) байтів у базовий сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.
Цей метод слід використовувати разом із методом
drain():stream.writelines(lines) await stream.drain()
-
close()¶ Метод закриває потік і базовий сокет.
The method should be used along with the
wait_closed()method:stream.close() await stream.wait_closed()
-
can_write_eof()¶ Повертає
True, якщо основний транспорт підтримує методwrite_eof(),Falseінакше.
-
write_eof()¶ Закрийте кінець запису потоку після очищення буферизованих даних запису.
-
transport¶ Повернути базовий асинхронний транспорт.
-
get_extra_info(name, default=None)¶ Доступ до додаткової транспортної інформації; подробиці див.
BaseTransport.get_extra_info().
-
coroutine
drain()¶ Зачекайте, доки буде прийнятно продовжити запис у потік. Приклад:
writer.write(data) await writer.drain()
Це метод керування потоком, який взаємодіє з базовим буфером запису вводу-виводу. Коли розмір буфера досягає верхнього водяного знака, drain() блокує, доки розмір буфера не зменшиться до низького водяного знака, і запис можна буде відновити. Коли нема чого чекати,
drain()повертається негайно.
-
is_closing()¶ Повертає
True, якщо потік закрито або знаходиться в процесі закриття.Нове в версії 3.7.
-
Приклади¶
TCP echo client використовує потоки¶
TCP-клієнт відлуння за допомогою функції asyncio.open_connection():
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
Дивись також
У прикладі TCP echo client protocol використовується метод низького рівня loop.create_connection().
Сервер відлуння TCP з використанням потоків¶
Сервер відлуння TCP за допомогою функції asyncio.start_server():
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
Дивись також
У прикладі TCP echo server protocol використовується метод loop.create_server().
Отримайте заголовки HTTP¶
Простий приклад запиту HTTP-заголовків URL-адреси, переданої в командному рядку:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Використання:
python example.py http://example.com/path/page.html
або з HTTPS:
python example.py https://example.com/path/page.html
Зареєструйте відкритий сокет для очікування даних за допомогою потоків¶
Співпрограма очікує, доки сокет не отримає дані за допомогою функції open_connection():
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Дивись також
У прикладі реєструвати відкритий сокет для очікування даних за допомогою протоколу використовується протокол низького рівня та метод loop.create_connection().
У прикладі спостерігати за подіями читання файлового дескриптора використовується низькорівневий метод loop.add_reader() для спостереження за файловим дескриптором.