Потоки

Вихідний код: Lib/asyncio/streams.py


Потоки — це високорівневі асинхронні/готові до очікування примітиви для роботи з мережевими підключеннями. Потоки дозволяють надсилати й отримувати дані без використання зворотних викликів або низькорівневих протоколів і транспортів.

Ось приклад клієнта відлуння TCP, написаного з використанням асинхронних потоків:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Дивіться також розділ Examples нижче.

Потокові функції

Наступні асинхронні функції верхнього рівня можна використовувати для створення та роботи з потоками:

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

Встановіть мережеве з’єднання та поверніть пару об’єктів (reader, writer).

Повернені об’єкти reader і writer є екземплярами класів StreamReader і StreamWriter.

The loop argument is optional and can always be determined automatically when this function is awaited from a coroutine.

limit визначає обмеження розміру буфера, який використовується повернутим екземпляром StreamReader. За замовчуванням обмеження встановлено на 64 КБ.

Решта аргументів передаються безпосередньо до loop.create_connection().

Нове в версії 3.7: The ssl_handshake_timeout parameter.

Нове в версії 3.8: Added happy_eyeballs_delay and interleave parameters.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Запустіть сервер сокетів.

Зворотний виклик client_connected_cb викликається щоразу, коли встановлюється нове підключення клієнта. Він отримує пару (reader, writer) як два аргументи, екземпляри класів StreamReader і StreamWriter.

client_connected_cb може бути простим викликом або функцією співпрограми; якщо це функція співпрограми, вона буде автоматично запланована як Task.

The loop argument is optional and can always be determined automatically when this method is awaited from a coroutine.

limit визначає обмеження розміру буфера, який використовується повернутим екземпляром StreamReader. За замовчуванням обмеження встановлено на 64 КБ.

Решта аргументів передаються безпосередньо до loop.create_server().

Нове в версії 3.7: The ssl_handshake_timeout and start_serving parameters.

Unix-сокети

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Встановіть з’єднання через сокет Unix і поверніть пару (reader, writer).

Подібно до open_connection(), але працює на сокетах Unix.

Дивіться також документацію loop.create_unix_connection().

Availability: Unix.

Нове в версії 3.7: The ssl_handshake_timeout parameter.

Змінено в версії 3.7: The path parameter can now be a path-like object

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Запустіть сокет-сервер Unix.

Подібно до start_server(), але працює з сокетами Unix.

Дивіться також документацію loop.create_unix_server().

Availability: Unix.

Нове в версії 3.7: The ssl_handshake_timeout and start_serving parameters.

Змінено в версії 3.7: The path parameter can now be a path-like object.

StreamReader

class asyncio.StreamReader

Represents a reader object that provides APIs to read data from the IO stream.

Не рекомендується безпосередньо створювати екземпляри об’єктів StreamReader; замість цього використовуйте open_connection() і start_server().

coroutine read(n=-1)

Read up to n bytes. If n is not provided, or set to -1, read until EOF and return all read bytes.

If EOF was received and the internal buffer is empty, return an empty bytes object.

coroutine readline()

Прочитати один рядок, де «рядок» — це послідовність байтів, що закінчуються на \n.

Якщо EOF отримано, а \n не знайдено, метод повертає частково прочитані дані.

Якщо EOF отримано, а внутрішній буфер порожній, поверніть порожній об’єкт bytes.

coroutine readexactly(n)

Прочитайте рівно n байт.

Викликати IncompleteReadError, якщо EOF досягнуто до того, як n можна буде прочитати. Використовуйте атрибут IncompleteReadError.partial, щоб отримати частково прочитані дані.

coroutine readuntil(separator=b'\n')

Читати дані з потоку, доки не буде знайдено роздільник.

У разі успіху дані та роздільник буде видалено з внутрішнього буфера (використано). Повернуті дані включатимуть роздільник у кінці.

Якщо обсяг зчитаних даних перевищує налаштований ліміт потоку, виникає виняток LimitOverrunError, і дані залишаються у внутрішньому буфері та можуть бути прочитані знову.

Якщо EOF досягнуто до того, як знайдено повний роздільник, виникає виняток IncompleteReadError, і внутрішній буфер скидається. Атрибут IncompleteReadError.partial може містити частину роздільника.

Нове в версії 3.5.2.

at_eof()

Повертає True, якщо буфер порожній і було викликано feed_eof().

StreamWriter

class asyncio.StreamWriter

Представляє об’єкт запису, який надає API для запису даних у потік вводу-виводу.

Не рекомендується безпосередньо створювати екземпляри об’єктів StreamWriter; замість цього використовуйте open_connection() і start_server().

write(data)

Метод намагається негайно записати дані в основний сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.

Цей метод слід використовувати разом із методом drain():

stream.write(data)
await stream.drain()
writelines(data)

Метод негайно записує список (або будь-яку ітерацію) байтів у базовий сокет. Якщо це не вдається, дані ставляться в чергу у внутрішній буфер запису, доки їх не буде надіслано.

Цей метод слід використовувати разом із методом drain():

stream.writelines(lines)
await stream.drain()
close()

Метод закриває потік і базовий сокет.

The method should be used along with the wait_closed() method:

stream.close()
await stream.wait_closed()
can_write_eof()

Повертає True, якщо основний транспорт підтримує метод write_eof(), False інакше.

write_eof()

Закрийте кінець запису потоку після очищення буферизованих даних запису.

transport

Повернути базовий асинхронний транспорт.

get_extra_info(name, default=None)

Доступ до додаткової транспортної інформації; подробиці див. BaseTransport.get_extra_info().

coroutine drain()

Зачекайте, доки буде прийнятно продовжити запис у потік. Приклад:

writer.write(data)
await writer.drain()

Це метод керування потоком, який взаємодіє з базовим буфером запису вводу-виводу. Коли розмір буфера досягає верхнього водяного знака, drain() блокує, доки розмір буфера не зменшиться до низького водяного знака, і запис можна буде відновити. Коли нема чого чекати, drain() повертається негайно.

is_closing()

Повертає True, якщо потік закрито або знаходиться в процесі закриття.

Нове в версії 3.7.

coroutine wait_closed()

Зачекайте, поки потік закриється.

Should be called after close() to wait until the underlying connection is closed.

Нове в версії 3.7.

Приклади

TCP echo client використовує потоки

TCP-клієнт відлуння за допомогою функції asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

Дивись також

У прикладі TCP echo client protocol використовується метод низького рівня loop.create_connection().

Сервер відлуння TCP з використанням потоків

Сервер відлуння TCP за допомогою функції asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Дивись також

У прикладі TCP echo server protocol використовується метод loop.create_server().

Отримайте заголовки HTTP

Простий приклад запиту HTTP-заголовків URL-адреси, переданої в командному рядку:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Використання:

python example.py http://example.com/path/page.html

або з HTTPS:

python example.py https://example.com/path/page.html

Зареєструйте відкритий сокет для очікування даних за допомогою потоків

Співпрограма очікує, доки сокет не отримає дані за допомогою функції open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Дивись також

У прикладі реєструвати відкритий сокет для очікування даних за допомогою протоколу використовується протокол низького рівня та метод loop.create_connection().

У прикладі спостерігати за подіями читання файлового дескриптора використовується низькорівневий метод loop.add_reader() для спостереження за файловим дескриптором.