Flux (streams)¶
Code source : Lib/asyncore.py
Les flux sont des primitives de haut niveau compatibles avec async/await pour utiliser les connexions réseau. Les flux permettent d'envoyer et de recevoir des données sans utiliser de fonctions de rappel ou des protocoles de bas niveau.
Voici un exemple de client « écho TCP » écrit en utilisant les flux asyncio :
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Voir également la section Exemples ci-dessous.
Fonctions de flux
Les fonctions asyncio de haut niveau suivantes peuvent être utilisées pour créer et utiliser des flux :
- coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
Établit une connexion réseau et renvoie une paire d'objets
(lecteur, écrivain)
.Les objets lecteur et écrivain renvoyés sont des instances des classes
StreamReader
etStreamWriter
.limit détermine la limite de taille de tampon utilisée par l'instance
StreamReader
renvoyée. Par défaut, limit est fixée à 64 Kio.Le reste des arguments est passé directement à
loop.create_connection()
.Note
l'argument sock transfère la propriété du connecteur réseau au
StreamWriter
créé. Pour fermer le connecteur, appelez sa méthodeclose()
.Modifié dans la version 3.7: ajout du paramètre ssl_handshake_timeout.
Modifié dans la version 3.8: Added the happy_eyeballs_delay and interleave parameters.
Modifié dans la version 3.10: suppression du paramètre loop.
Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.
- coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Démarre un serveur de connexions
La fonction de rappel client_connected_cb est appelée chaque fois qu'une nouvelle connexion client est établie. Elle reçoit une paire d'arguments
(lecteur, écrivain)
, instances des classesStreamReader
etStreamWriter
.client_connected_cb peut être un simple appelable ou une fonction coroutine ; s'il s'agit d'une fonction coroutine, elle sera automatiquement planifiée en tant que
Task
.limit détermine la limite de taille de tampon utilisée par l'instance
StreamReader
renvoyée. Par défaut, limit est fixée à 64 Kio.Le reste des arguments est passé directement à
loop.create_server()
.Note
l'argument sock transfère la propriété du connecteur au serveur créé. Pour fermer le connecteur, appelez la méthode
close()
du serveur.Modifié dans la version 3.7: ajout des paramètres ssl_handshake_timeout et start_serving.
Modifié dans la version 3.10: suppression du paramètre loop.
Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.
Modifié dans la version 3.13: Added the keep_alive parameter.
Connecteurs Unix (sockets)
- coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Ouvre un connecteur Unix et renvoie une paire de
(lecteur, écrivain)
.Similaire à
open_connection()
mais fonctionne sur les connecteurs Unix.Voir aussi la documentation de
loop.create_unix_connection()
.Note
l'argument sock transfère la propriété du connecteur réseau au
StreamWriter
créé. Pour fermer le connecteur, appelez sa méthodeclose()
.Availability: Unix.
Modifié dans la version 3.7: ajout du paramètre ssl_handshake_timeout. Le paramètre path peut désormais être un objet simili-chemin
Modifié dans la version 3.10: suppression du paramètre loop.
Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.
- coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Démarre un connecteur Unix en mode serveur.
Similaire à
start_server()
mais fonctionne avec les connecteurs Unix.Voir aussi la documentation de
loop.create_unix_server()
.Note
l'argument sock transfère la propriété du connecteur au serveur créé. Pour fermer le connecteur, appelez la méthode
close()
du serveur.Availability: Unix.
Modifié dans la version 3.7: ajout des paramètres ssl_handshake_timeout et start_serving. Le paramètre chemin peut désormais être un simili-chemin.
Modifié dans la version 3.10: suppression du paramètre loop.
Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.
Flux lecteurs (StreamReader)¶
- class asyncio.StreamReader¶
Représente un objet lecteur qui fournit des API pour lire les données du flux d'entrée-sortie. En tant que itérable asynchrone, l'objet prend en charge l'instruction
async for
.Il n'est pas recommandé d'instancier directement les objets StreamReader ; utilisez
open_connection()
etstart_server()
à la place.- feed_eof()¶
Acknowledge the EOF.
- coroutine read(n=-1)¶
Lit jusqu'à n octets du flux.
Si n n'est pas fourni ou défini à
-1
, lit jusqu'à EOF, puis renvoie tous lesbytes
lus. Si EOF a été reçu et que le tampon interne est vide, renvoie un objetbytes
vide.Si n vaut
0
, renvoie immédiatement un objetbytes
vide.Si n est positif, renvoie au plus n
bytes
disponibles dès qu'au moins 1 octet est disponible dans le tampon interne. Si EOF est reçu avant qu'aucun octet ne soit lu, renvoie un objetbytes
vide.
- coroutine readline()¶
Lit une ligne, où une « ligne » est une séquence d'octets se terminant par
\n
.Si EOF est reçu et
\n
n'a pas été trouvé, la méthode renvoie des données partiellement lues.Si EOF est reçu et que le tampon interne est vide, renvoie un objet
bytes
vide.
- coroutine readexactly(n)¶
Lit exactement n octets.
Lève une
IncompleteReadError
si EOF est atteint avant que n octets ne puissent être lus. Utilisez l'attributIncompleteReadError.partial
pour obtenir les données partiellement lues.
- coroutine readuntil(separator=b'\n')¶
Lit les données du flux jusqu'à ce que le separator soit trouvé.
En cas de succès, les données et le séparateur sont supprimés du tampon interne (consommés). Les données renvoyées incluent le séparateur à la fin.
Si la quantité de données lues dépasse la limite de flux configurée, une exception
LimitOverrunError
est levée et les données sont laissées dans le tampon interne et peuvent être lues à nouveau.Si EOF est atteint avant que le séparateur complet ne soit trouvé, une exception
IncompleteReadError
est levée et le tampon interne est réinitialisé. L'attributIncompleteReadError.partial
peut contenir une partie du séparateur.The separator may also be a tuple of separators. In this case the return value will be the shortest possible that has any separator as the suffix. For the purposes of
LimitOverrunError
, the shortest possible separator is considered to be the one that matched.Ajouté dans la version 3.5.2.
Modifié dans la version 3.13: The separator parameter may now be a
tuple
of separators.
- at_eof()¶
Renvoie
True
si le tampon est vide et quefeed_eof()
a été appelée.
Flux écrivains (StreamWriter)¶
- class asyncio.StreamWriter¶
Représente un objet écrivain qui fournit des API pour écrire des données dans le flux d'entrée-sortie.
Il n'est pas recommandé d'instancier directement les objets StreamWriter ; utilisez
open_connection()
etstart_server()
à la place.- write(data)¶
La méthode tente d'écrire immédiatement les data dans le connecteur sous-jacent. Si cela échoue, les données sont mises en file d'attente dans un tampon d'écriture interne jusqu'à ce qu'elles puissent être envoyées.
La méthode doit être utilisée avec la méthode
drain()
:stream.write(data) await stream.drain()
- writelines(data)¶
La méthode écrit immédiatement une liste (ou tout itérable) d'octets dans le connecteur sous-jacent. Si cela échoue, les données sont mises en file d'attente dans un tampon d'écriture interne jusqu'à ce qu'elles puissent être envoyées.
La méthode doit être utilisée avec la méthode
drain()
:stream.writelines(lines) await stream.drain()
- close()¶
La méthode ferme le flux et le connecteur sous-jacent.
La méthode doit être utilisée, bien que ce ne soit pas obligatoire, avec la méthode
wait_closed()
:stream.close() await stream.wait_closed()
- can_write_eof()¶
Renvoie
True
si le transport sous-jacent gère la méthodewrite_eof()
,False
sinon.
- write_eof()¶
Ferme le flux en écriture après le vidage des données d'écriture en mémoire tampon.
- transport¶
Renvoie le transport asynchrone sous-jacent.
- get_extra_info(name, default=None)¶
Donne accès aux informations de transport facultatives ; voir
BaseTransport.get_extra_info()
pour plus de détails.
- coroutine drain()¶
Attend qu'il soit approprié de reprendre l'écriture dans le flux. Par exemple :
writer.write(data) await writer.drain()
Il s'agit d'une méthode de contrôle de flux qui interagit avec le tampon d'écriture entrée-sortie sous-jacent. Lorsque la taille du tampon atteint la limite haute, drain() bloque jusqu'à ce que la taille du tampon soit drainée jusqu'à la limite basse et que l'écriture puisse reprendre. Lorsqu'il n'y a rien à attendre,
drain()
termine immédiatement.
- coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Bascule la connexion basée sur le flux existant vers TLS.
Paramètres :
sslcontext : une instance configurée de
SSLContext
.server_hostname : définit ou remplace le nom d'hôte auquel le certificat du serveur cible sera comparé.
ssl_handshake_timeout est le temps en secondes à attendre pour que la poignée de main TLS se termine avant d'abandonner la connexion.
60.0
secondes siNone
(par défaut).ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection.
30.0
seconds ifNone
(default).
Ajouté dans la version 3.11.
Modifié dans la version 3.12: ajout du paramètre ssl_shutdown_timeout.
- is_closing()¶
Renvoie
True
si le flux est fermé ou en cours de fermeture.Ajouté dans la version 3.7.
Exemples¶
Client d'écho TCP utilisant des flux¶
Client d'écho TCP utilisant la fonction asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Voir aussi
L'exemple Client écho en TCP utilise la méthode de bas niveau loop.create_connection()
.
Serveur d'écho TCP utilisant des flux¶
Serveur d'écho TCP utilisant la fonction asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
Voir aussi
L'exemple de Serveur écho en TCP utilise la méthode loop.create_server()
.
Récupération des en-têtes HTTP¶
Exemple simple d'interrogation des en-têtes HTTP de l'URL transmise sur la ligne de commande :
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Utilisation :
python example.py http://example.com/path/page.html
ou avec HTTPS :
python example.py https://example.com/path/page.html
Ouverture d'un connecteur pour attendre les données à l'aide de flux¶
Coroutine attendant qu'un connecteur reçoive des données en utilisant la fonction open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
Voir aussi
L'exemple Connexion de connecteurs existants utilise un protocole de bas niveau et la méthode loop.create_connection()
.
L'exemple Surveillance des événements de lecture pour un descripteur de fichier utilise la méthode de bas niveau loop.add_reader()
pour surveiller un descripteur de fichier.