Transports et Protocoles

Préface

Les transports et les protocoles sont utilisés par les API de boucle d'événements de bas niveau telles que loop.create_connection(). Ils utilisent un style de programmation basé sur les fonctions de rappel et permettent des implémentations hautes performances de protocoles réseau (par exemple, HTTP) ou de communication inter-processus (IPC).

Avant tout, les transports et les protocoles ne doivent être utilisés que dans des bibliothèques et des cadriciels et jamais dans des applications asynchrones de haut niveau.

Cette page de documentation couvre à la fois Transports et Protocoles.

Introduction

Au plus haut niveau, le transport concerne comment les octets sont transmis, tandis que le protocole détermine quels octets transmettre (et dans une certaine mesure quand).

Pour l'écrire autrement : un transport est une abstraction pour un connecteur (socket ou tout autre point de terminaison d'entrée-sortie) tandis qu'un protocole est une abstraction pour une application, du point de vue du transport.

Encore une autre vue est que les interfaces de transport et de protocole définissent ensemble une interface abstraite pour utiliser les entrées-sorties réseau et les entrées-sorties inter-processus.

Il existe toujours une relation 1:1 entre les objets de transport et de protocole : le protocole appelle les méthodes de transport pour envoyer des données, tandis que le transport appelle les méthodes de protocole pour lui transmettre les données reçues.

La plupart des méthodes de boucles d'événements orientées connexion (telles que loop.create_connection()) acceptent généralement un argument protocol_factory utilisé pour créer un objet Protocol pour une connexion acceptée, représentée par un objet Transport. De telles méthodes renvoient généralement un n-uplet (transport, protocol).

Sommaire

Cette page de documentation contient les sections suivantes :

Transports

Code source : Lib/asyncio/transports.py


Les transports sont des classes fournies par asyncio afin d'abstraire différents types de canaux de communication.

Les objets de transport sont toujours instanciés par une boucle d'événements asyncio.

asyncio implémente les transports pour TCP, UDP, SSL et les tubes de sous-processus. Les méthodes disponibles sur un transport dépendent du type de transport.

Les classes de transport ne sont pas compatibles avec les fils d'exécution multiples.

Hiérarchie des transports

class asyncio.BaseTransport

Classe de base pour tous les transports. Contient des méthodes partagées par tous les transports asyncio.

class asyncio.WriteTransport(BaseTransport)

Transport de base pour les connexions en écriture seule.

Les instances de la classe WriteTransport sont renvoyées par la méthode de boucle d'événements loop.connect_write_pipe() et sont également utilisées par les méthodes liées aux sous-processus comme loop.subprocess_exec().

class asyncio.ReadTransport(BaseTransport)

Transport de base pour les connexions en lecture seule.

Les instances de la classe ReadTransport sont renvoyées par la méthode de boucle d'événements loop.connect_read_pipe() et sont également utilisées par les méthodes liées aux sous-processus comme loop.subprocess_exec().

class asyncio.Transport(WriteTransport, ReadTransport)

Interface représentant un transport bidirectionnel, comme une connexion TCP.

L'utilisateur n'instancie pas un transport directement ; il appelle une fonction utilitaire, lui transmettant une fabrique de protocoles et d'autres informations nécessaires pour créer le transport et le protocole.

Les instances de la classe Transport sont renvoyées ou utilisées par des méthodes de boucle d'événements comme loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile(), etc.

class asyncio.DatagramTransport(BaseTransport)

Transport pour les connexions par datagrammes (UDP).

Les instances de la classe DatagramTransport sont renvoyées par la méthode de boucle d'événements loop.create_datagram_endpoint().

class asyncio.SubprocessTransport(BaseTransport)

Abstraction pour représenter une connexion entre un parent et son processus enfant au niveau du système d'exploitation.

Les instances de la classe SubprocessTransport sont renvoyées par les méthodes de boucle d'événements loop.subprocess_shell() et loop.subprocess_exec().

Classe de base des Transports

BaseTransport.close()

Ferme le transport.

Si le transport a une mémoire tampon pour les données sortantes, les données mises en mémoire tampon seront vidées de manière asynchrone. Aucune autre donnée ne sera reçue. Une fois que toutes les données mises en mémoire tampon ont été vidées, la méthode protocol.connection_lost() sera appelée avec None comme argument. Le transport ne doit pas être utilisé une fois qu'il est fermé.

BaseTransport.is_closing()

Renvoie True si le transport se ferme ou est fermé.

BaseTransport.get_extra_info(name, default=None)

Renvoie des informations sur le transport ou les ressources sous-jacentes qu'il utilise.

name est une chaîne représentant l'information spécifique au transport à obtenir.

default est la valeur à renvoyer si les informations ne sont pas disponibles ou si le transport ne prend pas en charge l'implémentation de boucle d'événements tierce donnée ou la plateforme actuelle.

Par exemple, le code suivant tente d'obtenir l'objet socket sous-jacent du transport :

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

Catégories d'informations pouvant être interrogées sur certains transports :

BaseTransport.set_protocol(protocol)

Change le protocole.

La commutation de protocole ne doit être effectuée que lorsque les deux protocoles sont documentés pour prendre en charge la commutation.

BaseTransport.get_protocol()

Renvoie le protocole courant.

Transports en lecture seule

ReadTransport.is_reading()

Renvoie True si le transport reçoit de nouvelles données.

Ajouté dans la version 3.7.

ReadTransport.pause_reading()

Met en pause l'extrémité de réception du transport. Aucune donnée ne sera transmise à la méthode protocol.data_received() du protocole jusqu'à ce que resume_reading() soit appelée.

Modifié dans la version 3.7: la méthode est idempotente, c'est-à-dire qu'elle peut être appelée lorsque le transport est déjà en pause ou fermé.

ReadTransport.resume_reading()

Reprend la réception. La méthode protocol.data_received() du protocole sera appelée à nouveau si certaines données sont disponibles pour la lecture.

Modifié dans la version 3.7: la méthode est idempotente, c'est-à-dire qu'elle peut être appelée alors que le transport est déjà en train de lire.

Transports en lecture-écriture

WriteTransport.abort()

Ferme le transport immédiatement, sans attendre la fin des opérations en attente. Les données mises en mémoire tampon sont perdues. Aucune autre donnée ne sera reçue. La méthode protocol.connection_lost() du protocole sera éventuellement appelée avec None comme argument.

WriteTransport.can_write_eof()

Renvoie True si le transport gère write_eof(), False sinon.

WriteTransport.get_write_buffer_size()

Renvoie la taille actuelle du tampon de sortie utilisé par le transport.

WriteTransport.get_write_buffer_limits()

Obtient les seuils high et low pour le contrôle du flux d'écriture. Renvoie un n-uplet (low, high)low et high sont des nombres positifs d'octets.

Utilisez set_write_buffer_limits() pour définir les limites.

Ajouté dans la version 3.4.2.

WriteTransport.set_write_buffer_limits(high=None, low=None)

Définit les seuils high et low pour le contrôle du flux d'écriture.

Ces deux valeurs (mesurées en nombre d'octets) contrôlent quand les méthodes protocol.pause_writing() et protocol.resume_writing() du protocole sont appelées. S'il est spécifié, le seuil bas doit être inférieur ou égal au seuil haut. Ni high ni low ne peuvent être négatifs.

pause_writing() est appelée lorsque la taille du tampon devient supérieure ou égale à la valeur high. Si l'écriture a été interrompue, resume_writing() est appelée lorsque la taille du tampon devient inférieure ou égale à la valeur low.

Les valeurs par défaut sont spécifiques à l'implémentation. Si seul le seuil supérieur est donné, le seuil bas prend par défaut une valeur spécifique à l'implémentation inférieure ou égale au seuil supérieur. Définir high sur zéro force également low sur zéro et provoque l'appel de pause_writing() chaque fois que le tampon devient non vide. Définir low sur zéro entraîne l'appel de resume_writing() uniquement une fois que le tampon est vide. L'utilisation de zéro pour l'un ou l'autre seuil est généralement sous-optimal car cela réduit les possibilités d'effectuer des entrées-sorties et des calculs simultanément.

Utilisez get_write_buffer_limits() pour obtenir les limites.

WriteTransport.write(data)

Écrit des octets de data sur le transport.

Cette méthode ne bloque pas ; elle met les données en mémoire tampon et s'arrange pour qu'elles soient envoyées de manière asynchrone.

WriteTransport.writelines(list_of_data)

Écrit une liste (ou tout itérable) d'octets de données dans le transport. Ceci est fonctionnellement équivalent à appeler write() sur chaque élément produit par l'itérable, mais peut être implémentée plus efficacement.

WriteTransport.write_eof()

Ferme l'extrémité d'écriture du transport après avoir vidé toutes les données mises en mémoire tampon. Des données peuvent encore être reçues.

Cette méthode peut lever NotImplementedError si le transport (par exemple SSL) ne prend pas en charge les connexions semi-fermées.

Transports par datagrammes

DatagramTransport.sendto(data, addr=None)

Envoie les octets data au pair distant indiqué par addr (une adresse cible dépendante du transport). Si addr est None, les données sont envoyées à l'adresse cible indiquée lors de la création du transport.

Cette méthode ne bloque pas ; elle met les données en mémoire tampon et s'arrange pour qu'elles soient envoyées de manière asynchrone.

DatagramTransport.abort()

Ferme le transport immédiatement, sans attendre la fin des opérations en attente. Les données mises en mémoire tampon sont perdues. Aucune autre donnée ne sera reçue. La méthode protocol.connection_lost() du protocole sera éventuellement appelée avec None comme argument.

Transports entre sous-processus

SubprocessTransport.get_pid()

Renvoie l'identifiant du sous processus sous la forme d'un nombre entier.

SubprocessTransport.get_pipe_transport(fd)

Renvoie le transport pour le canal de communication correspondant au descripteur de fichier fd donné sous forme d'un entier :

  • 0 : transport de flux en lecture de l'entrée standard (stdin), ou None si le sous-processus n'a pas été créé avec stdin=PIPE

  • 1 : transport de flux en écriture de la sortie standard (stdout), ou None si le sous-processus n'a pas été créé avec stdout=PIPE

  • 2 : transport de flux en écriture de l'erreur standard (stderr), ou None si le sous-processus n'a pas été créé avec stderr=PIPE

  • autre fd : None

SubprocessTransport.get_returncode()

Renvoie le code de retour du sous-processus sous la forme d'un entier ou None s'il n'a pas été renvoyé, ce qui est similaire à l'attribut subprocess.Popen.returncode.

SubprocessTransport.kill()

Tue le sous-processus.

Sur les systèmes POSIX, la fonction envoie SIGKILL au sous-processus. Sous Windows, cette méthode est un alias pour terminate().

Voir aussi subprocess.Popen.kill().

SubprocessTransport.send_signal(signal)

Envoie le numéro de signal au sous-processus, comme dans subprocess.Popen.send_signal().

SubprocessTransport.terminate()

Termine le sous-processus.

On POSIX systems, this method sends SIGTERM to the subprocess. On Windows, the Windows API function TerminateProcess() is called to stop the subprocess.

Voir aussi subprocess.Popen.terminate().

SubprocessTransport.close()

Tue le sous-processus en appelant la méthode kill().

Si le sous-processus n'est pas encore terminé, ferme les transports des tubes stdin, stdout et stderr.

Protocoles

Code source : Lib/asyncio/protocols.py


asyncio fournit un ensemble de classes mères abstraites qui doivent être utilisées pour implémenter des protocoles réseau. Ces classes sont destinées à être utilisées avec les transports.

Les sous-classes des classes mères abstraites de protocole peuvent implémenter certaines ou toutes les méthodes. Toutes ces méthodes sont des rappels : elles sont appelées par des transports sur certains événements, par exemple lors de la réception de certaines données. Une méthode de protocole de base doit être appelée par le transport correspondant.

Protocoles de base

class asyncio.BaseProtocol

Protocole de base avec des méthodes partagées par tous les protocoles.

class asyncio.Protocol(BaseProtocol)

Classe mère pour l'implémentation des protocoles de streaming (TCP, sockets Unix, etc.).

class asyncio.BufferedProtocol(BaseProtocol)

Classe mère pour implémenter des protocoles de streaming avec contrôle manuel du tampon de réception.

class asyncio.DatagramProtocol(BaseProtocol)

Classe mère pour l'implémentation des protocoles par datagrammes (UDP).

class asyncio.SubprocessProtocol(BaseProtocol)

Classe mère pour implémenter des protocoles communiquant avec des processus enfants (canaux unidirectionnels).

Protocoles de base

Tous les protocoles asynchrones peuvent implémenter des rappels pour les protocoles de base.

Rappels pour les connexions

Les méthodes de rappel pour les connexions concernent tous les protocoles, exactement une fois par connexion réussie. Tous les autres rappels de protocole ne peuvent être appelés qu'entre ces deux méthodes.

BaseProtocol.connection_made(transport)

Appelée lorsqu'une connexion est établie.

L'argument transport est le transport représentant la connexion. Le protocole est chargé de stocker la référence à son transport.

BaseProtocol.connection_lost(exc)

Appelée lorsqu'une connexion est perdue ou fermée.

L'argument est soit un objet exception soit None. Ce dernier signifie qu'un EOF régulier est reçu, ou que la connexion a été interrompue ou fermée par ce côté de la connexion.

Rappels pour le contrôle de flux

Les méthodes de rappel pour le contrôle de flux concernent les transports et sont utilisés pour suspendre ou reprendre l'écriture effectuée par le protocole.

Voir la documentation de la méthode set_write_buffer_limits() pour plus de détails.

BaseProtocol.pause_writing()

Appelée lorsque la mémoire tampon du transport dépasse la limite supérieure.

BaseProtocol.resume_writing()

Appelée lorsque la mémoire tampon du transport passe sous le seuil bas.

Si la taille du tampon est égale au seuil haut, pause_writing() n'est pas appelée : la taille du tampon doit être strictement supérieure.

Inversement, resume_writing() est appelée lorsque la taille du tampon est égale ou inférieure au seuil bas. Ces conditions de fin sont importantes pour s'assurer que les choses se déroulent comme prévu lorsque l'un ou l'autre seuil est à zéro.

Protocoles connectés

Les méthodes d'événement, telles que loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() et loop.connect_write_pipe() acceptent les fabriques qui renvoient des protocoles connectés.

Protocol.data_received(data)

Appelée lorsque certaines données sont reçues. data est un objet bytes non vide contenant les données entrantes.

Le fait que les données soient mises en mémoire tampon, fragmentées ou réassemblées dépend du transport. En général, vous ne devez pas vous fier à une sémantique spécifique et plutôt rendre votre analyse générique et flexible. Cependant, les données sont toujours reçues dans le bon ordre.

La méthode peut être appelée un nombre arbitraire de fois lorsqu'une connexion est ouverte.

Cependant, protocol.eof_received() est appelée au plus une fois. Une fois que eof_received() est appelée, data_received() n'est plus appelée.

Protocol.eof_received()

Appelée lorsque l'autre extrémité signale qu'il n'enverra plus de données (par exemple en appelant transport.write_eof(), si l'autre extrémité utilise également asyncio).

Cette méthode peut renvoyer une valeur évaluée à faux (y compris None), auquel cas le transport se ferme de lui-même. À l'inverse, si cette méthode renvoie une valeur évaluée à vrai, le protocole utilisé détermine s'il faut fermer le transport. Puisque l'implémentation par défaut renvoie None, elle ferme implicitement la connexion.

Certains transports, y compris SSL, ne prennent pas en charge les connexions semi-fermées, auquel cas renvoyer True à partir de cette méthode entraîne la fermeture de la connexion.

Machine à états :

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

Protocoles connectés avec tampon

Ajouté dans la version 3.7.

Les protocoles avec mise en mémoire tampon peuvent être utilisés avec n'importe quelle méthode de boucle d'événements prenant en charge les protocoles connectés.

Les implémentations de BufferedProtocol permettent une allocation et un contrôle manuels explicites du tampon de réception. Les boucles d'événements peuvent alors utiliser le tampon fourni par le protocole pour éviter les copies de données inutiles. Cela peut entraîner une amélioration notable des performances pour les protocoles qui reçoivent de grandes quantités de données. Des implémentations de protocole sophistiquées peuvent réduire considérablement le nombre d'allocations de mémoire tampon.

Les méthodes de rappel suivantes sont appelées sur les instances BufferedProtocol :

BufferedProtocol.get_buffer(sizehint)

Appelée pour allouer un nouveau tampon de réception.

sizehint est la taille minimale recommandée pour le tampon renvoyé. Il est acceptable de renvoyer des tampons plus petits ou plus grands que ce que suggère sizehint. Lorsqu'il est défini à −1, la taille du tampon peut être arbitraire. C'est une erreur de renvoyer un tampon de taille nulle.

get_buffer() doit renvoyer un objet implémentant le protocole tampon.

BufferedProtocol.buffer_updated(nbytes)

Appelée lorsque le tampon a été mis à jour avec les données reçues.

nbytes est le nombre total d'octets qui ont été écrits dans la mémoire tampon.

BufferedProtocol.eof_received()

Voir la documentation de la méthode protocol.eof_received().

get_buffer() peut être appelée un nombre arbitraire de fois pendant une connexion. Cependant, protocol.eof_received() est appelée au plus une fois et, si elle est appelée, get_buffer() et buffer_updated() ne seront pas appelées après.

Machine à états :

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

Protocoles par datagrammes (non connectés)

Les instances du protocole par datagrammes doivent être construites par des fabriques de protocole transmises à la méthode loop.create_datagram_endpoint().

DatagramProtocol.datagram_received(data, addr)

Appelée lorsqu'un datagramme est reçu. data est un objet bytes contenant les données entrantes. addr est l'adresse du pair qui envoie les données ; le format exact dépend du transport.

DatagramProtocol.error_received(exc)

Appelée lorsqu'une opération d'envoi ou de réception précédente lève une OSError. exc est l'instance OSError.

Cette méthode est appelée dans de rares cas, lorsque le transport (par exemple UDP) détecte qu'un datagramme n'a pas pu être livré à son destinataire. Cependant, il est courant que les datagrammes qui ne peuvent être acheminés soient supprimés silencieusement.

Note

Sur les systèmes BSD (macOS, FreeBSD, etc.), le contrôle de flux n'est pas pris en charge pour les protocoles par datagrammes, car il n'existe aucun moyen fiable de détecter les échecs d'envoi causés par l'écriture d'un trop grand nombre de paquets.

Le connecteur apparaît toujours « prêt » et les paquets en excès sont supprimés. Une OSError avec errno définie sur errno.ENOBUFS peut être levée ou non ; si elle est levée, c'est communiqué à DatagramProtocol.error_received() et ignoré dans le cas contraire.

Protocoles liés aux sous-processus

Les instances de protocole de sous-processus doivent être construites par des fabriques de protocole transmises aux méthodes loop.subprocess_exec() et loop.subprocess_shell().

SubprocessProtocol.pipe_data_received(fd, data)

Appelée lorsqu'un processus enfant écrit sur sa sortie d'erreur ou sa sortie standard.

fd est l'entier représentant le descripteur de fichier du tube.

data est un objet bytes non vide contenant les données reçues.

SubprocessProtocol.pipe_connection_lost(fd, exc)

Appelé lorsqu'un des tubes de communication avec un sous-processus est fermé.

fd est l'entier représentant le descripteur de fichier qui a été fermé.

SubprocessProtocol.process_exited()

Appelée lorsqu'un processus enfant se termine.

It can be called before pipe_data_received() and pipe_connection_lost() methods.

Exemples

Serveur écho en TCP

Crée un serveur d'écho TCP en utilisant la méthode loop.create_server(), renvoie les données reçues et ferme la connexion :

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

Voir aussi

L'exemple Serveur d'écho TCP utilisant des flux utilise la fonction de haut niveau asyncio.start_server().

Client écho en TCP

Client d'écho TCP utilisant la méthode loop.create_connection() envoie des données et attend que la connexion soit fermée :

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Voir aussi

L'exemple Client d'écho TCP utilisant des flux utilise la fonction de haut niveau asyncio.open_connection().

Serveur écho en UDP

Serveur d'écho UDP, utilisant la méthode loop.create_datagram_endpoint(), qui renvoie les données reçues :

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

Client écho en UDP

Client d'écho UDP, utilisant la méthode loop.create_datagram_endpoint(), qui envoie des données et ferme le transport lorsqu'il reçoit la réponse :

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Connexion de connecteurs existants

Attend qu'un connecteur reçoive des données en utilisant la méthode loop.create_connection() avec un protocole :

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

Voir aussi

L'exemple Surveillance des événements de lecture pour un descripteur de fichier utilise la méthode de bas niveau loop.add_reader() pour enregistrer un descripteur de fichier.

L'exemple Ouverture d'un connecteur pour attendre les données à l'aide de flux utilise des flux de haut niveau créés par la fonction open_connection() dans une coroutine.

loop.subprocess_exec() et SubprocessProtocol

Un exemple de protocole de sous-processus utilisé pour obtenir la sortie d'un sous-processus et attendre la sortie du sous-processus.

Le sous-processus est créé par la méthode loop.subprocess_exec() :

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

Voir aussi le même exemple écrit à l'aide d'API de haut niveau.