Fluxos

Streams são conexões de rede de alto-nível assincronias/espera-pronta. Streams permitem envios e recebimentos de dados sem usar retornos de chamadas ou protocolos de baixo nível.

Aqui está um exemplo de um cliente TCP realizando eco, escrito usando streams asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Veja também a seção Exemplos abaixo.

Funções Stream

As seguintes funções asyncio de alto nível podems ser usadas para criar e trabalhar com fluxos:

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)

Estabelece uma conexão de rede e retorna um par de objetos (reader, writer).

Os objetos reader e writer retornados são instâncias das classes StreamReader e StreamWriter.

O argumento loop é opcional e sempre pode ser determinado automaticamente, quando esta função é esperada a partir de uma corrotina.

limit determina o tamanho limite do buffer usado pela instância StreamReader retornada. Por padrão, limit é definido em 64 KiB.

O resto dos argumentos são passados diretamente para loop.create_connection().

Novo na versão 3.7: O parâmetro ssl_handshake_timeout.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Inicia um soquete no servidor.

A função de retorno client_connected_cb é chamada sempre que uma nova conexão de um cliente é estabelecida. Ela recebe um par (reader, writer) como dois argumentos, instâncias das classes StreamReader e StreamWriter.

client_connected_cb pode ser simplesmente algo chamável ou uma função de corrotina; se ele for uma função de corrotina, ele será automaticamente agendado como uma Task.

O argumento loop é opcional e pode sempre ser determinado automaticamente quando este método é esperado a partir de uma corrotina.

limit determina o tamanho limite do buffer usado pela instância StreamReader retornada. Por padrão, limit é definido em 64 KiB.

O resto dos argumentos são passados diretamente para loop.create_server().

Novo na versão 3.7: Os parâmetros ssl_handshake_timeout e start_serving.

Soquetes Unix

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Estabelece uma conexão de soquete Unix e retorna um par com (reader, writer).

Similar a open_connection(), mas opera em soquetes Unix.

Veja também a documentação do método loop.create_unix_connection().

Disponibilidade: Unix.

Novo na versão 3.7: O parâmetro ssl_handshake_timeout.

Alterado na versão 3.7: O parâmetro path agora pode ser um objeto caminho ou similar

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Inicia um servidor com soquete Unix.

Similar a start_server(), mas funciona com soquetes Unix.

Veja também a documentação do método loop.create_unix_server().

Disponibilidade: Unix.

Novo na versão 3.7: Os parâmetros ssl_handshake_timeout e start_serving.

Alterado na versão 3.7: O parâmetro path agora pode ser um objeto caminho ou similar.


StreamReader

class asyncio.StreamReader

Representa um objeto leitor, que fornece APIs para ler dados a partir do stream de entrada/saída.

Não é recomendado instanciar objetos StreamReader diretamente; use open_connection() e start_server() ao invés disso.

coroutine read(n=-1)

Executa a leitura de até n bytes. Se n não for informado, ou for definido para -1, executa a leitura até que EOF (fim do arquivo) seja atingido, e retorna todos os bytes lidos.

Se EOF foi recebido e o buffer interno estiver vazio, retorna um objeto bytes vazio.

coroutine readline()

Lê uma linha, onde “line” é uma sequência de bytes encerrando com \n.

Se EOF é recebido e \n não foi encontrado, o método retorna os dados parcialmente lidos.

Se EOF for recebido e o buffer interno estiver vazio, retorna um objeto bytes vazio.

coroutine readexactly(n)

Lê exatamente n bytes.

Levanta um IncompleteReadError se EOF é atingido antes que n sejam lidos. Use o atributo IncompleteReadError.partial para obter os dados parcialmente lidos.

coroutine readuntil(separator=b'\n')

Lê dados a partir do stream até que separator seja encontrado.

Ao ter sucesso, os dados e o separador serão removidos do buffer interno (consumido). Dados retornados irão incluir o separador no final.

Se a quantidade de dados lidos excede o limite configurado para o stream, uma exceção LimitOverrunError é levantada, e os dados são deixados no buffer interno e podem ser lidos novamente.

Se EOF for atingido antes que o separador completo seja encontrado, uma exceção IncompleteReadError é levantada, e o buffer interno é resetado. O atributo IncompleteReadError.partial pode conter uma parte do separador.

Novo na versão 3.5.2.

at_eof()

Retorna True se o buffer estiver vazio e feed_eof() foi chamado.

StreamWriter

class asyncio.StreamWriter

Representa um objeto de escrita que fornece APIs para escrever dados para o stream de IO.

Não é recomendado instanciar objetos StreamWriter diretamente; use open_connection() e start_server() ao invés.

can_write_eof()

Retorna True se o transporte subjacente suporta o método write_eof(), False caso contrário.

write_eof()

Fecha o extremo de escrita do stream após os dados no buffer de escrita terem sido descarregados.

transport

Retorna o transporte asyncio subjacente.

get_extra_info(name, default=None)

Acessa informações de transporte opcionais; veja BaseTransport.get_extra_info() para detalhes.

write(data)

Write data to the stream.

This method is not subject to flow control. Calls to write() should be followed by drain().

writelines(data)

Write a list (or any iterable) of bytes to the stream.

This method is not subject to flow control. Calls to writelines() should be followed by drain().

coroutine drain()

Aguarda até que seja apropriado continuar escrevendo no stream. Exemplo:

writer.write(data)
await writer.drain()

Este é um método de controle de fluxo que interage com o buffer de entrada e saída de escrita subjacente. Quando o tamanho do buffer atinge a marca d’agua alta, drain() bloqueia até que o tamanho do buffer seja drenado para a marca d’água baixa, e a escrita possa continuar. Quando não existe nada que cause uma espera, o método drain() retorna imediatamente.

close()

Close the stream.

is_closing()

Retorna True se o stream estiver fechado ou em processo de ser fechado.

Novo na versão 3.7.

coroutine wait_closed()

Aguarda até que o stream seja fechado.

Deve ser chamado após close() para aguardar até que a conexão subjacente esteja fechada.

Novo na versão 3.7.

Exemplos

Cliente para eco TCP usando streams

Cliente de eco TCP usando a função asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

Ver também

O exemplo de protocolo do cliente para eco TCP usa o método de baixo nível loop.create_connection().

Servidor eco TCP usando streams

Servidor eco TCP usando a função asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Ver também

O exemplo de protocolo eco de servidor TCP utiliza o método loop.create_server().

Obtém headers HTTP

Exemplo simples consultando cabeçalhos HTTP da URL passada na linha de comando:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Utilização:

python example.py http://example.com/path/page.html

ou com HTTPS:

python example.py https://example.com/path/page.html

Registra um soquete aberto para aguardar por dados usando streams

Corrotina aguardando até que um soquete receba dados usando a função open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Ver também

O exemplo de registro de um soquete aberto para aguardar por dados usando um protocolo utiliza um protocolo de baixo nível e o método loop.create_connection().

O exemplo para monitorar um descritor de arquivo para leitura de eventos utiliza o método de baixo nível loop.add_reader() para monitorar um descritor de arquivo.