Потоки

Вихідний код: Lib/asyncio/streams.py


Потоки — це високорівневі асинхронні/готові до очікування примітиви для роботи з мережевими підключеннями. Потоки дозволяють надсилати й отримувати дані без використання зворотних викликів або низькорівневих протоколів і транспортів.

Ось приклад клієнта відлуння TCP, написаного з використанням асинхронних потоків:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Дивіться також розділ Examples нижче.

Потокові функції

Наступні асинхронні функції верхнього рівня можна використовувати для створення та роботи з потоками:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Встановіть мережеве з’єднання та поверніть пару об’єктів (reader, writer).

Повернені об’єкти reader і writer є екземплярами класів StreamReader і StreamWriter.

limit визначає обмеження розміру буфера, який використовується повернутим екземпляром StreamReader. За замовчуванням обмеження встановлено на 64 КБ.

Решта аргументів передаються безпосередньо до loop.create_connection().

Примітка

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

Змінено в версії 3.7: Додано параметр ssl_handshake_timeout.

Змінено в версії 3.8: Додано параметри happy_eyeballs_delay і interleave.

Змінено в версії 3.10: Видалено параметр loop.

Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запустіть сервер сокетів.

Зворотний виклик client_connected_cb викликається щоразу, коли встановлюється нове підключення клієнта. Він отримує пару (reader, writer) як два аргументи, екземпляри класів StreamReader і StreamWriter.

client_connected_cb може бути простим викликом або функцією співпрограми; якщо це функція співпрограми, вона буде автоматично запланована як Task.

limit визначає обмеження розміру буфера, який використовується повернутим екземпляром StreamReader. За замовчуванням обмеження встановлено на 64 КБ.

Решта аргументів передаються безпосередньо до loop.create_server().

Примітка

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

Змінено в версії 3.7: Додано параметри ssl_handshake_timeout і start_serving.

Змінено в версії 3.10: Видалено параметр loop.

Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.

Unix-сокети

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Встановіть з’єднання через сокет Unix і поверніть пару (reader, writer).

Подібно до open_connection(), але працює на сокетах Unix.

Дивіться також документацію loop.create_unix_connection().

Примітка

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

Змінено в версії 3.7: Додано параметр ssl_handshake_timeout. Параметр path тепер може бути path-like object

Змінено в версії 3.10: Видалено параметр loop.

Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запустіть сокет-сервер Unix.

Подібно до start_server(), але працює з сокетами Unix.

Дивіться також документацію loop.create_unix_server().

Примітка

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

Змінено в версії 3.7: Додано параметри ssl_handshake_timeout і start_serving. Параметр path тепер може бути path-like object.

Змінено в версії 3.10: Видалено параметр loop.

Змінено в версії 3.11: Added the ssl_shutdown_timeout parameter.

StreamReader

class asyncio.StreamReader

Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the async for statement.

Не рекомендується безпосередньо створювати екземпляри об’єктів StreamReader; замість цього використовуйте open_connection() і start_server().

feed_eof()

Acknowledge the EOF.

coroutine read(n=-1)

Read up to n bytes from the stream.

If n is not provided or set to -1, read until EOF, then return all read bytes. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is 0, return an empty bytes object immediately.

If n is positive, return at most n available bytes as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an empty bytes object.

coroutine readline()

Прочитати один рядок, де «рядок» — це послідовність байтів, що закінчуються на \n.

Якщо EOF отримано, а \n не знайдено, метод повертає частково прочитані дані.

Якщо EOF отримано, а внутрішній буфер порожній, поверніть порожній об’єкт bytes.

coroutine readexactly(n)

Прочитайте рівно n байт.

Викликати IncompleteReadError, якщо EOF досягнуто до того, як n можна буде прочитати. Використовуйте атрибут IncompleteReadError.partial, щоб отримати частково прочитані дані.

coroutine readuntil(separator=b'\n')

Читати дані з потоку, доки не буде знайдено роздільник.

У разі успіху дані та роздільник буде видалено з внутрішнього буфера (використано). Повернуті дані включатимуть роздільник у кінці.

Якщо обсяг зчитаних даних перевищує налаштований ліміт потоку, виникає виняток LimitOverrunError, і дані залишаються у внутрішньому буфері та можуть бути прочитані знову.

Якщо EOF досягнуто до того, як знайдено повний роздільник, виникає виняток IncompleteReadError, і внутрішній буфер скидається. Атрибут IncompleteReadError.partial може містити частину роздільника.

Нове в версії 3.5.2.

at_eof()

Повертає True, якщо буфер порожній і було викликано feed_eof().

StreamWriter

class asyncio.StreamWriter

Представляє об’єкт запису, який надає API для запису даних у потік вводу-виводу.

Не рекомендується безпосередньо створювати екземпляри об’єктів StreamWriter; замість цього використовуйте open_connection() і start_server().

write(data)

Метод намагається негайно записати дані в основний сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.

Цей метод слід використовувати разом із методом drain():

stream.write(data)
await stream.drain()
writelines(data)

Метод негайно записує список (або будь-яку ітерацію) байтів у базовий сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.

Цей метод слід використовувати разом із методом drain():

stream.writelines(lines)
await stream.drain()
close()

Метод закриває потік і базовий сокет.

The method should be used, though not mandatory, along with the wait_closed() method:

stream.close()
await stream.wait_closed()
can_write_eof()

Повертає True, якщо основний транспорт підтримує метод write_eof(), False інакше.

write_eof()

Закрийте кінець запису потоку після очищення буферизованих даних запису.

transport

Повернути базовий асинхронний транспорт.

get_extra_info(name, default=None)

Доступ до додаткової транспортної інформації; подробиці див. BaseTransport.get_extra_info().

coroutine drain()

Зачекайте, доки буде прийнятно продовжити запис у потік. Приклад:

writer.write(data)
await writer.drain()

Це метод керування потоком, який взаємодіє з базовим буфером запису вводу-виводу. Коли розмір буфера досягає верхнього водяного знака, drain() блокує, доки розмір буфера не зменшиться до низького водяного знака, і запис можна буде відновити. Коли нема чого чекати, drain() повертається негайно.

coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None)

Upgrade an existing stream-based connection to TLS.

Параметри:

  • sslcontext: налаштований екземпляр SSLContext.

  • server_hostname: встановлює або замінює ім’я хоста, з яким буде зіставлятися сертифікат цільового сервера.

  • ssl_handshake_timeout is the time in seconds to wait for the TLS handshake to complete before aborting the connection. 60.0 seconds if None (default).

Нове в версії 3.11.

is_closing()

Повертає True, якщо потік закрито або знаходиться в процесі закриття.

Нове в версії 3.7.

coroutine wait_closed()

Зачекайте, поки потік закриється.

Should be called after close() to wait until the underlying connection is closed, ensuring that all data has been flushed before e.g. exiting the program.

Нове в версії 3.7.

Приклади

TCP echo client використовує потоки

TCP-клієнт відлуння за допомогою функції asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Дивись також

У прикладі TCP echo client protocol використовується метод низького рівня loop.create_connection().

Сервер відлуння TCP з використанням потоків

Сервер відлуння TCP за допомогою функції asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Дивись також

У прикладі TCP echo server protocol використовується метод loop.create_server().

Отримайте заголовки HTTP

Простий приклад запиту HTTP-заголовків URL-адреси, переданої в командному рядку:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Використання:

python example.py http://example.com/path/page.html

або з HTTPS:

python example.py https://example.com/path/page.html

Зареєструйте відкритий сокет для очікування даних за допомогою потоків

Співпрограма очікує, доки сокет не отримає дані за допомогою функції open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Дивись також

У прикладі реєструвати відкритий сокет для очікування даних за допомогою протоколу використовується протокол низького рівня та метод loop.create_connection().

У прикладі спостерігати за подіями читання файлового дескриптора використовується низькорівневий метод loop.add_reader() для спостереження за файловим дескриптором.