Streams¶
Código-fonte: Lib/asyncio/streams.py
Streams são conexões de rede de alto-nível assincronias/espera-pronta. Streams permitem envios e recebimentos de dados sem usar retornos de chamadas ou protocolos de baixo nível.
Aqui está um exemplo de um cliente TCP realizando eco, escrito usando streams asyncio:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Veja também a seção Exemplos abaixo.
Funções Stream
As seguintes funções asyncio de alto nível podem ser usadas para criar e trabalhar com streams:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)¶ Estabelece uma conexão de rede e retorna um par de objetos
(reader, writer)
.Os objetos reader e writer retornados são instâncias das classes
StreamReader
eStreamWriter
.limit determina o tamanho limite do buffer usado pela instância
StreamReader
retornada. Por padrão, limit é definido em 64 KiB.O resto dos argumentos é passado diretamente para
loop.create_connection()
.Nota
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.Alterado na versão 3.7: Adicionado o parâmetro ssl_handshake_timeout.
Novo na versão 3.8: Added happy_eyeballs_delay and interleave parameters.
Alterado na versão 3.10: Removido o parâmetro loop.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Inicia um soquete no servidor.
A função de retorno client_connected_cb é chamada sempre que uma nova conexão de um cliente é estabelecida. Ela recebe um par
(reader, writer)
como dois argumentos, instâncias das classesStreamReader
eStreamWriter
.client_connected_cb pode ser simplesmente algo chamável ou uma função de corrotina; se ele for uma função de corrotina, ele será automaticamente agendado como uma
Task
.limit determina o tamanho limite do buffer usado pela instância
StreamReader
retornada. Por padrão, limit é definido em 64 KiB.O resto dos argumentos são passados diretamente para
loop.create_server()
.Nota
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s
close()
method.Alterado na versão 3.7: Added the ssl_handshake_timeout and start_serving parameters.
Alterado na versão 3.10: Removido o parâmetro loop.
Soquetes Unix
-
coroutine
asyncio.
open_unix_connection
(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Estabelece uma conexão de soquete Unix e retorna um par com
(reader, writer)
.Similar a
open_connection()
, mas opera em soquetes Unix.Veja também a documentação do método
loop.create_unix_connection()
.Nota
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call itsclose()
method.Disponibilidade: Unix.
Alterado na versão 3.7: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object
Alterado na versão 3.10: Removido o parâmetro loop.
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Inicia um servidor com soquete Unix.
Similar a
start_server()
, mas funciona com soquetes Unix.Veja também a documentação do método
loop.create_unix_server()
.Nota
The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s
close()
method.Disponibilidade: Unix.
Alterado na versão 3.7: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.
Alterado na versão 3.10: Removido o parâmetro loop.
StreamReader¶
-
class
asyncio.
StreamReader
¶ Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the
async for
statement.Não é recomendado instanciar objetos StreamReader diretamente; use
open_connection()
estart_server()
ao invés disso.-
coroutine
read
(n=- 1)¶ Read up to n bytes from the stream.
If n is not provided or set to
-1
, read until EOF, then return all readbytes
. If EOF was received and the internal buffer is empty, return an emptybytes
object.If n is
0
, return an emptybytes
object immediately.If n is positive, return at most n available
bytes
as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an emptybytes
object.
-
coroutine
readline
()¶ Lê uma linha, onde “line” é uma sequência de bytes encerrando com
\n
.Se EOF é recebido e
\n
não foi encontrado, o método retorna os dados parcialmente lidos.Se EOF for recebido e o buffer interno estiver vazio, retorna um objeto
bytes
vazio.
-
coroutine
readexactly
(n)¶ Lê exatamente n bytes.
Levanta um
IncompleteReadError
se EOF é atingido antes que n sejam lidos. Use o atributoIncompleteReadError.partial
para obter os dados parcialmente lidos.
-
coroutine
readuntil
(separator=b'\n')¶ Lê dados a partir do stream até que separator seja encontrado.
Ao ter sucesso, os dados e o separador serão removidos do buffer interno (consumido). Dados retornados irão incluir o separador no final.
Se a quantidade de dados lidos excede o limite configurado para o stream, uma exceção
LimitOverrunError
é levantada, e os dados são deixados no buffer interno e podem ser lidos novamente.Se EOF for atingido antes que o separador completo seja encontrado, uma exceção
IncompleteReadError
é levantada, e o buffer interno é resetado. O atributoIncompleteReadError.partial
pode conter uma parte do separador.Novo na versão 3.5.2.
-
at_eof
()¶ Retorna
True
se o buffer estiver vazio efeed_eof()
foi chamado.
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ Representa um objeto de escrita que fornece APIs para escrever dados para o stream de IO.
Não é recomendado instanciar objetos StreamWriter diretamente; use
open_connection()
estart_server()
ao invés.-
write
(data)¶ O método tenta escrever data para o soquete subjacente imediatamente. Se isso falhar, data é enfileirado em um buffer interno de escrita, até que possa ser enviado.
O método deve ser usado juntamente com o método
drain()
:stream.write(data) await stream.drain()
-
writelines
(data)¶ O método escreve imediatamente a lista (ou qualquer iterável) de bytes para o soquete subjacente. Se isso falhar, os dados são enfileirados em um buffer de escrita interno até que possam ser enviados.
O método deve ser usado juntamente com o método
drain()
:stream.writelines(lines) await stream.drain()
-
close
()¶ O método fecha o stream e o soquete subjacente.
The method should be used, though not mandatory, along with the
wait_closed()
method:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ Retorna
True
se o transporte subjacente suporta o métodowrite_eof()
,False
caso contrário.
-
write_eof
()¶ Fecha o extremo de escrita do stream após os dados no buffer de escrita terem sido descarregados.
-
transport
¶ Retorna o transporte asyncio subjacente.
-
get_extra_info
(name, default=None)¶ Acessa informações de transporte opcionais; veja
BaseTransport.get_extra_info()
para detalhes.
-
coroutine
drain
()¶ Aguarda até que seja apropriado continuar escrevendo no stream. Exemplo:
writer.write(data) await writer.drain()
Este é um método de controle de fluxo que interage com o buffer de entrada e saída de escrita subjacente. Quando o tamanho do buffer atinge a marca d’agua alta, drain() bloqueia até que o tamanho do buffer seja drenado para a marca d’água baixa, e a escrita possa continuar. Quando não existe nada que cause uma espera, o método
drain()
retorna imediatamente.
-
is_closing
()¶ Retorna
True
se o stream estiver fechado ou em processo de ser fechado.Novo na versão 3.7.
-
Exemplos¶
Cliente para eco TCP usando streams¶
Cliente de eco TCP usando a função asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Ver também
O exemplo de protocolo do cliente para eco TCP usa o método de baixo nível loop.create_connection()
.
Servidor eco TCP usando streams¶
Servidor eco TCP usando a função asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
Ver também
O exemplo de protocolo eco de servidor TCP utiliza o método loop.create_server()
.
Obtém headers HTTP¶
Exemplo simples consultando cabeçalhos HTTP da URL passada na linha de comando:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Uso:
python example.py http://example.com/path/page.html
ou com HTTPS:
python example.py https://example.com/path/page.html
Registra um soquete aberto para aguardar por dados usando streams¶
Corrotina aguardando até que um soquete receba dados usando a função open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Ver também
O exemplo de registro de um soquete aberto para aguardar por dados usando um protocolo utiliza um protocolo de baixo nível e o método loop.create_connection()
.
O exemplo para monitorar um descritor de arquivo para leitura de eventos utiliza o método de baixo nível loop.add_reader()
para monitorar um descritor de arquivo.