Streams

Código-fonte: Lib/asyncio/streams.py


Streams são conexões de rede de alto-nível assincronias/espera-pronta. Streams permitem envios e recebimentos de dados sem usar retornos de chamadas ou protocolos de baixo nível.

Aqui está um exemplo de um cliente TCP realizando eco, escrito usando streams asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Veja também a seção Exemplos abaixo.

Funções Stream

As seguintes funções asyncio de alto nível podem ser usadas para criar e trabalhar com streams:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

Estabelece uma conexão de rede e retorna um par de objetos (reader, writer).

Os objetos reader e writer retornados são instâncias das classes StreamReader e StreamWriter.

limit determina o tamanho limite do buffer usado pela instância StreamReader retornada. Por padrão, limit é definido em 64 KiB.

O resto dos argumentos é passado diretamente para loop.create_connection().

Alterado na versão 3.7: Added the ssl_handshake_timeout parameter.

Novo na versão 3.8: Added happy_eyeballs_delay and interleave parameters.

Alterado na versão 3.10: Removed the loop parameter.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Inicia um soquete no servidor.

A função de retorno client_connected_cb é chamada sempre que uma nova conexão de um cliente é estabelecida. Ela recebe um par (reader, writer) como dois argumentos, instâncias das classes StreamReader e StreamWriter.

client_connected_cb pode ser simplesmente algo chamável ou uma função de corrotina; se ele for uma função de corrotina, ele será automaticamente agendado como uma Task.

limit determina o tamanho limite do buffer usado pela instância StreamReader retornada. Por padrão, limit é definido em 64 KiB.

O resto dos argumentos são passados diretamente para loop.create_server().

Alterado na versão 3.7: Added the ssl_handshake_timeout and start_serving parameters.

Alterado na versão 3.10: Removed the loop parameter.

Soquetes Unix

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Estabelece uma conexão de soquete Unix e retorna um par com (reader, writer).

Similar a open_connection(), mas opera em soquetes Unix.

Veja também a documentação do método loop.create_unix_connection().

Disponibilidade: Unix.

Alterado na versão 3.7: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object

Alterado na versão 3.10: Removed the loop parameter.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Inicia um servidor com soquete Unix.

Similar a start_server(), mas funciona com soquetes Unix.

Veja também a documentação do método loop.create_unix_server().

Disponibilidade: Unix.

Alterado na versão 3.7: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.

Alterado na versão 3.10: Removed the loop parameter.

StreamReader

class asyncio.StreamReader

Representa um objeto leitor, que fornece APIs para ler dados a partir do stream de entrada/saída.

Não é recomendado instanciar objetos StreamReader diretamente; use open_connection() e start_server() ao invés disso.

coroutine read(n=- 1)

Executa a leitura de até n bytes. Se n não for informado, ou for definido para -1, executa a leitura até que EOF (fim do arquivo) seja atingido, e retorna todos os bytes lidos.

Se EOF foi recebido e o buffer interno estiver vazio, retorna um objeto bytes vazio.

coroutine readline()

Lê uma linha, onde “line” é uma sequência de bytes encerrando com \n.

Se EOF é recebido e \n não foi encontrado, o método retorna os dados parcialmente lidos.

Se EOF for recebido e o buffer interno estiver vazio, retorna um objeto bytes vazio.

coroutine readexactly(n)

Lê exatamente n bytes.

Levanta um IncompleteReadError se EOF é atingido antes que n sejam lidos. Use o atributo IncompleteReadError.partial para obter os dados parcialmente lidos.

coroutine readuntil(separator=b'\n')

Lê dados a partir do stream até que separator seja encontrado.

Ao ter sucesso, os dados e o separador serão removidos do buffer interno (consumido). Dados retornados irão incluir o separador no final.

Se a quantidade de dados lidos excede o limite configurado para o stream, uma exceção LimitOverrunError é levantada, e os dados são deixados no buffer interno e podem ser lidos novamente.

Se EOF for atingido antes que o separador completo seja encontrado, uma exceção IncompleteReadError é levantada, e o buffer interno é resetado. O atributo IncompleteReadError.partial pode conter uma parte do separador.

Novo na versão 3.5.2.

at_eof()

Retorna True se o buffer estiver vazio e feed_eof() foi chamado.

StreamWriter

class asyncio.StreamWriter

Representa um objeto de escrita que fornece APIs para escrever dados para o stream de IO.

Não é recomendado instanciar objetos StreamWriter diretamente; use open_connection() e start_server() ao invés.

write(data)

O método tenta escrever data para o soquete subjacente imediatamente. Se isso falhar, data é enfileirado em um buffer interno de escrita, até que possa ser enviado.

O método deve ser usado juntamente com o método drain():

stream.write(data)
await stream.drain()
writelines(data)

O método escreve imediatamente a lista (ou qualquer iterável) de bytes para o soquete subjacente. Se isso falhar, os dados são enfileirados em um buffer de escrita interno até que possam ser enviados.

O método deve ser usado juntamente com o método drain():

stream.writelines(lines)
await stream.drain()
close()

O método fecha o stream e o soquete subjacente.

O método deve ser usado juntamente com o método wait_closed():

stream.close()
await stream.wait_closed()
can_write_eof()

Retorna True se o transporte subjacente suporta o método write_eof(), False caso contrário.

write_eof()

Fecha o extremo de escrita do stream após os dados no buffer de escrita terem sido descarregados.

transport

Retorna o transporte asyncio subjacente.

get_extra_info(name, default=None)

Acessa informações de transporte opcionais; veja BaseTransport.get_extra_info() para detalhes.

coroutine drain()

Aguarda até que seja apropriado continuar escrevendo no stream. Exemplo:

writer.write(data)
await writer.drain()

Este é um método de controle de fluxo que interage com o buffer de entrada e saída de escrita subjacente. Quando o tamanho do buffer atinge a marca d’agua alta, drain() bloqueia até que o tamanho do buffer seja drenado para a marca d’água baixa, e a escrita possa continuar. Quando não existe nada que cause uma espera, o método drain() retorna imediatamente.

is_closing()

Retorna True se o stream estiver fechado ou em processo de ser fechado.

Novo na versão 3.7.

coroutine wait_closed()

Aguarda até que o stream seja fechado.

Deve ser chamado após close() para aguardar até que a conexão subjacente esteja fechada.

Novo na versão 3.7.

Exemplos

Cliente para eco TCP usando streams

Cliente de eco TCP usando a função asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

Ver também

O exemplo de protocolo do cliente para eco TCP usa o método de baixo nível loop.create_connection().

Servidor eco TCP usando streams

Servidor eco TCP usando a função asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Ver também

O exemplo de protocolo eco de servidor TCP utiliza o método loop.create_server().

Obtém headers HTTP

Exemplo simples consultando cabeçalhos HTTP da URL passada na linha de comando:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Uso:

python example.py http://example.com/path/page.html

ou com HTTPS:

python example.py https://example.com/path/page.html

Registra um soquete aberto para aguardar por dados usando streams

Corrotina aguardando até que um soquete receba dados usando a função open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Ver também

O exemplo de registro de um soquete aberto para aguardar por dados usando um protocolo utiliza um protocolo de baixo nível e o método loop.create_connection().

O exemplo para monitorar um descritor de arquivo para leitura de eventos utiliza o método de baixo nível loop.add_reader() para monitorar um descritor de arquivo.