Streams¶
Código fuente: Lib/asyncio/streams.py
Los streams son async/await primitivos de alto nivel para trabajar con conexiones de red. Los streams permiten enviar y recibir datos sin utilizar callbacks o protocolos y transportes de bajo nivel.
Aquí hay un ejemplo de un cliente eco TCP escrito utilizando streams asyncio:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Consulte también la sección de Examples a continuación.
Funciones stream
Las siguientes funciones asyncio de nivel superior se pueden utilizar para crear y trabajar con streams:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None)¶ Establece una conexión de red y retorna un par de objetos
(reader, writer)
.Los objetos retornados reader y writer son instancias de las clases
StreamReader
yStreamWriter
.El argumento loop es opcional y siempre se puede determinar automáticamente cuando se espera esta función de una corrutina.
limit determina el límite de tamaño del búfer utilizado por la instancia de
StreamReader
retornada. De forma predeterminada, limit está establecido en 64 KiB.El resto de los argumentos se pasan directamente a
loop.create_connection()
.Nuevo en la versión 3.7: El parámetro ssl_handshake_timeout.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Inicia un servidor socket.
La retrollamada client_connected_cb se llama siempre que se establece una nueva conexión de cliente. Recibe un par
(reader, writer)
como dos argumentos, instancias de las clasesStreamReader
yStreamWriter
.client_connected_cb puede ser una función simple invocable o de corrutina; si es una función de corrutina, se programará automáticamente como un
Task
.El argumento loop es opcional y siempre se puede determinar automáticamente cuando se espera este método de una corrutina.
limit determina el límite de tamaño del búfer utilizado por la instancia de
StreamReader
retornada. De forma predeterminada, limit está establecido en 64 KiB.El resto de los argumentos se pasan directamente a
loop.create_server()
.Nuevo en la versión 3.7: Los parámetros ssl_handshake_timeout y start_serving.
Sockets Unix
-
coroutine
asyncio.
open_unix_connection
(path=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Establece una conexión de socket Unix y retorna un par de
(reader, writer)
.Similar a
open_connection()
pero opera en sockets Unix.Consulte también la documentación de
loop.create_unix_connection()
.Disponibilidad: Unix.
Nuevo en la versión 3.7: El parámetro ssl_handshake_timeout.
Distinto en la versión 3.7: El parámetro path ahora puede ser un objeto similar a una ruta (path-like object)
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Inicia un servidor socket Unix.
Similar a
start_server()
pero funciona con sockets Unix.Consulte también la documentación de
loop.create_unix_server()
.Disponibilidad: Unix.
Nuevo en la versión 3.7: Los parámetros ssl_handshake_timeout y start_serving.
Distinto en la versión 3.7: El parámetro path ahora puede ser un objeto similar a una ruta (path-like object).
StreamReader¶
-
class
asyncio.
StreamReader
¶ Representa un objeto lector que proporciona APIs para leer datos del flujo de E/S.
No se recomienda crear instancias de objetos StreamReader directamente; utilice
open_connection()
ystart_server()
en su lugar.-
coroutine
read
(n=-1)¶ Lee hasta n bytes. Si no se proporciona n, o se establece en
-1
, lee hasta EOF (final del archivo) y retorna todos los bytes leídos.Si se recibió EOF (final del archivo) y el búfer interno está vacío, retorna un objeto de
bytes
vacío.
-
coroutine
readline
()¶ Lea una línea, donde «línea» es una secuencia de bytes que termina en
\n
.Si se recibe EOF (final del archivo) y no se encontró
\n
, el método retorna datos leídos parcialmente.Si se recibe EOF (final de archivo) y el búfer interno está vacío, retorna un objeto de
bytes
vacío.
-
coroutine
readexactly
(n)¶ Lee exactamente n bytes.
Genera un
IncompleteReadError
si se alcanza EOF (final del archivo) antes de que se pueda leer n. Utilice el atributoIncompleteReadError.partial
para obtener los datos leídos parcialmente.
-
coroutine
readuntil
(separator=b'\n')¶ Lee datos de la secuencia hasta que se encuentre el separador (separator).
En caso de éxito, los datos y el separador se eliminarán del búfer interno (consumido). Los datos retornados incluirán el separador al final.
Si la cantidad de datos leídos excede el límite de flujo configurado, se genera una excepción
LimitOverrunError
y los datos se dejan en el búfer interno y se pueden leer nuevamente.Si se alcanza EOF (final del archivo) antes de que se encuentre el separador completo, se genera una excepción
IncompleteReadError
y se restablece el búfer interno. El atributoIncompleteReadError.partial
puede contener una parte del separador.Nuevo en la versión 3.5.2.
-
at_eof
()¶ Retorna
True
si el buffer está vacío yfeed_eof()
fue llamado.
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ Representa un objeto de escritura que proporciona APIs para escribir datos en el flujo de E/S.
No se recomienda crear instancias de objetos StreamWriter directamente; use
open_connection()
ystart_server()
en su lugar.-
write
(data)¶ El método intenta escribir los datos (data) en el socket subyacente inmediatamente. Si eso falla, los datos se ponen en cola en un búfer de escritura interno hasta que se puedan enviar.
El método debe usarse junto con el método
drain()
:stream.write(data) await stream.drain()
-
writelines
(data)¶ El método escribe una lista (o cualquier iterable) de bytes en el socket subyacente inmediatamente. Si eso falla, los datos se ponen en cola en un búfer de escritura interno hasta que se puedan enviar.
El método debe usarse junto con el método
drain()
:stream.writelines(lines) await stream.drain()
-
close
()¶ El método cierra la secuencia y el socket subyacente.
El método debe usarse junto con el método
wait_closed()
:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ Retorna
True
si el transporte subyacente admite el métodowrite_eof()
,False
en caso contrario.
-
write_eof
()¶ Cierra la escritura de la secuencia después de que se vacíen los datos de escritura almacenados en búfer.
-
transport
¶ Retorna el transporte asyncio subyacente.
-
get_extra_info
(name, default=None)¶ Accede a información de transporte opcional; consulte
BaseTransport.get_extra_info()
para obtener más detalles.
-
coroutine
drain
()¶ Espera hasta que sea apropiado reanudar la escritura en la transmisión. Ejemplo:
writer.write(data) await writer.drain()
Este es un método de control de flujo que interactúa con el búfer de escritura de E/S subyacente. Cuando el tamaño del búfer alcanza la marca de agua alta, drain() bloquea hasta que el tamaño del búfer se agota hasta la marca de agua baja y se pueda reanudar la escritura. Cuando no hay nada que esperar,
drain()
regresa inmediatamente.
-
is_closing
()¶ Retorna
True
si la secuencia está cerrada o en proceso de cerrarse.Nuevo en la versión 3.7.
-
Ejemplos¶
Cliente eco TCP usando streams¶
Cliente eco TCP usando la función asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
Ver también
El ejemplo del protocolo de cliente eco TCP utiliza el método loop.create_connection()
de bajo nivel.
Servidor eco TCP usando streams¶
Servidor eco TCP usando la función asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
asyncio.run(main())
Ver también
El ejemplo del protocolo de servidor eco TCP utiliza el método loop.create_server()
.
Obtener encabezados HTTP¶
Ejemplo simple de consulta de encabezados HTTP de la URL pasada en la línea de comando:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Uso:
python example.py http://example.com/path/page.html
o con HTTPS:
python example.py https://example.com/path/page.html
Registrar un socket abierto para esperar datos usando streams¶
Corutina esperando hasta que un socket reciba datos usando la función open_connection()
function:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Ver también
El ejemplo de registro de un socket abierto para esperar datos usando un protocolo utiliza un protocolo de bajo nivel y el método loop.create_connection()
.
El ejemplo de observar un descriptor de archivo para leer eventos utiliza el método loop.add_reader()
de bajo nivel para ver un descriptor de archivo.