트랜스포트와 프로토콜

머리말

트랜스포트와 프로토콜은 loop.create_connection()와 같은 저수준 이벤트 루프 API에서 사용됩니다. 그들은 콜백 기반 프로그래밍 스타일을 사용하고 네트워크 나 IPC 프로토콜(예를 들어 HTTP)의 고성능 구현을 가능하게 합니다.

본질에서 트랜스포트와 프로토콜은 라이브러리와 프레임워크에서만 사용되어야 하며 고수준 asyncio 응용 프로그램에서는 사용되지 않아야 합니다.

이 문서 페이지는 트랜스포트프로토콜을 모두 다룹니다.

소개

최상위 수준에서, 트랜스포트는 어떻게(how) 바이트를 전송할지를 다루지만, 프로토콜은 어떤(which) 바이트를 전송할지를 결정합니다 (그리고 어느 정도는 언제(when)도).

같은 것을 다른 식으로 말하면: 트랜스포트는 소켓(또는 유사한 I/O 엔드포인트)의 추상화지만, 프로토콜은 트랜스포트의 관점에서 응용 프로그램의 추상화입니다.

또 다른 시각은 트랜스포트와 프로토콜 인터페이스가 함께 네트워크 I/O와 프로세스 간 I/O를 사용하기 위한 추상 인터페이스를 정의한다는 것입니다.

트랜스포트 객체와 프로토콜 객체 간에는 항상 1:1 관계가 있습니다: 프로토콜은 트랜스포트 메서드를 호출하여 데이터를 보내지만, 트랜스포트는 프로토콜 메서드를 호출하여 받은 데이터를 전달합니다.

대부분의 연결 지향 이벤트 루프 메서드(가령 loop.create_connection())는 Transport 객체로 표현되는 받아들인 연결을 위한 Protocol 객체를 만드는 데 사용되는 protocol_factory 인자를 받아들입니다. 이러한 메서드는 대개 (transport, protocol)의 튜플을 반환합니다.

목차

이 설명서 페이지는 다음 절로 구성됩니다:

트랜스포트

소스 코드: Lib/asyncio/transports.py


트랜스포트는 다양한 종류의 통신 채널을 추상화하기 위해 asyncio에서 제공하는 클래스입니다.

트랜스포트 객체는 항상 asyncio 이벤트 루프에 의해 인스턴스로 만들어집니다.

asyncio는 TCP, UDP, SSL 및 서브 프로세스 파이프를 위한 트랜스포트를 구현합니다. 트랜스포트에서 사용할 수 있는 메서드는 트랜스포트의 종류에 따라 다릅니다.

트랜스포트 클래스는 스레드 안전하지 않습니다.

트랜스포트 계층 구조

class asyncio.BaseTransport

모든 트랜스포트의 베이스 클래스. 모든 asyncio 트랜스포트가 공유하는 메서드를 포함합니다.

class asyncio.WriteTransport(BaseTransport)

쓰기 전용 연결을 위한 베이스 트랜스포트

WriteTransport 클래스의 인스턴스는 loop.connect_write_pipe() 이벤트 루프 메서드에서 반환되며 loop.subprocess_exec()와 같은 서브 프로세스 관련 메서드에서도 사용됩니다.

class asyncio.ReadTransport(BaseTransport)

읽기 전용 연결을 위한 베이스 트랜스포트

ReadTransport 클래스의 인스턴스는 loop.connect_read_pipe() 이벤트 루프 메서드에서 반환되며 loop.subprocess_exec()와 같은 서브 프로세스 관련 메서드에서도 사용됩니다.

class asyncio.Transport(WriteTransport, ReadTransport)

TCP 연결과 같은 양방향 트랜스포트를 나타내는 인터페이스.

사용자는 트랜스포트를 직접 인스턴스로 만들지 않습니다; 유틸리티 함수를 호출하고, 프로토콜 팩토리와 트랜스포트와 프로토콜을 만드는 데 필요한 기타 정보를 전달합니다.

Transport 클래스의 인스턴스는 loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile() 등과 같은 이벤트 루프 메서드에서 반환되거나 사용됩니다.

class asyncio.DatagramTransport(BaseTransport)

데이터 그램 (UDP) 연결을 위한 트랜스포트.

DatagramTransport 클래스의 인스턴스는 loop.create_datagram_endpoint() 이벤트 루프 메서드에서 반환됩니다.

class asyncio.SubprocessTransport(BaseTransport)

부모와 그 자식 OS 프로세스 간의 연결을 나타내는 추상화.

SubprocessTransport 클래스의 인스턴스는 이벤트 루프 메서드 loop.subprocess_shell()loop.subprocess_exec()에서 반환됩니다.

베이스 트랜스포트

BaseTransport.close()

트랜스포트를 닫습니다.

트랜스포트에 송신 데이터용 버퍼가 있으면, 버퍼 된 데이터는 비동기적으로 플러시 됩니다. 더는 데이터가 수신되지 않습니다. 버퍼 된 모든 데이터가 플러시 된 후, 프로토콜의 protocol.connection_lost() 메서드가 None을 인자로 사용하여 호출됩니다.

BaseTransport.is_closing()

트랜스포트가 닫히는 중이거나 닫혔으면 True를 반환합니다.

BaseTransport.get_extra_info(name, default=None)

트랜스포트나 그것이 사용하는 하부 자원에 대한 정보를 반환합니다.

name은 얻고자 하는 트랜스포트 특정 정보의 조각을 나타내는 문자열입니다.

default는 정보가 없거나 트랜스포트가 제삼자 이벤트 루프 구현이나 현재 플랫폼에서 조회를 지원하지 않을 때 반환 할 값입니다.

예를 들어, 다음 코드는 트랜스포트의 하부 소켓 객체를 가져오려고 시도합니다:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

일부 트랜스포트에서 조회할 수 있는 정보의 범주:

BaseTransport.set_protocol(protocol)

새 프로토콜을 설정합니다.

프로토콜의 교환은 두 프로토콜이 교환을 지원한다고 문서화 되었을 때만 수행되어야 합니다.

BaseTransport.get_protocol()

현재 프로토콜을 반환합니다.

읽기 전용 트랜스포트

ReadTransport.is_reading()

트랜스 포트가 새로운 데이터를 받고 있으면 True를 반환합니다.

버전 3.7에 추가.

ReadTransport.pause_reading()

트랜스포트의 수신 끝을 일시 중지합니다. resume_reading()이 호출 될 때까지 프로토콜의 protocol.data_received() 메서드에 데이터가 전달되지 않습니다.

버전 3.7에서 변경: 이 메서드는 멱등적(idempotent)입니다. 즉, 트랜스포트가 이미 일시 중지되거나 닫혔을 때도 호출할 수 있습니다.

ReadTransport.resume_reading()

수신 끝을 재개합니다. 읽을 수 있는 데이터가 있다면 프로토콜의 protocol.data_received() 메서드가 다시 한번 호출됩니다.

버전 3.7에서 변경: 이 메서드는 멱등적(idempotent)입니다. 즉, 트랜스포트가 이미 읽고 있을 때도 호출할 수 있습니다.

쓰기 전용 트랜스포트

WriteTransport.abort()

계류 중인 작업이 완료될 때까지 기다리지 않고, 즉시 트랜스포트를 닫습니다. 버퍼 된 데이터는 손실됩니다. 더는 데이터가 수신되지 않습니다. 프로토콜의 protocol.connection_lost() 메서드는 결국 None을 인자로 사용하여 호출됩니다.

WriteTransport.can_write_eof()

트랜스포트가 write_eof()를 지원하면 True를 반환하고, 그렇지 않으면 False를 반환합니다.

WriteTransport.get_write_buffer_size()

트랜스포트가 사용하는 출력 버퍼의 현재 크기를 반환합니다.

WriteTransport.get_write_buffer_limits()

쓰기 흐름 제어를 위한 highlow 수위(watermark)를 가져옵니다. 튜플 (low, high)를 반환합니다. 여기서 lowhigh는 양의 바이트 수입니다.

제한을 설정하려면 set_write_buffer_limits()를 사용하십시오.

버전 3.4.2에 추가.

WriteTransport.set_write_buffer_limits(high=None, low=None)

쓰기 흐름 제어를 위한 highlow 수위(watermark)를 설정합니다.

이 두 값(바이트 수로 측정)은 프로토콜의 protocol.pause_writing()protocol.resume_writing() 메서드가 언제 호출될지를 제어합니다. 지정하면, low 수위는 high 수위보다 작거나 같아야 합니다. highlow는 음수가 될 수 없습니다.

버퍼 크기가 high 값보다 크거나 같아질 때 pause_writing()가 호출됩니다. 쓰기가 일시 중지되면, 버퍼 크기가 low 값보다 작거나 같아질 때 resume_writing()이 호출됩니다.

기본값은 구현에 따라 다릅니다. high 수위만 주어지면, low 수위는 high 수위보다 작거나 같은 구현 특정 기본값이 사용됩니다. high를 0으로 설정하면, low도 0으로 설정되고, 버퍼가 비어있지 않게 될 때마다 pause_writing()가 호출되도록 합니다. low를 0으로 설정하면 버퍼가 빌 때만 resume_writing()이 호출됩니다. 두 제한 중 하나에 0을 사용하는 것은 I/O와 계산을 동시에 수행할 기회를 줄이기 때문에 일반적으로 최선이 아닙니다.

제한을 가져오려면 get_write_buffer_limits()를 사용하십시오.

WriteTransport.write(data)

어떤 data 바이트열을 트랜스포트에 기록합니다.

이 메서드는 블록하지 않습니다; 데이터를 버퍼하고 비동기적으로 전송되도록 배치합니다.

WriteTransport.writelines(list_of_data)

트랜스포트에 데이터 바이트열 리스트(또는 임의의 이터러블)를 기록합니다. 이것은 이터러블이 산출하는 각 요소에 대해 write()를 호출하는 것과 기능 면에서 동등하지만, 더 효율적으로 구현될 수 있습니다.

WriteTransport.write_eof()

버퍼 된 모든 데이터를 플러시 한 후 트랜스포트의 쓰기 끝을 닫습니다. 데이터가 여전히 수신될 수 있습니다.

이 메서드는 트랜스포트(예를 들어 SSL)가 반만 닫힌 연결을 지원하지 않으면 NotImplementedError를 발생시킬 수 있습니다.

데이터 그램 트랜스포트

DatagramTransport.sendto(data, addr=None)

addr(트랜스포트 종속적인 대상 주소)에 의해 주어진 원격 피어로 data 바이트열을 보냅니다. addrNone이면, 트랜스포트를 만들 때 지정된 대상 주소로 data가 전송됩니다.

이 메서드는 블록하지 않습니다; 데이터를 버퍼하고 비동기적으로 전송되도록 배치합니다.

DatagramTransport.abort()

계류 중인 작업이 완료될 때까지 기다리지 않고, 즉시 트랜스포트를 닫습니다. 버퍼 된 데이터는 손실됩니다. 더는 데이터가 수신되지 않습니다. 프로토콜의 protocol.connection_lost() 메서드는 결국 None을 인자로 사용하여 호출됩니다.

서브 프로세스 트랜스포트

SubprocessTransport.get_pid()

서브 프로세스 ID를 정수로 반환합니다.

SubprocessTransport.get_pipe_transport(fd)

정수 파일 기술자 fd에 대응하는 통신 파이프의 트랜스포트를 돌려줍니다:

  • 0: 표준 입력(stdin)의 읽을 수 있는 스트리밍 트랜스포트, 또는 서브 프로세스가 stdin=PIPE로 만들어지지 않았으면 None

  • 1: 표준 출력(stdout)의 쓸 수 있는 스트리밍 트랜스포트, 또는 서브 프로세스가 stdout=PIPE로 만들어지지 않았으면 None

  • 2: 표준 오류(stderr)의 쓸 수 있는 스트리밍 트랜스포트, 또는 서브 프로세스가 stderr=PIPE로 만들어지지 않았으면 None

  • 다른 fd: None

SubprocessTransport.get_returncode()

서브 프로세스 반환 값을 정수로, 혹은 반환되지 않았으면 None을 반환합니다. subprocess.Popen.returncode 어트리뷰트와 유사합니다.

SubprocessTransport.kill()

서브 프로세스를 죽입니다.

POSIX 시스템에서, 이 함수는 SIGKILL을 서브 프로세스로 보냅니다. 윈도우에서, 이 메서드는 terminate()의 별칭입니다.

subprocess.Popen.kill()도 참조하십시오.

SubprocessTransport.send_signal(signal)

subprocess.Popen.send_signal()와 마찬가지로 signal 번호를 서브 프로세스로 보냅니다.

SubprocessTransport.terminate()

서브 프로세스를 중지합니다.

POSIX 시스템에서, 이 메서드는 SIGTERM을 서브 프로세스로 보냅니다. 윈도에서, 서브 프로세스를 중지하기 위해 윈도우 API 함수 TerminateProcess() 가 호출됩니다.

subprocess.Popen.terminate()도 참조하십시오.

SubprocessTransport.close()

kill() 메서드를 호출하여 서브 프로세스를 죽입니다.

서브 프로세스가 아직 반환하지 않았으면, stdin, stdoutstderr 파이프의 트랜스포트를 닫습니다.

프로토콜

소스 코드: Lib/asyncio/protocols.py


asyncio는 네트워크 프로토콜을 구현하는 데 사용해야 하는 추상 베이스 클래스 집합을 제공합니다. 이러한 클래스는 트랜스포트와 함께 사용해야 합니다.

추상 베이스 프로토콜 클래스의 서브 클래스는 일부 또는 모든 메서드를 구현할 수 있습니다. 이 모든 메서드는 콜백입니다: 이것들은 특정 이벤트에서 트랜스포트에 의해 호출됩니다, 예를 들어 어떤 데이터가 수신될 때. 베이스 프로토콜 메서드는 해당 트랜스포트에 의해 호출되어야 합니다.

베이스 프로토콜

class asyncio.BaseProtocol

모든 프로토콜이 공유하는 메서드를 가진 베이스 프로토콜.

class asyncio.Protocol(BaseProtocol)

스트리밍 프로토콜(TCP, 유닉스 소켓 등)을 구현하기 위한 베이스 클래스.

class asyncio.BufferedProtocol(BaseProtocol)

수신 버퍼의 수동 제어로 스트리밍 프로토콜을 구현하기 위한 베이스 클래스.

class asyncio.DatagramProtocol(BaseProtocol)

데이터 그램 (UDP) 프로토콜을 구현하기 위한 베이스 클래스.

class asyncio.SubprocessProtocol(BaseProtocol)

자식 프로세스와 통신하는 (단방향 파이프) 프로토콜을 구현하기 위한 베이스 클래스.

베이스 프로토콜

모든 asyncio 프로토콜은 베이스 프로토콜 콜백을 구현할 수 있습니다.

연결 콜백

연결 콜백은 모든 프로토콜에서 성공적으로 연결될 때마다 정확히 한 번 호출됩니다. 다른 모든 프로토콜 콜백은 이 두 메서드 사이에서만 호출될 수 있습니다.

BaseProtocol.connection_made(transport)

연결이 이루어질 때 호출됩니다.

transport 인자는 연결을 나타내는 트랜스포트입니다. 트랜스포트에 대한 참조를 저장하는 것은 프로토콜의 책임입니다.

BaseProtocol.connection_lost(exc)

연결이 끊어지거나 닫힐 때 호출됩니다.

인자는 예외 객체나 None입니다. 후자는 정상적인 EOF가 수신되었거나, 연결의 이쪽에서 연결이 중단(abort)되거나 닫혔다는 것을 의미합니다.

흐름 제어 콜백

흐름 제어 콜백은 프로토콜에 의해 수행되는 쓰기를 일시 중지하거나 다시 시작하도록 트랜스포트에 의해 호출될 수 있습니다.

자세한 내용은 set_write_buffer_limits() 메서드 설명서를 참조하십시오.

BaseProtocol.pause_writing()

트랜스포트 버퍼가 높은 수위를 넘을 때 호출됩니다.

BaseProtocol.resume_writing()

트랜스포트 버퍼가 낮은 수위 아래로 내려갈 때 호출됩니다.

버퍼 크기가 높은 수위와 같으면, pause_writing()이 호출되지 않습니다: 버퍼 크기는 엄격하게 초과해야 합니다.

반대로, resume_writing()은 버퍼 크기가 낮은 수위와 같거나 낮을 때 호출됩니다. 이러한 종료 조건은 수위가 0일 때 예상대로 진행되게 하려면 중요합니다.

스트리밍 프로토콜

loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe()loop.connect_write_pipe()와 같은 이벤트 메서드는 스트리밍 프로토콜을 반환하는 팩토리를 받아들입니다.

Protocol.data_received(data)

어떤 데이터가 수신될 때 호출됩니다. data는 수신 데이터가 들어있는 비어 있지 않은 바이트열 객체입니다.

데이터가 버퍼 되고, 조각으로 나뉘고, 재조립되는지는 트랜스포트에 달려있습니다. 일반적으로, 특정 의미에 의존해서는 안 되며, 구문 분석을 일반적이고 유연하게 만들어야 합니다. 그러나, 데이터는 항상 올바른 순서로 수신됩니다.

이 메서드는 연결이 열려있는 동안 임의의 횟수만큼 호출될 수 있습니다.

그러나, protocol.eof_received()는 최대한 한 번만 호출됩니다. 일단 eof_received()가 호출되면, data_received()는 더는 호출되지 않습니다.

Protocol.eof_received()

다른 쪽 끝이 더는 데이터를 보내지 않을 것이라는 신호를 보낼 때 호출됩니다 (예를 들어, 다른 쪽 끝도 asyncio를 사용하면, transport.write_eof()를 호출하여).

이 메서드는 거짓 값(None 포함)을 반환할 수 있으며, 이때 트랜스포트는 스스로 닫힙니다. 반대로, 이 메서드가 참값을 반환하면, 사용된 프로토콜이 트랜스포트를 닫을지를 결정합니다. 기본 구현이 None을 반환하기 때문에, 묵시적으로 연결을 닫습니다.

SSL을 포함한 일부 트랜스포트는, 반만 닫힌 연결을 지원하지 않습니다. 이때, 이 메서드에서 참을 반환하면 연결이 닫힙니다.

상태 기계:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

버퍼 된 스트리밍 프로토콜

버전 3.7에 추가.

버퍼 된 프로토콜은 스트리밍 프로토콜을 지원하는 모든 이벤트 루프 메서드와 함께 사용할 수 있습니다.

BufferedProtocol 구현은 수신 버퍼의 명시적 수동 할당과 제어를 허용합니다. 이벤트 루프는 프로토콜에서 제공하는 버퍼를 사용하여 불필요한 데이터 복사를 피할 수 있습니다. 이로 인해 대량의 데이터를 수신하는 프로토콜의 성능이 크게 향상될 수 있습니다. 정교한 프로토콜 구현은 버퍼 할당 수를 크게 줄일 수 있습니다.

다음 콜백은 BufferedProtocol 인스턴스에 호출됩니다.:

BufferedProtocol.get_buffer(sizehint)

새로운 수신 버퍼를 할당하기 위해서 호출됩니다.

sizehint는 반환되는 버퍼에 대해 권장되는 최소 크기입니다. sizehint가 제안하는 것보다 더 작거나 큰 버퍼를 반환하는 것이 허용됩니다. -1로 설정하면 버퍼 크기는 임의적일 수 있습니다. 크기가 0인 버퍼를 반환하는 것은 에러입니다.

get_buffer()버퍼 프로토콜을 구현하는 객체를 반환해야 합니다.

BufferedProtocol.buffer_updated(nbytes)

수신된 데이터로 버퍼가 갱신될 때 호출됩니다.

nbytes는 버퍼에 기록된 총 바이트 수입니다.

BufferedProtocol.eof_received()

protocol.eof_received() 메서드의 설명서를 참조하십시오.

get_buffer()는 연결 중에 임의의 횟수만큼 호출될 수 있습니다. 그러나, protocol.eof_received()는 최대한 한 번만 호출되며, 호출되면 get_buffer()buffer_updated()는 그 이후로 호출되지 않습니다.

상태 기계:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

데이터 그램 프로토콜

데이터 그램 프로토콜 인스턴스는 loop.create_datagram_endpoint() 메서드에 전달된 프로토콜 팩토리에 의해 만들어져야 합니다.

DatagramProtocol.datagram_received(data, addr)

데이터 그램이 수신될 때 호출됩니다. data는 수신 데이터를 포함하는 바이트열 객체입니다. addr는 데이터를 보내는 피어의 주소입니다; 정확한 형식은 트랜스포트에 따라 다릅니다.

DatagramProtocol.error_received(exc)

이전의 송신이나 수신 연산이 OSError를 일으킬 때 호출됩니다. excOSError 인스턴스입니다.

이 메서드는 드문 조건에서 호출됩니다, 트랜스포트(예를 들어 UDP)가 데이터 그램을 수신자에게 전달할 수 없음을 감지했을 때입니다. 하지만 대부분 전달할 수 없는 데이터 그램은 조용히 삭제됩니다.

참고

BSD 시스템(macOS, FreeBSD 등)에서는, 너무 많은 패킷을 쓰는 것으로 인한 전송 실패를 감지하는 신뢰성 있는 방법이 없으므로 데이터 그램 프로토콜에 대한 흐름 제어가 지원되지 않습니다.

소켓은 항상 ‘ready’로 나타나고 여분의 패킷은 삭제됩니다. errnoerrno.ENOBUFS로 설정된 OSError가 발생할 수도 그렇지 않을 수도 있습니다; 발생하면, DatagramProtocol.error_received()에게 보고되지만 그렇지 않으면 무시됩니다.

서브 프로세스 프로토콜

Subprocess Protocol instances should be constructed by protocol factories passed to the loop.subprocess_exec() and loop.subprocess_shell() methods.

SubprocessProtocol.pipe_data_received(fd, data)

자식 프로세스가 stdout 이나 stderr 파이프에 데이터를 쓸 때 호출됩니다.

fd는 파이프의 정수 파일 기술자입니다.

data는 수신 된 데이터를 포함하는 비어 있지 않은 바이트열 객체입니다.

SubprocessProtocol.pipe_connection_lost(fd, exc)

자식 프로세스와 통신하는 파이프 중 하나가 닫히면 호출됩니다.

fd는 닫힌 정수 파일 기술자입니다.

SubprocessProtocol.process_exited()

자식 프로세스가 종료할 때 호출됩니다.

예제

TCP 메아리 서버

loop.create_server() 메서드를 사용하여 TCP 메아리 서버를 만들고, 받은 데이터를 다시 보내고, 연결을 닫습니다:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

더 보기

스트림을 사용하는 TCP 메아리 서버 예제는 고수준 asyncio.start_server() 함수를 사용합니다.

TCP 메아리 클라이언트

loop.create_connection() 메서드를 사용하는 TCP 메아리 클라이언트, 데이터를 보내고 연결이 닫힐 때까지 기다립니다:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

더 보기

스트림을 사용하는 TCP 메아리 클라이언트 예제는 고수준 asyncio.open_connection() 함수를 사용합니다.

UDP 메아리 서버

loop.create_datagram_endpoint() 메서드를 사용하는 UDP 메아리 서버, 수신된 데이터를 다시 보냅니다:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP 메아리 클라이언트

loop.create_datagram_endpoint() 메서드를 사용하는 UDP 메아리 클라이언트, 데이터를 보내고 응답을 받으면 트랜스포트를 닫습니다:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

기존 소켓 연결하기

프로토콜과 함께 loop.create_connection() 메서드를 사용하여 소켓이 데이터를 수신할 때까지 기다립니다:

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

더 보기

파일 기술자에서 읽기 이벤트를 관찰하기 예제는 저수준의 loop.add_reader() 메서드를 사용하여 FD를 등록합니다.

스트림을 사용하여 데이터를 기다리는 열린 소켓 등록 예제는 코루틴에서 open_connection() 함수에 의해 생성된 고수준 스트림을 사용합니다.

loop.subprocess_exec() 와 SubprocessProtocol

서브 프로세스의 출력을 가져오고 서브 프로세스가 끝날 때까지 대기하는 데 사용되는 서브 프로세스 프로토콜의 예.

서브 프로세스는 loop.subprocess_exec() 메서드에 의해 만들어집니다:

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

고수준 API를 사용하여 작성된 같은 예제도 참조하십시오.