동기화 프리미티브

소스 코드: Lib/asyncio/locks.py


asyncio 동기화 프리미티브는 threading 모듈의 것과 유사하도록 설계되었습니다만 두 가지 중요한 주의 사항이 있습니다:

  • asyncio 프리미티브는 스레드 안전하지 않으므로, OS 스레드 동기화(이를 위해서는 threading을 사용하십시오)에 사용하면 안 됩니다.

  • 이러한 동기화 프리미티브의 메서드는 timeout 인자를 받아들이지 않습니다; asyncio.wait_for() 함수를 사용하여 시간제한이 있는 연산을 수행하십시오.

asyncio에는 다음과 같은 기본 동기화 프리미티브가 있습니다:


Lock

class asyncio.Lock

asyncio 태스크를 위한 뮤텍스 록을 구현합니다. 스레드 안전하지 않습니다.

asyncio 록은 공유 자원에 대한 독점 액세스를 보장하는 데 사용될 수 있습니다.

Lock을 사용하는 가장 좋은 방법은 async with 문입니다:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

이는 다음과 동등합니다:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

버전 3.10에서 변경: Removed the loop parameter.

coroutine acquire()

록을 얻습니다.

이 메서드는 록이 풀림(unlocked)이 될 때까지 기다리고, 잠김(locked)으로 설정한 다음 True를 반환합니다.

잠금이 해제되기를 기다리는 acquire()에서 둘 이상의 코루틴 블록 될 때, 결국 한 개의 코루틴만 진행됩니다.

록을 얻는 것은 공평(fair)합니다: 진행할 코루틴은 록을 기다리기 시작한 첫 번째 코루틴이 됩니다.

release()

록을 반납합니다.

록이 잠김(locked)이면 풀림(unlocked)으로 재설정하고 돌아옵니다.

록이 풀림(unlocked)이면 RuntimeError가 발생합니다.

locked()

록이 잠김(locked)이면 True를 반환합니다.

Event

class asyncio.Event

이벤트 객체. 스레드 안전하지 않습니다.

asyncio 이벤트는 어떤 이벤트가 발생했음을 여러 asyncio 태스크에 알리는 데 사용할 수 있습니다.

Event 객체는 set() 메서드로 으로 설정하고 clear() 메서드로 거짓으로 재설정할 수 있는 내부 플래그를 관리합니다. wait() 메서드는 플래그가 으로 설정될 때까지 블록합니다. 플래그는 초기에 거짓으로 설정됩니다.

버전 3.10에서 변경: Removed the loop parameter.

예:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

이벤트가 설정될 때까지 기다립니다.

이벤트가 설정되었으면 True를 즉시 반환합니다. 그렇지 않으면 다른 태스크가 set()을 호출할 때까지 블록합니다.

set()

이벤트를 설정합니다.

이벤트가 설정되기를 기다리는 모든 태스크는 즉시 깨어납니다.

clear()

이벤트를 지웁니다 (재설정).

이제 wait()를 기다리는 태스크는 set() 메서드가 다시 호출될 때까지 블록 됩니다.

is_set()

이벤트가 설정되면 True를 반환합니다.

Condition

class asyncio.Condition(lock=None)

Condition 객체. 스레드 안전하지 않습니다.

asyncio 조건 프리미티브는 태스크가 어떤 이벤트가 발생하기를 기다린 다음 공유 자원에 독점적으로 액세스하는데 사용할 수 있습니다.

본질에서, Condition 객체는 EventLock의 기능을 결합합니다. 여러 개의 Condition 객체가 하나의 Lock을 공유할 수 있으므로, 공유 자원의 특정 상태에 관심이 있는 다른 태스크 간에 공유 자원에 대한 독점적 액세스를 조정할 수 있습니다.

선택적 lock 인자는 Lock 객체나 None 이어야 합니다. 후자의 경우 새로운 Lock 객체가 자동으로 만들어집니다.

버전 3.10에서 변경: Removed the loop parameter.

Condition을 사용하는 가장 좋은 방법은 async with 문입니다:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

이는 다음과 동등합니다:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

하부 록을 얻습니다.

이 메서드는 하부 록이 풀림(unlocked)이 될 때까지 대기하고, 잠김(locked)으로 설정한 다음 True를 반환합니다.

notify(n=1)

이 조건을 기다리는 최대 n 태스크(기본적으로 1개)를 깨웁니다. 대기 중인 태스크가 없으면 이 메서드는 no-op입니다.

이 메서드를 호출하기 전에 록을 얻어야 하고, 호출 직후에 반납해야 합니다. 풀린(unlocked) 록으로 호출하면 RuntimeError 에러가 발생합니다.

locked()

하부 록을 얻었으면 True를 돌려줍니다.

notify_all()

이 조건에 대기 중인 모든 태스크를 깨웁니다.

이 메서드는 notify()처럼 작동하지만, 대기 중인 모든 태스크를 깨웁니다.

이 메서드를 호출하기 전에 록을 얻어야 하고, 호출 직후에 반납해야 합니다. 풀린(unlocked) 록으로 호출하면 RuntimeError 에러가 발생합니다.

release()

하부 록을 반납합니다.

풀린 록으로 호출하면, RuntimeError가 발생합니다.

coroutine wait()

알릴 때까지 기다립니다.

이 메서드가 호출될 때 호출하는 태스크가 록을 얻지 않았으면 RuntimeError가 발생합니다.

이 메서드는 하부 잠금을 반납한 다음, notify()notify_all() 호출 때문에 깨어날 때까지 블록합니다. 일단 깨어나면, Condition은 록을 다시 얻고, 이 메서드는 True를 돌려줍니다.

coroutine wait_for(predicate)

predicate가 이 될 때까지 기다립니다.

predicate는 논릿값으로 해석될 결과를 돌려주는 콜러블이어야 합니다. 최종값이 반환 값입니다.

Semaphore

class asyncio.Semaphore(value=1)

Semaphore 객체. 스레드 안전하지 않습니다.

세마포어는 각 acquire() 호출로 감소하고, 각 release() 호출로 증가하는 내부 카운터를 관리합니다. 카운터는 절대로 0 밑으로 내려갈 수 없습니다; acquire()가 0을 만나면, release()를 호출할 때까지 기다리면서 블록합니다.

선택적 value 인자는 내부 카운터의 초깃값을 제공합니다 (기본적으로 1). 지정된 값이 0보다 작으면 ValueError가 발생합니다.

버전 3.10에서 변경: Removed the loop parameter.

Semaphore를 사용하는 가장 좋은 방법은 async with 문입니다:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

이는 다음과 동등합니다:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

세마포어를 얻습니다.

내부 카운터가 0보다 크면, 1 감소시키고 True를 즉시 반환합니다. 0이면, release()가 호출될 때까지 기다린 다음 True를 반환합니다.

locked()

세마포어를 즉시 얻을 수 없으면 True를 반환합니다.

release()

세마포어를 반납하고 내부 카운터를 1 증가시킵니다. 세마포어를 얻기 위해 대기하는 태스크를 깨울 수 있습니다.

BoundedSemaphore와 달리, Semaphoreacquire() 호출보다 더 많은 release() 호출을 허용합니다.

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

제한된 세마포어 객체. 스레드 안전하지 않습니다.

제한된 세마포어는 초기 value 위로 내부 카운터를 증가시키면 release()에서 ValueError를 발생시키는 Semaphore 버전입니다.

버전 3.10에서 변경: Removed the loop parameter.

Barrier

class asyncio.Barrier(parties)

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

async with can be used as an alternative to awaiting on wait().

The barrier can be reused any number of times.

예:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

버전 3.11에 추가.

coroutine wait()

Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is “filling”, the number of waiting task decreases by 1.

The return value is an integer in the range of 0 to parties-1, different for each task. This can be used to select a task to do some special housekeeping, e.g.:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

This method may raise a BrokenBarrierError exception if the barrier is broken or reset while a task is waiting. It could raise a CancelledError if a task is cancelled.

coroutine reset()

Return the barrier to the default, empty state. Any tasks waiting on it will receive the BrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

coroutine abort()

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the tasks needs to abort, to avoid infinite waiting tasks.

parties

The number of tasks required to pass the barrier.

n_waiting

The number of tasks currently waiting in the barrier while filling.

broken

A boolean that is True if the barrier is in the broken state.

exception asyncio.BrokenBarrierError

This exception, a subclass of RuntimeError, is raised when the Barrier object is reset or broken.


버전 3.9에서 변경: await lock 이나 yield from lock 및/또는 with 문(with await lock, with (yield from lock))을 사용하여 록을 얻는 것은 제거되었습니다. 대신 async with lock을 사용하십시오.