Flux (streams)

Code source : Lib/asyncore.py


Les flux sont des primitives de haut niveau compatibles avec async/await pour utiliser les connexions réseau. Les flux permettent d'envoyer et de recevoir des données sans utiliser de fonctions de rappel ou des protocoles de bas niveau.

Voici un exemple de client « écho TCP » écrit en utilisant les flux asyncio :

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Voir également la section Exemples ci-dessous.

Fonctions de flux

Les fonctions asyncio de haut niveau suivantes peuvent être utilisées pour créer et utiliser des flux :

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Établit une connexion réseau et renvoie une paire d'objets (lecteur, écrivain).

Les objets lecteur et écrivain renvoyés sont des instances des classes StreamReader et StreamWriter.

limit détermine la limite de taille de tampon utilisée par l'instance StreamReader renvoyée. Par défaut, limit est fixée à 64 Kio.

Le reste des arguments est passé directement à loop.create_connection().

Note

l'argument sock transfère la propriété du connecteur réseau au StreamWriter créé. Pour fermer le connecteur, appelez sa méthode close().

Modifié dans la version 3.7: ajout du paramètre ssl_handshake_timeout.

Modifié dans la version 3.8: Added the happy_eyeballs_delay and interleave parameters.

Modifié dans la version 3.10: suppression du paramètre loop.

Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Démarre un serveur de connexions

La fonction de rappel client_connected_cb est appelée chaque fois qu'une nouvelle connexion client est établie. Elle reçoit une paire d'arguments (lecteur, écrivain), instances des classes StreamReader et StreamWriter.

client_connected_cb peut être un simple appelable ou une fonction coroutine ; s'il s'agit d'une fonction coroutine, elle sera automatiquement planifiée en tant que Task.

limit détermine la limite de taille de tampon utilisée par l'instance StreamReader renvoyée. Par défaut, limit est fixée à 64 Kio.

Le reste des arguments est passé directement à loop.create_server().

Note

l'argument sock transfère la propriété du connecteur au serveur créé. Pour fermer le connecteur, appelez la méthode close() du serveur.

Modifié dans la version 3.7: ajout des paramètres ssl_handshake_timeout et start_serving.

Modifié dans la version 3.10: suppression du paramètre loop.

Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.

Modifié dans la version 3.13: Added the keep_alive parameter.

Connecteurs Unix (sockets)

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Ouvre un connecteur Unix et renvoie une paire de (lecteur, écrivain).

Similaire à open_connection() mais fonctionne sur les connecteurs Unix.

Voir aussi la documentation de loop.create_unix_connection().

Note

l'argument sock transfère la propriété du connecteur réseau au StreamWriter créé. Pour fermer le connecteur, appelez sa méthode close().

Availability: Unix.

Modifié dans la version 3.7: ajout du paramètre ssl_handshake_timeout. Le paramètre path peut désormais être un objet simili-chemin

Modifié dans la version 3.10: suppression du paramètre loop.

Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Démarre un connecteur Unix en mode serveur.

Similaire à start_server() mais fonctionne avec les connecteurs Unix.

Voir aussi la documentation de loop.create_unix_server().

Note

l'argument sock transfère la propriété du connecteur au serveur créé. Pour fermer le connecteur, appelez la méthode close() du serveur.

Availability: Unix.

Modifié dans la version 3.7: ajout des paramètres ssl_handshake_timeout et start_serving. Le paramètre chemin peut désormais être un simili-chemin.

Modifié dans la version 3.10: suppression du paramètre loop.

Modifié dans la version 3.11: ajout du paramètre ssl_shutdown_timeout.

Flux lecteurs (StreamReader)

class asyncio.StreamReader

Représente un objet lecteur qui fournit des API pour lire les données du flux d'entrée-sortie. En tant que itérable asynchrone, l'objet prend en charge l'instruction async for.

Il n'est pas recommandé d'instancier directement les objets StreamReader ; utilisez open_connection() et start_server() à la place.

feed_eof()

Acknowledge the EOF.

coroutine read(n=-1)

Lit jusqu'à n octets du flux.

Si n n'est pas fourni ou défini à -1, lit jusqu'à EOF, puis renvoie tous les bytes lus. Si EOF a été reçu et que le tampon interne est vide, renvoie un objet bytes vide.

Si n vaut 0, renvoie immédiatement un objet bytes vide.

Si n est positif, renvoie au plus n bytes disponibles dès qu'au moins 1 octet est disponible dans le tampon interne. Si EOF est reçu avant qu'aucun octet ne soit lu, renvoie un objet bytes vide.

coroutine readline()

Lit une ligne, où une « ligne » est une séquence d'octets se terminant par \n.

Si EOF est reçu et \n n'a pas été trouvé, la méthode renvoie des données partiellement lues.

Si EOF est reçu et que le tampon interne est vide, renvoie un objet bytes vide.

coroutine readexactly(n)

Lit exactement n octets.

Lève une IncompleteReadError si EOF est atteint avant que n octets ne puissent être lus. Utilisez l'attribut IncompleteReadError.partial pour obtenir les données partiellement lues.

coroutine readuntil(separator=b'\n')

Lit les données du flux jusqu'à ce que le separator soit trouvé.

En cas de succès, les données et le séparateur sont supprimés du tampon interne (consommés). Les données renvoyées incluent le séparateur à la fin.

Si la quantité de données lues dépasse la limite de flux configurée, une exception LimitOverrunError est levée et les données sont laissées dans le tampon interne et peuvent être lues à nouveau.

Si EOF est atteint avant que le séparateur complet ne soit trouvé, une exception IncompleteReadError est levée et le tampon interne est réinitialisé. L'attribut IncompleteReadError.partial peut contenir une partie du séparateur.

The separator may also be a tuple of separators. In this case the return value will be the shortest possible that has any separator as the suffix. For the purposes of LimitOverrunError, the shortest possible separator is considered to be the one that matched.

Ajouté dans la version 3.5.2.

Modifié dans la version 3.13: The separator parameter may now be a tuple of separators.

at_eof()

Renvoie True si le tampon est vide et que feed_eof() a été appelée.

Flux écrivains (StreamWriter)

class asyncio.StreamWriter

Représente un objet écrivain qui fournit des API pour écrire des données dans le flux d'entrée-sortie.

Il n'est pas recommandé d'instancier directement les objets StreamWriter ; utilisez open_connection() et start_server() à la place.

write(data)

La méthode tente d'écrire immédiatement les data dans le connecteur sous-jacent. Si cela échoue, les données sont mises en file d'attente dans un tampon d'écriture interne jusqu'à ce qu'elles puissent être envoyées.

La méthode doit être utilisée avec la méthode drain() :

stream.write(data)
await stream.drain()
writelines(data)

La méthode écrit immédiatement une liste (ou tout itérable) d'octets dans le connecteur sous-jacent. Si cela échoue, les données sont mises en file d'attente dans un tampon d'écriture interne jusqu'à ce qu'elles puissent être envoyées.

La méthode doit être utilisée avec la méthode drain() :

stream.writelines(lines)
await stream.drain()
close()

La méthode ferme le flux et le connecteur sous-jacent.

La méthode doit être utilisée, bien que ce ne soit pas obligatoire, avec la méthode wait_closed() :

stream.close()
await stream.wait_closed()
can_write_eof()

Renvoie True si le transport sous-jacent gère la méthode write_eof(), False sinon.

write_eof()

Ferme le flux en écriture après le vidage des données d'écriture en mémoire tampon.

transport

Renvoie le transport asynchrone sous-jacent.

get_extra_info(name, default=None)

Donne accès aux informations de transport facultatives ; voir BaseTransport.get_extra_info() pour plus de détails.

coroutine drain()

Attend qu'il soit approprié de reprendre l'écriture dans le flux. Par exemple :

writer.write(data)
await writer.drain()

Il s'agit d'une méthode de contrôle de flux qui interagit avec le tampon d'écriture entrée-sortie sous-jacent. Lorsque la taille du tampon atteint la limite haute, drain() bloque jusqu'à ce que la taille du tampon soit drainée jusqu'à la limite basse et que l'écriture puisse reprendre. Lorsqu'il n'y a rien à attendre, drain() termine immédiatement.

coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Bascule la connexion basée sur le flux existant vers TLS.

Paramètres :

  • sslcontext : une instance configurée de SSLContext.

  • server_hostname : définit ou remplace le nom d'hôte auquel le certificat du serveur cible sera comparé.

  • ssl_handshake_timeout est le temps en secondes à attendre pour que la poignée de main TLS se termine avant d'abandonner la connexion. 60.0 secondes si None (par défaut).

  • ssl_shutdown_timeout is the time in seconds to wait for the SSL shutdown to complete before aborting the connection. 30.0 seconds if None (default).

Ajouté dans la version 3.11.

Modifié dans la version 3.12: ajout du paramètre ssl_shutdown_timeout.

is_closing()

Renvoie True si le flux est fermé ou en cours de fermeture.

Ajouté dans la version 3.7.

coroutine wait_closed()

Attend que le flux soit fermé.

Doit être appelée après close() pour attendre que la connexion sous-jacente soit fermée, en s'assurant que toutes les données ont été vidées avant, par exemple, de quitter le programme.

Ajouté dans la version 3.7.

Exemples

Client d'écho TCP utilisant des flux

Client d'écho TCP utilisant la fonction asyncio.open_connection() :

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Voir aussi

L'exemple Client écho en TCP utilise la méthode de bas niveau loop.create_connection().

Serveur d'écho TCP utilisant des flux

Serveur d'écho TCP utilisant la fonction asyncio.start_server() :

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Voir aussi

L'exemple de Serveur écho en TCP utilise la méthode loop.create_server().

Récupération des en-têtes HTTP

Exemple simple d'interrogation des en-têtes HTTP de l'URL transmise sur la ligne de commande :

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Utilisation :

python example.py http://example.com/path/page.html

ou avec HTTPS :

python example.py https://example.com/path/page.html

Ouverture d'un connecteur pour attendre les données à l'aide de flux

Coroutine attendant qu'un connecteur reçoive des données en utilisant la fonction open_connection() :

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Voir aussi

L'exemple Connexion de connecteurs existants utilise un protocole de bas niveau et la méthode loop.create_connection().

L'exemple Surveillance des événements de lecture pour un descripteur de fichier utilise la méthode de bas niveau loop.add_reader() pour surveiller un descripteur de fichier.