스트림

소스 코드: Lib/asyncio/streams.py


스트림은 네트워크 연결로 작업하기 위해, async/await에서 사용할 수 있는 고수준 프리미티브입니다. 스트림은 콜백이나 저수준 프로토콜과 트랜스포트를 사용하지 않고 데이터를 송수신할 수 있게 합니다.

다음은 asyncio 스트림을 사용하여 작성된 TCP 메아리 클라이언트의 예입니다:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

아래의 예제 절도 참조하십시오.

스트림 함수

다음 최상위 asyncio 함수를 사용하여 스트림을 만들고 작업할 수 있습니다.:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

네트워크 연결을 만들고 (reader, writer) 객체 쌍을 반환합니다.

반환된 readerwriter 객체는 StreamReaderStreamWriter 클래스의 인스턴스입니다.

limit는 반환된 StreamReader 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_connection()로 직접 전달됩니다.

참고

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

버전 3.7에서 변경: Added the ssl_handshake_timeout parameter.

버전 3.8에 추가: Added happy_eyeballs_delay and interleave parameters.

버전 3.10에서 변경: Removed the loop parameter.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

소켓 서버를 시작합니다.

새 클라이언트 연결이 만들어질 때마다 client_connected_cb 콜백이 호출됩니다. 이 콜백은 두 개의 인자로 (reader, writer) 쌍을 받는데, StreamReaderStreamWriter 클래스의 인스턴스입니다.

client_connected_cb는 일반 콜러블이나 코루틴 함수 일 수 있습니다; 코루틴 함수면, 자동으로 Task로 예약됩니다.

limit는 반환된 StreamReader 인스턴스가 사용하는 버퍼 크기 한계를 결정합니다. 기본적으로 limit는 64KiB로 설정됩니다.

나머지 인자는 loop.create_server()로 직접 전달됩니다.

참고

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

버전 3.7에서 변경: Added the ssl_handshake_timeout and start_serving parameters.

버전 3.10에서 변경: Removed the loop parameter.

유닉스 소켓

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

유닉스 소켓 연결을 만들고 (reader, writer) 쌍을 반환합니다.

open_connection()과 비슷하지만, 유닉스 소켓에서 작동합니다.

loop.create_unix_connection()의 설명서도 참조하십시오.

참고

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

가용성: 유닉스.

버전 3.7에서 변경: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object

버전 3.10에서 변경: Removed the loop parameter.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

유닉스 소켓 서버를 시작합니다.

start_server()와 비슷하지만, 유닉스 소켓에서 작동합니다.

loop.create_unix_server()의 설명서도 참조하십시오.

참고

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

가용성: 유닉스.

버전 3.7에서 변경: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.

버전 3.10에서 변경: Removed the loop parameter.

StreamReader

class asyncio.StreamReader

Represents a reader object that provides APIs to read data from the IO stream. As an asynchronous iterable, the object supports the async for statement.

StreamReader 객체를 직접 인스턴스로 만드는 것은 권장되지 않습니다. 대신 open_connection()start_server()를 사용하십시오.

coroutine read(n=- 1)

Read up to n bytes from the stream.

If n is not provided or set to -1, read until EOF, then return all read bytes. If EOF was received and the internal buffer is empty, return an empty bytes object.

If n is 0, return an empty bytes object immediately.

If n is positive, return at most n available bytes as soon as at least 1 byte is available in the internal buffer. If EOF is received before any byte is read, return an empty bytes object.

coroutine readline()

한 줄을 읽습니다. 여기서 “줄”은 \n로 끝나는 바이트의 시퀀스입니다.

EOF를 수신했고, \n를 찾을 수 없으면, 이 메서드는 부분적으로 읽은 데이터를 반환합니다.

EOF를 수신했고, 내부 버퍼가 비어 있으면 빈 bytes 객체를 반환합니다.

coroutine readexactly(n)

정확히 n 바이트를 읽습니다.

n 바이트를 읽기 전에 EOF에 도달하면, IncompleteReadError를 일으킵니다. 부분적으로 읽은 데이터를 가져오려면 IncompleteReadError.partial 어트리뷰트를 사용하십시오.

coroutine readuntil(separator=b'\n')

separator가 발견될 때까지 스트림에서 데이터를 읽습니다.

성공하면, 데이터와 separator가 내부 버퍼에서 제거됩니다 (소비됩니다). 반환된 데이터에는 끝에 separator가 포함됩니다.

읽은 데이터의 양이 구성된 스트림 제한을 초과하면 LimitOverrunError 예외가 발생하고, 데이터는 내부 버퍼에 그대로 남아 있으며 다시 읽을 수 있습니다.

완전한 separator가 발견되기 전에 EOF에 도달하면 IncompleteReadError 예외가 발생하고, 내부 버퍼가 재설정됩니다. IncompleteReadError.partial 어트리뷰트에는 separator 일부가 포함될 수 있습니다.

버전 3.5.2에 추가.

at_eof()

버퍼가 비어 있고 feed_eof()가 호출되었으면 True를 반환합니다.

StreamWriter

class asyncio.StreamWriter

IO 스트림에 데이터를 쓰는 API를 제공하는 기록기(writer) 객체를 나타냅니다.

StreamWriter 객체를 직접 인스턴스로 만드는 것은 권장되지 않습니다. 대신 open_connection()start_server()를 사용하십시오.

write(data)

이 메서드는 하부 소켓에 data를 즉시 기록하려고 시도합니다. 실패하면, data는 보낼 수 있을 때까지 내부 쓰기 버퍼에 계류됩니다.

이 메서드는 drain() 메서드와 함께 사용해야 합니다:

stream.write(data)
await stream.drain()
writelines(data)

이 메서드는 하부 소켓에 바이트열의 리스트(또는 임의의 이터러블)를 즉시 기록합니다. 실패하면, data는 보낼 수 있을 때까지 내부 쓰기 버퍼에 계류됩니다.

이 메서드는 drain() 메서드와 함께 사용해야 합니다:

stream.writelines(lines)
await stream.drain()
close()

이 메서드는 스트림과 하부 소켓을 닫습니다.

The method should be used, though not mandatory, along with the wait_closed() method:

stream.close()
await stream.wait_closed()
can_write_eof()

하부 트랜스포트가 write_eof() 메서드를 지원하면 True를 반환하고, 그렇지 않으면 False를 반환합니다.

write_eof()

버퍼링 된 쓰기 데이터가 플러시 된 후에 스트림의 쓰기 끝을 닫습니다.

transport

하부 asyncio 트랜스포트를 돌려줍니다.

get_extra_info(name, default=None)

선택적 트랜스포트 정보에 액세스합니다; 자세한 내용은 BaseTransport.get_extra_info()를 참조하십시오.

coroutine drain()

스트림에 기록을 다시 시작하는 것이 적절할 때까지 기다립니다. 예:

writer.write(data)
await writer.drain()

이것은 하부 IO 쓰기 버퍼와 상호 작용하는 흐름 제어 메서드입니다. 버퍼의 크기가 높은 수위에 도달하면, 버퍼 크기가 낮은 수위까지 내려가서 쓰기가 다시 시작될 수 있을 때까지 drain()은 블록합니다. 기다릴 것이 없으면, drain()은 즉시 반환합니다.

is_closing()

스트림이 닫혔거나 닫히고 있으면 True를 반환합니다.

버전 3.7에 추가.

coroutine wait_closed()

스트림이 닫힐 때까지 기다립니다.

Should be called after close() to wait until the underlying connection is closed, ensuring that all data has been flushed before e.g. exiting the program.

버전 3.7에 추가.

예제

스트림을 사용하는 TCP 메아리 클라이언트

asyncio.open_connection() 함수를 사용하는 TCP 메아리 클라이언트:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

더 보기

TCP 메아리 클라이언트 프로토콜 예제는 저수준 loop.create_connection() 메서드를 사용합니다.

스트림을 사용하는 TCP 메아리 서버

asyncio.start_server() 함수를 사용하는 TCP 메아리 서버:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

더 보기

TCP 메아리 서버 프로토콜 예제는 loop.create_server() 메서드를 사용합니다.

HTTP 헤더 가져오기

명령 줄로 전달된 URL의 HTTP 헤더를 조회하는 간단한 예제:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

사용법:

python example.py http://example.com/path/page.html

또는 HTTPS를 사용하면:

python example.py https://example.com/path/page.html

스트림을 사용하여 데이터를 기다리는 열린 소켓 등록

open_connection() 함수를 사용하여 소켓이 데이터를 수신할 때까지 기다리는 코루틴:

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

더 보기

프로토콜을 사용하여 데이터를 기다리는 열린 소켓 등록 예제는 저수준 프로토콜과 loop.create_connection() 메서드를 사용합니다.

파일 기술자에서 읽기 이벤트를 관찰하기 예제는 저수준 loop.add_reader() 메서드를 사용하여 파일 기술자를 관찰합니다.