Streams

Código-fonte: Lib/asyncio/streams.py


Streams são conexões de rede de alto-nível assincronias/espera-pronta. Streams permitem envios e recebimentos de dados sem usar retornos de chamadas ou protocolos de baixo nível.

Aqui está um exemplo de um cliente TCP realizando eco, escrito usando streams asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Veja também a seção Exemplos abaixo.

Funções Stream

As seguintes funções asyncio de alto nível podem ser usadas para criar e trabalhar com streams:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

Estabelece uma conexão de rede e retorna um par de objetos (reader, writer).

Os objetos reader e writer retornados são instâncias das classes StreamReader e StreamWriter.

limit determina o tamanho limite do buffer usado pela instância StreamReader retornada. Por padrão, limit é definido em 64 KiB.

O resto dos argumentos é passado diretamente para loop.create_connection().

Nota

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

Alterado na versão 3.7: Adicionado o parâmetro ssl_handshake_timeout.

Novo na versão 3.8: Added happy_eyeballs_delay and interleave parameters.

Alterado na versão 3.10: Removido o parâmetro loop.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Inicia um soquete no servidor.

A função de retorno client_connected_cb é chamada sempre que uma nova conexão de um cliente é estabelecida. Ela recebe um par (reader, writer) como dois argumentos, instâncias das classes StreamReader e StreamWriter.

client_connected_cb pode ser simplesmente algo chamável ou uma função de corrotina; se ele for uma função de corrotina, ele será automaticamente agendado como uma Task.

limit determina o tamanho limite do buffer usado pela instância StreamReader retornada. Por padrão, limit é definido em 64 KiB.

O resto dos argumentos são passados diretamente para loop.create_server().

Nota

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

Alterado na versão 3.7: Added the ssl_handshake_timeout and start_serving parameters.

Alterado na versão 3.10: Removido o parâmetro loop.

Soquetes Unix

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Estabelece uma conexão de soquete Unix e retorna um par com (reader, writer).

Similar a open_connection(), mas opera em soquetes Unix.

Veja também a documentação do método loop.create_unix_connection().

Nota

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

Disponibilidade: Unix.

Alterado na versão 3.7: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object

Alterado na versão 3.10: Removido o parâmetro loop.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Inicia um servidor com soquete Unix.

Similar a start_server(), mas funciona com soquetes Unix.

Veja também a documentação do método loop.create_unix_server().

Nota

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

Disponibilidade: Unix.

Alterado na versão 3.7: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.

Alterado na versão 3.10: Removido o parâmetro loop.

StreamReader

class asyncio.StreamReader

Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the async for statement.

Não é recomendado instanciar objetos StreamReader diretamente; use open_connection() e start_server() ao invés disso.

coroutine read(n=- 1)

Read up to n bytes from the stream.

If n is not provided or set to -1, read until EOF, then return all read bytes. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is 0, return an empty bytes object immediately.

If n is positive, return at most n available bytes as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an empty bytes object.

coroutine readline()

Lê uma linha, onde “line” é uma sequência de bytes encerrando com \n.

Se EOF é recebido e \n não foi encontrado, o método retorna os dados parcialmente lidos.

Se EOF for recebido e o buffer interno estiver vazio, retorna um objeto bytes vazio.

coroutine readexactly(n)

Lê exatamente n bytes.

Levanta um IncompleteReadError se EOF é atingido antes que n sejam lidos. Use o atributo IncompleteReadError.partial para obter os dados parcialmente lidos.

coroutine readuntil(separator=b'\n')

Lê dados a partir do stream até que separator seja encontrado.

Ao ter sucesso, os dados e o separador serão removidos do buffer interno (consumido). Dados retornados irão incluir o separador no final.

Se a quantidade de dados lidos excede o limite configurado para o stream, uma exceção LimitOverrunError é levantada, e os dados são deixados no buffer interno e podem ser lidos novamente.

Se EOF for atingido antes que o separador completo seja encontrado, uma exceção IncompleteReadError é levantada, e o buffer interno é resetado. O atributo IncompleteReadError.partial pode conter uma parte do separador.

Novo na versão 3.5.2.

at_eof()

Retorna True se o buffer estiver vazio e feed_eof() foi chamado.

StreamWriter

class asyncio.StreamWriter

Representa um objeto de escrita que fornece APIs para escrever dados para o stream de IO.

Não é recomendado instanciar objetos StreamWriter diretamente; use open_connection() e start_server() ao invés.

write(data)

O método tenta escrever data para o soquete subjacente imediatamente. Se isso falhar, data é enfileirado em um buffer interno de escrita, até que possa ser enviado.

O método deve ser usado juntamente com o método drain():

stream.write(data)
await stream.drain()
writelines(data)

O método escreve imediatamente a lista (ou qualquer iterável) de bytes para o soquete subjacente. Se isso falhar, os dados são enfileirados em um buffer de escrita interno até que possam ser enviados.

O método deve ser usado juntamente com o método drain():

stream.writelines(lines)
await stream.drain()
close()

O método fecha o stream e o soquete subjacente.

The method should be used, though not mandatory, along with the wait_closed() method:

stream.close()
await stream.wait_closed()
can_write_eof()

Retorna True se o transporte subjacente suporta o método write_eof(), False caso contrário.

write_eof()

Fecha o extremo de escrita do stream após os dados no buffer de escrita terem sido descarregados.

transport

Retorna o transporte asyncio subjacente.

get_extra_info(name, default=None)

Acessa informações de transporte opcionais; veja BaseTransport.get_extra_info() para detalhes.

coroutine drain()

Aguarda até que seja apropriado continuar escrevendo no stream. Exemplo:

writer.write(data)
await writer.drain()

Este é um método de controle de fluxo que interage com o buffer de entrada e saída de escrita subjacente. Quando o tamanho do buffer atinge a marca d’agua alta, drain() bloqueia até que o tamanho do buffer seja drenado para a marca d’água baixa, e a escrita possa continuar. Quando não existe nada que cause uma espera, o método drain() retorna imediatamente.

is_closing()

Retorna True se o stream estiver fechado ou em processo de ser fechado.

Novo na versão 3.7.

coroutine wait_closed()

Aguarda até que o stream seja fechado.

Should be called after close() to wait until the underlying connection is closed, ensuring that all data has been flushed before e.g. exiting the program.

Novo na versão 3.7.

Exemplos

Cliente para eco TCP usando streams

Cliente de eco TCP usando a função asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Ver também

O exemplo de protocolo do cliente para eco TCP usa o método de baixo nível loop.create_connection().

Servidor eco TCP usando streams

Servidor eco TCP usando a função asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Ver também

O exemplo de protocolo eco de servidor TCP utiliza o método loop.create_server().

Obtém headers HTTP

Exemplo simples consultando cabeçalhos HTTP da URL passada na linha de comando:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Uso:

python example.py http://example.com/path/page.html

ou com HTTPS:

python example.py https://example.com/path/page.html

Registra um soquete aberto para aguardar por dados usando streams

Corrotina aguardando até que um soquete receba dados usando a função open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Ver também

O exemplo de registro de um soquete aberto para aguardar por dados usando um protocolo utiliza um protocolo de baixo nível e o método loop.create_connection().

O exemplo para monitorar um descritor de arquivo para leitura de eventos utiliza o método de baixo nível loop.add_reader() para monitorar um descritor de arquivo.