threading --- スレッドベースの並列処理

ソースコード: Lib/threading.py


このモジュールでは、高水準のスレッドインターフェースをより低水準 な _thread モジュールの上に構築しています。

バージョン 3.7 で変更: このモジュールは以前はオプションでしたが、常に利用可能なモジュールとなりました。

参考

concurrent.futures.ThreadPoolExecutor offers a higher level interface to push tasks to a background thread without blocking execution of the calling thread, while still being able to retrieve their results when needed.

queue provides a thread-safe interface for exchanging data between running threads.

asyncio offers an alternative approach to achieving task level concurrency without requiring the use of multiple operating system threads.

注釈

In the Python 2.x series, this module contained camelCase names for some methods and functions. These are deprecated as of Python 3.10, but they are still supported for compatibility with Python 2.5 and lower.

CPython 実装の詳細: CPython は Global Interpreter Lock のため、ある時点で Python コードを実行できるスレッドは1つに限られます (ただし、いくつかのパフォーマンスが強く求められるライブラリはこの制限を克服しています)。アプリケーションにマルチコアマシンの計算能力をより良く利用させたい場合は、 multiprocessing モジュールや concurrent.futures.ProcessPoolExecutor の利用をお勧めします。 ただし、I/Oバウンドなタスクを並行して複数走らせたい場合においては、 マルチスレッドは正しい選択肢です。

Availability: not WASI.

このモジュールは WebAssembly では動作しないか、利用不可です。詳しくは、WebAssembly プラットフォーム を見てください。

このモジュールは以下の関数を定義しています:

threading.active_count()

生存中の Thread オブジェクトの数を返します。この数は enumerate() の返すリストの長さと同じです。

関数``activeCount``はこの関数の非推奨エイリアスです。

threading.current_thread()

関数を呼び出している処理のスレッドに対応する Thread オブジェクトを返します。関数を呼び出している処理のスレッドが threading モジュールで生成したものでない場合、限定的な機能しかもたないダミースレッドオブジェクトを返します。

関数``currentThread``はこの関数の非推奨エイリアスです。

threading.excepthook(args, /)

Thread.run() で発生したキャッチされない例外を処理する。

(実) 引数*args*は以下の属性をもちます:

  • exc_type: 例外の型

  • exc_value: 例外の値、None の可能性がある。

  • exc_traceback: Exception traceback, can be None.

  • thread: Thread which raised the exception, can be None.

If exc_type is SystemExit, the exception is silently ignored. Otherwise, the exception is printed out on sys.stderr.

If this function raises an exception, sys.excepthook() is called to handle it.

threading.excepthook() can be overridden to control how uncaught exceptions raised by Thread.run() are handled.

Storing exc_value using a custom hook can create a reference cycle. It should be cleared explicitly to break the reference cycle when the exception is no longer needed.

Storing thread using a custom hook can resurrect it if it is set to an object which is being finalized. Avoid storing thread after the custom hook completes to avoid resurrecting objects.

参考

sys.excepthook() handles uncaught exceptions.

Added in version 3.8.

threading.__excepthook__

Holds the original value of threading.excepthook(). It is saved so that the original value can be restored in case they happen to get replaced with broken or alternative objects.

Added in version 3.10.

threading.get_ident()

現在のスレッドの 'スレッドID' を返します。非ゼロの整数です。この値は直接の意味を持っていません; 例えばスレッド特有のデータの辞書に索引をつけるためのような、マジッククッキーとして意図されています。スレッドが終了し、他のスレッドが作られたとき、スレッド ID は再利用されるかもしれません。

Added in version 3.3.

threading.get_native_id()

Return the native integral Thread ID of the current thread assigned by the kernel. This is a non-negative integer. Its value may be used to uniquely identify this particular thread system-wide (until the thread terminates, after which the value may be recycled by the OS).

Availability: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD, GNU/kFreeBSD.

Added in version 3.8.

バージョン 3.13 で変更: Added support for GNU/kFreeBSD.

threading.enumerate()

現在、アクティブな Thread オブジェクト全てのリストを返します。リストには、デーモンスレッド (daemonic thread)、 current_thread() の生成するダミースレッドオブジェクトが入ります。終了したスレッドとまだ開始していないスレッドは入りません。しかし、主スレッドは、たとえ終了しても、常に結果に含まれます。

threading.main_thread()

main Thread オブジェクトを返します。通常の条件では、メインスレッドはPythonインタプリタが起動したスレッドを指します。

Added in version 3.4.

threading.settrace(func)

threading モジュールを使って開始した全てのスレッドにトレース関数を設定します。 func は各スレッドの run() を呼び出す前にスレッドの sys.settrace() に渡されます。

threading.settrace_all_threads(func)

Set a trace function for all threads started from the threading module and all Python threads that are currently executing.

The func will be passed to sys.settrace() for each thread, before its run() method is called.

Added in version 3.12.

threading.gettrace()

settrace() 関数で設定したトレース関数を取得します。

Added in version 3.10.

threading.setprofile(func)

threading モジュールを使って開始した全てのスレッドにプロファイル関数を設定します。 func は各スレッドの run() を呼び出す前にスレッドの sys.setprofile() に渡されます。

threading.setprofile_all_threads(func)

Set a profile function for all threads started from the threading module and all Python threads that are currently executing.

The func will be passed to sys.setprofile() for each thread, before its run() method is called.

Added in version 3.12.

threading.getprofile()

setprofile() 関数で設定したプロファイラ関数を取得します。

Added in version 3.10.

threading.stack_size([size])

新しいスレッドを作るときのスレッドスタックサイズを返します。オプションの size 引数にはこれ以降に作成するスレッドのスタックサイズを指定し、0 (プラットフォームのデフォルト値または設定されたデフォルト値) か、 32,768 (32 KiB) 以上の正の整数でなければなりません。size が指定されない場合 0 が使われます。スレッドのスタックサイズの変更がサポートされていない場合、 RuntimeError を送出します。不正なスタックサイズが指定された場合、 ValueError を送出して、スタックサイズは変更されません。32 KiB は現在のインタープリタ自身のために十分であると保証された最小のスタックサイズです。いくつかのプラットフォームではスタックサイズに対して制限があることに注意してください。例えば最小のスタックサイズが 32 KiB より大きかったり、システムのメモリページサイズ の整数倍の必要があるなどです。この制限についてはプラットフォームのドキュメントを参照してください (一般的なページサイズは 4 KiB なので、プラットフォームに関する情報がない場合は 4096 の整数倍のスタックサイズを選ぶといいかもしれません)。

Availability: Windows, pthreads.

Unix platforms with POSIX threads support.

このモジュールでは以下の定数も定義しています:

threading.TIMEOUT_MAX

ブロックする関数 (Lock.acquire(), RLock.acquire(), Condition.wait() など) の timeout 引数に許される最大値。これ以上の値を timeout に指定すると OverflowError が発生します。

Added in version 3.2.

このモジュールは多くのクラスを定義しています。それらは下記のセクションで詳しく説明されます。

このモジュールのおおまかな設計は Java のスレッドモデルに基づいています。とはいえ、 Java がロックと条件変数を全てのオブジェクトの基本的な挙動にしているのに対し、 Python ではこれらを別個のオブジェクトに分けています。 Python の Thread クラスがサポートしているのは Java の Thread クラスの挙動のサブセットにすぎません; 現状では、優先度 (priority)やスレッドグループがなく、スレッドの破壊 (destroy)、中断 (stop)、一時停止 (suspend)、復帰 (resume)、割り込み (interrupt) は行えません。 Java の Thread クラスにおける静的メソッドに対応する機能が実装されている場合にはモジュールレベルの関数になっています。

以下に説明するメソッドは全て原子的 (atomic) に実行されます。

スレッドローカルデータ

スレッドローカルデータは、その値がスレッド固有のデータです。スレッドローカルデータを管理するには、単に local (あるいはそのサブクラス) のインスタンスを作成して、その属性に値を設定してください:

mydata = threading.local()
mydata.x = 1

インスタンスの値はスレッドごとに違った値になります。

class threading.local

スレッドローカルデータを表現するクラス。

For more details and extensive examples, see the documentation string of the _threading_local module: Lib/_threading_local.py.

Thread オブジェクト

The Thread class represents an activity that is run in a separate thread of control. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass. No other methods (except for the constructor) should be overridden in a subclass. In other words, only override the __init__() and run() methods of this class.

ひとたびスレッドオブジェクトを生成すると、スレッドの start() メソッドを呼び出して活動を開始しなければなりません。 start() メソッド はそれぞれのスレッドの run() メソッドを起動します。

スレッドの活動が始まると、スレッドは '生存中 (alive)' とみなされます。 スレッドは、通常 run() メソッドが終了するまで、もしくは捕捉されない例外が送出されるまで生存中となります。 is_alive() メソッドは、スレッドが生存中であるかどうか調べます。

スレッドは他のスレッドの join() メソッドを呼び出すことができます。このメソッドは、 join() メソッドを呼ばれたスレッドが終了するまでメソッドの呼び出し元のスレッドをブロックします。

スレッドは名前を持っています。名前はコンストラクタに渡すことができ、 name 属性を通して読み出したり変更したりできます。

run() メソッドが例外を発生させた場合、threading.excepthook() が呼び出され、例外を処理します。デフォルトでは、threading.excepthook()SystemExit を黙殺します。

スレッドには "デーモンスレッド (daemon thread)" であるというフラグを立てられます。 このフラグには、残っているスレッドがデーモンスレッドだけになった時に Python プログラム全体を終了させるという意味があります。フラグの初期値はスレッドを生成したスレッドから継承します。フラグの値は daemon プロパティまたは daemon コンストラクタ引数を通して設定できます。

注釈

デーモンスレッドは終了時にいきなり停止されます。デーモンスレッドで使われたリソース (開いているファイル、データベースのトランザクションなど) は適切に解放されないかもしれません。きちんと (gracefully) スレッドを停止したい場合は、スレッドを非デーモンスレッドにして、Event のような適切なシグナル送信機構を使用してください。

スレッドには "主スレッド (main thread)" オブジェクトがあります。主スレッドは Python プログラムを最初に制御していたスレッドです。主スレッドはデーモンスレッドではありません。

"ダミースレッドオブジェクト (dummy thread objects)" が作成される場合があります。ダミースレッドは、 "外来スレッド (alien thread)" に相当するスレッドオブジェクトです。ダミースレッドは、C コードから直接生成されたスレッドのような、 threading モジュールの外で開始された処理スレッドです。ダミースレッドオブジェクトには限られた機能しかなく、常に生存中、かつデーモンスレッドであるとみなされ、 join できません。また、外来スレッドの終了を検出するのは不可能なので、ダミースレッドは削除できません。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

コンストラクタは常にキーワード引数を使って呼び出さなければなりません。各引数は以下の通りです:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

targetrun() メソッドによって起動される呼び出し可能オブジェクトです。デフォルトでは何も呼び出さないことを示す None になっています。

name*はスレッド名です。デフォルトでは、"Thread-*N"(N*は小さな10進数)、または*target*引数が指定された場合"Thread-*N (target)"("target"は``target.__name__``)という形式でユニークな名前が構成される。

argstarget を呼び出すときの引数のリストかタプルです。デフォルトは () です。

kwargstarget を呼び出すときのキーワード引数の辞書です。デフォルトは {} です。

None でない場合、daemon はスレッドがデーモンかどうかを明示的に設定します。None の場合 (デフォルト)、デーモン属性は現在のスレッドから継承されます。

サブクラスでコンストラクタをオーバライドした場合、必ずスレッドが何かを始める前に基底クラスのコンストラクタ (Thread.__init__()) を呼び出しておかなくてはなりません。

バージョン 3.3 で変更: Added the daemon parameter.

バージョン 3.10 で変更: name 引数が省略された場合、target 名を使用します。

start()

スレッドの活動を開始します。

このメソッドは、スレッドオブジェクトあたり一度しか呼び出してはなりません。 start() は、オブジェクトの run() メソッドが個別の処理スレッド中で呼び出されるように調整します。

同じスレッドオブジェクトに対し、このメソッドを2回以上呼び出した場合、 RuntimeError を送出します。

run()

スレッドの活動をもたらすメソッドです。

このメソッドはサブクラスでオーバライドできます。標準の run() メソッドでは、オブジェクトのコンストラクタの target 引数に呼び出し可能オブジェクトを指定した場合、 args および kwargs の位置引数およびキーワード引数とともに呼び出します。

Thread に渡される args の引数にリストやタプルを使っても、同じ効果が得られます。

以下はプログラム例です:

>>> from threading import Thread
>>> t = Thread(target=print, args=[1])
>>> t.run()
1
>>> t = Thread(target=print, args=(1,))
>>> t.run()
1
join(timeout=None)

スレッドが終了するまで待機します。 このメソッドは、 join() を呼ばれたスレッドが正常終了あるいは処理されない例外によって終了するか、オプションのタイムアウトが発生するまで、メソッドの呼び出し元のスレッドをブロックします。

When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened -- if the thread is still alive, the join() call timed out.

timeout が指定されないかまたは None であるときは、この操作はスレッドが終了するまでブロックします。

1つのスレッドは何回も join されることができます。

現在のスレッドに対して join() を呼び出そうとすると、デッドロックを引き起こすため RuntimeError が送出されます。 スレッドが開始される前に join() を呼び出すことも同様のエラーのため、同じ例外が送出されます。

name

識別のためにのみ用いられる文字列です。名前には機能上の意味づけ (semantics) はありません。複数のスレッドに同じ名前をつけてもかまいません。名前の初期値はコンストラクタで設定されます。

getName()
setName()

name に対する非推奨 getter/setter API; 代わりにプロパティを直接使用してください。

バージョン 3.10 で非推奨.

ident

'スレッド識別子' 、または、スレッドが開始されていなければ None です。非ゼロの整数です。 get_ident() 関数を参照下さい。スレッド識別子は、スレッドが終了した後、新たなスレッドが生成された場合、再利用され得ます。スレッド識別子は、スレッドが終了した後でも利用できます。

native_id

The Thread ID (TID) of this thread, as assigned by the OS (kernel). This is a non-negative integer, or None if the thread has not been started. See the get_native_id() function. This value may be used to uniquely identify this particular thread system-wide (until the thread terminates, after which the value may be recycled by the OS).

注釈

Similar to Process IDs, Thread IDs are only valid (guaranteed unique system-wide) from the time the thread is created until the thread has been terminated.

Availability: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX, DragonFlyBSD.

Added in version 3.8.

is_alive()

スレッドが生存中かどうかを返します。

このメソッドは、 run() メソッドが起動する直前から run() メソッドが終了する直後までの間 True を返します。モジュール関数 enumerate() は、全ての生存中のスレッドのリストを返します。

daemon

このスレッドがデーモンスレッドか (True) か否か (False) を示すブール値。この値は start() の呼び出し前に設定されなければなりません。さもなければ RuntimeError が送出されます。初期値は生成側のスレッドから継承されます; メインスレッドはデーモンスレッドではないので、メインスレッドで作成されたすべてのスレッドは、デフォルトで daemon = False になります。

デーモンでない生存中のスレッドが全てなくなると、 Python プログラム全体が終了します。

isDaemon()
setDaemon()

daemon に対する非推奨 getter/setter API; 代わりにプロパティを直接使用してください。

バージョン 3.10 で非推奨.

Lock オブジェクト

プリミティブロックとは、ロックが生じた際に特定のスレッドによって所有されない同期プリミティブです。 Python では現在のところ拡張モジュール _thread で直接実装されている最も低水準の同期プリミティブを使えます。

プリミティブロックは2つの状態、 "ロック" または "アンロック" があります。ロックはアンロック状態で作成されます。ロックには基本となる二つのメソッド、 acquire()release() があります。ロックの状態がアンロックである場合、 acquire() は状態をロックに変更して即座に処理を戻します。 状態がロックの場合、 acquire() は他のスレッドが release() を呼び出してロックの状態をアンロックに変更するまでブロックします。その後、 acquire() 呼び出しは状態を再度ロックに設定してから処理を戻します。 release() メソッドを呼び出すのはロック状態のときでなければなりません; このメソッドはロックの状態をアンロックに変更して、即座に処理を戻します。 アンロックの状態のロックを解放しようとすると RuntimeError が送出されます。

ロックは コンテキストマネージメントプロトコル もサポートします。

複数のスレッドにおいて acquire() がアンロック状態への遷移を待っているためにブロックが起きている時に release() を呼び出してロックの状態をアンロックにすると、一つのスレッドだけが処理を進行できます。 どのスレッドが処理を進行できるのかは定義されておらず、実装によって異なるかもしれません。

全てのメソッドはアトミックに実行されます。

class threading.Lock

プリミティブロック (primitive lock) オブジェクトを実装しているクラスです。スレッドが一度ロックを獲得すると、それ以後のロック獲得の試みはロックが解放されるまでブロックします。どのスレッドでもロックを解放できます。

バージョン 3.13 で変更: Lock is now a class. In earlier Pythons, Lock was a factory function which returned an instance of the underlying private lock type.

acquire(blocking=True, timeout=-1)

ブロックあり、またはブロックなしでロックを獲得します。

引数 blockingTrue (デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。そしてそれをロック状態にしてから True を返します。

引数 blocking の値を False にして呼び出すとブロックしません。blockingTrue にして呼び出した場合にブロックするような状況では、直ちに False を返します。それ以外の場合には、ロックをロック状態にして True を返します。

正の値に設定された浮動小数点の timeout 引数とともに起動された場合、ロックを得られなければ最大で timeout によって指定された秒数だけブロックします。timeout 引数の -1 は無制限の待機を指定します。blockingFalse の場合に timeout を指定することは禁止されています。

ロックを獲得すると True を、ロックを獲得できなかったとき (例えば timeout が過ぎた場合) には False を返します。

バージョン 3.2 で変更: 新しい timeout 引数。

バージョン 3.2 で変更: Lock acquisition can now be interrupted by signals on POSIX if the underlying threading implementation supports it.

release()

ロックを解放します。これはロックを獲得したスレッドだけでなく、任意のスレッドから呼ぶことができます。

ロックの状態がロックのとき、状態をアンロックにリセットして処理を戻します。他のスレッドがロックがアンロック状態になるのを待ってブロックしている場合、ただ一つのスレッドだけが処理を継続できるようにします。

アンロック状態のロックに対して呼び出された場合、RuntimeError が送出されます。

戻り値はありません。

locked()

ロック状態のとき True を返します。

RLock オブジェクト

再入可能ロック (reentrant lock) とは、同じスレッドが複数回獲得できるような同期プリミティブです。再入可能ロックの内部では、プリミティブロックの使うロック/アンロック状態に加え、 "所有スレッド (owning thread)" と "再帰レベル (recursion level)" という概念を用いています。ロック状態では何らかのスレッドがロックを所有しており、アンロック状態ではいかなるスレッドもロックを所有していません。

Threads call a lock's acquire() method to lock it, and its release() method to unlock it.

注釈

Reentrant locks support the context management protocol, so it is recommended to use with instead of manually calling acquire() and release() to handle acquiring and releasing the lock for a block of code.

RLock's acquire()/release() call pairs may be nested, unlike Lock's acquire()/release(). Only the final release() (the release() of the outermost pair) resets the lock to an unlocked state and allows another thread blocked in acquire() to proceed.

acquire()/release() must be used in pairs: each acquire must have a release in the thread that has acquired the lock. Failing to call release as many times the lock has been acquired can lead to deadlock.

class threading.RLock

このクラスは再入可能ロックオブジェクトを実装します。再入可能ロックはそれを獲得したスレッドによって解放されなければなりません。いったんスレッドが再入可能ロックを獲得すると、同じスレッドはブロックされずにもう一度それを獲得できます ; そのスレッドは獲得した回数だけ解放しなければいけません。

RLock は実際にはファクトリ関数で、プラットフォームでサポートされる最も効率的なバージョンの具体的な RLock クラスのインスタンスを返すことに注意してください。

acquire(blocking=True, timeout=-1)

ブロックあり、またはブロックなしでロックを獲得します。

参考

Using RLock as a context manager

Recommended over manual acquire() and release() calls whenever practical.

When invoked with the blocking argument set to True (the default):

  • If no thread owns the lock, acquire the lock and return immediately.

  • If another thread owns the lock, block until we are able to acquire lock, or timeout, if set to a positive float value.

  • If the same thread owns the lock, acquire the lock again, and return immediately. This is the difference between Lock and RLock; Lock handles this case the same as the previous, blocking until the lock can be acquired.

When invoked with the blocking argument set to False:

  • If no thread owns the lock, acquire the lock and return immediately.

  • If another thread owns the lock, return immediately.

  • If the same thread owns the lock, acquire the lock again and return immediately.

In all cases, if the thread was able to acquire the lock, return True. If the thread was unable to acquire the lock (i.e. if not blocking or the timeout was reached) return False.

If called multiple times, failing to call release() as many times may lead to deadlock. Consider using RLock as a context manager rather than calling acquire/release directly.

バージョン 3.2 で変更: 新しい timeout 引数。

release()

再帰レベルをデクリメントしてロックを解放します。デクリメント後に再帰レベルがゼロになった場合、ロックの状態をアンロック (いかなるスレッドにも所有されていない状態) にリセットし、ロックの状態がアンロックになるのを待ってブロックしているスレッドがある場合にはその中のただ一つだけが処理を進行できるようにします。デクリメント後も再帰レベルがゼロでない場合、ロックの状態はロックのままで、呼び出し側のスレッドに所有されたままになります。

Only call this method when the calling thread owns the lock. A RuntimeError is raised if this method is called when the lock is not acquired.

戻り値はありません。

Condition オブジェクト

条件変数 (condition variable) は、常にある種のロックに関連付けられています; このロックは明示的に渡すことも、デフォルトで生成させることもできます。複数の条件変数で同じロックを共有しなければならない場合には、引渡しによる関連付けが便利です。ロックは条件オブジェクトの一部です: それを別々に扱う必要はありません。

条件変数は コンテキスト管理プロトコル に従います: with 文を使って囲まれたブロックの間だけ関連付けられたロックを獲得することができます。 acquire() メソッドと release() メソッドは、さらに関連付けられたロックの対応するメソッドを呼び出します。

他のメソッドは、関連付けられたロックを保持した状態で呼び出さなければなりません。 wait() メソッドはロックを解放します。そして別のスレッドが notify() または notify_all() を呼ぶことによってスレッドを起こすまでブロックします。一旦起こされたなら、 wait() は再びロックを得て戻ります。タイムアウトを指定することも可能です。

notify() メソッドは条件変数待ちのスレッドを1つ起こします。 notify_all() メソッドは条件変数待ちの全てのスレッドを起こします。

注意: notify()notify_all() はロックを解放しません; 従って、スレッドが起こされたとき、 wait() の呼び出しは即座に処理を戻すわけではなく、 notify() または notify_all() を呼び出したスレッドが最終的にロックの所有権を放棄したときに初めて処理を返すのです。

条件変数を使う典型的なプログラミングスタイルでは、何らかの共有された状態変数へのアクセスを同期させるためにロックを使います; 状態変数が特定の状態に変化したことを知りたいスレッドは、自分の望む状態になるまで繰り返し wait() を呼び出します。その一方で、状態変更を行うスレッドは、前者のスレッドが待ち望んでいる状態であるかもしれないような状態へ変更を行ったときに notify()notify_all() を呼び出します。例えば、以下のコードは無制限のバッファ容量のときの一般的な生産者-消費者問題です:

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

アプリケーションの条件をチェックする while ループは必須です。なぜなら、 wait() が任意の長時間の後で返り、 notify() 呼び出しを促した条件がもはや真でないことがありえるからです。これはマルチスレッドプログラミングに固有です。条件チェックを自動化するために wait_for() メソッドを使うことができ、それはタイムアウトの計算を簡略化します:

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

notify()notify_all() のどちらを使うかは、その状態の変化に興味を持っている待ちスレッドが一つだけなのか、あるいは複数なのかで考えます。例えば、典型的な生産者-消費者問題では、バッファに1つの要素を加えた場合には消費者スレッドを 1 つしか起こさなくてかまいません。

class threading.Condition(lock=None)

このクラスは条件変数 (condition variable) オブジェクトを実装します。条件変数を使うと、1つ以上のスレッドを別のスレッドの通知があるまで待機させておけます。

lockNone でない値を指定した場合、その値は Lock または RLock オブジェクトでなければなりません。 この場合、 lock は根底にあるロックオブジェクトとして使われます。 それ以外の場合には、 RLock オブジェクトを新しく作成して使います。

バージョン 3.3 で変更: ファクトリ関数からクラスに変更されました。

acquire(*args)

根底にあるロックを獲得します。このメソッドは根底にあるロックの対応するメソッドを呼び出します。そのメソッドの戻り値を返します。

release()

根底にあるロックを解放します。このメソッドは根底にあるロックの対応するメソッドを呼び出します。戻り値はありません。

wait(timeout=None)

通知 (notify) を受けるか、タイムアウトするまで待機します。呼び出し側のスレッドがロックを獲得していないときにこのメソッドを呼び出すと RuntimeError が送出されます。

このメソッドは根底にあるロックを解放し、他のスレッドが同じ条件変数に対して notify() または notify_all() を呼び出して現在のスレッドを起こすか、オプションのタイムアウトが発生するまでブロックします。一度スレッドが起こされると、再度ロックを獲得して処理を戻します。

When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds (or fractions thereof).

根底にあるロックが RLock である場合、 release() メソッドではロックは解放されません。というのも、ロックが再帰的に複数回獲得されている場合には、 release() によって実際にアンロックが行われないかもしれないからです。その代わり、ロックが再帰的に複数回獲得されていても確実にアンロックを行える RLock クラスの内部インターフェースを使います。その後ロックを再獲得する時に、もう一つの内部インターフェースを使ってロックの再帰レベルを復帰します。

与えられた timeout が過ぎていなければ返り値は True です。タイムアウトした場合には False が返ります。

バージョン 3.2 で変更: 以前は、このメソッドは常に None を返していました。

wait_for(predicate, timeout=None)

条件が真と判定されるまで待ちます。 predicate は呼び出し可能オブジェクトでなければならず、その結果はブール値として解釈されます。 最大の待ち時間を指定する timeout を与えることができます。

このユーティリティメソッドは、述語が満たされるかタイムアウトが発生するまで wait() を繰り返し呼び出す場合があります。戻り値は述語の最後の戻り値で、もしメソッドがタイムアウトすれば、 False と評価されます。

タイムアウト機能を無視すれば、このメソッドの呼び出しは以下のように書くのとほぼ等価です:

while not predicate():
    cv.wait()

したがって、 wait() と同じルールが適用されます: 呼び出された時にロックを保持していなければならず、戻るときにロックが再度獲得されます。述語はロックを保持した状態で評価されます。

Added in version 3.2.

notify(n=1)

デフォルトで、この条件変数を待っている1つのスレッドを起こします。 呼び出し側のスレッドがロックを獲得していないときにこのメソッドを呼び出すと RuntimeError が送出されます。

何らかの待機中スレッドがある場合、そのうち n スレッドを起こします。待機中のスレッドがなければ何もしません。

現在の実装では、少なくとも n スレッドが待機中であれば、ちょうど n スレッドを起こします。とはいえ、この挙動に依存するのは安全ではありません。将来、実装の最適化によって、複数のスレッドを起こすようになるかもしれないからです。

注意: 起こされたスレッドは実際にロックを再獲得できるまで wait() 呼び出しから戻りません。 notify() はロックを解放しないので、 notify() 呼び出し側は明示的にロックを解放しなければなりません。

notify_all()

この条件を待っているすべてのスレッドを起こします。このメソッドは notify() のように動作しますが、 1 つではなくすべての待ちスレッドを起こします。呼び出し側のスレッドがロックを獲得していない場合、 RuntimeError が送出されます。

関数``notifyAll``はこの関数の非推奨エイリアスです。

Semaphore オブジェクト

セマフォ (semaphore) は、計算機科学史上最も古い同期プリミティブの一つ で、草創期のオランダ計算機科学者 Edsger W. Dijkstra によって発明されました (彼は acquire()release() の代わりに P()V() を使いました)。

セマフォは acquire() でデクリメントされ release() でインクリメントされるような内部カウンタを管理します。 カウンタは決してゼロより小さくはなりません; acquire() は、カウンタがゼロになっている場合、他のスレッドが release() を呼び出すまでブロックします。

セマフォは コンテキストマネージメントプロトコル もサポートします。

class threading.Semaphore(value=1)

このクラスはセマフォ (semaphore) オブジェクトを実装します。セマフォは、 release() を呼び出した数から acquire() を呼び出した数を引き、初期値を足した値を表す極小のカウンタを管理します。 acquire() メソッドは、カウンタの値を負にせずに処理を戻せるまで必要ならば処理をブロックします。 value を指定しない場合、デフォルトの値は 1 になります。

オプションの引数には、内部カウンタの初期値を指定します。デフォルトは 1 です。与えられた value が 0 より小さい場合、 ValueError が送出されます。

バージョン 3.3 で変更: ファクトリ関数からクラスに変更されました。

acquire(blocking=True, timeout=None)

セマフォを獲得します。

When invoked without arguments:

  • If the internal counter is larger than zero on entry, decrement it by one and return True immediately.

  • If the internal counter is zero on entry, block until awoken by a call to release(). Once awoken (and the counter is greater than 0), decrement the counter by 1 and return True. Exactly one thread will be awoken by each call to release(). The order in which threads are awoken should not be relied on.

blockingFalse にして呼び出すとブロックしません。引数なしで呼び出した場合にブロックするような状況であった場合には直ちに False を返します。それ以外の場合には、引数なしで呼び出したときと同じ処理を行い True を返します。

None 以外の timeout で起動された場合、最大で timeout 秒ブロックします。 acquire が その間隔の間で完了しなかった場合は False が返ります。そうでなければ True が返ります。

バージョン 3.2 で変更: 新しい timeout 引数。

release(n=1)

内部カウンタを n インクリメントして、セマフォを解放します。 release() 処理に入ったときにカウンタがゼロであり、カウンタの値がゼロより大きくなるのを待っている別のスレッドがあった場合、それらのスレッドから n 個を起こします。

バージョン 3.9 で変更: 複数の待機中のスレッドを一度に解放する n パラメータを追加しました。

class threading.BoundedSemaphore(value=1)

有限セマフォ (bounded semaphore) オブジェクトを実装しているクラスです。有限セマフォは、現在の値が初期値を超過しないようチェックを行います。超過を起こした場合、 ValueError を送出します。たいていの場合、セマフォは限られた容量のリソースを保護するために使われるものです。従って、あまりにも頻繁なセマフォの解放はバグが生じているしるしです。 value を指定しない場合、デフォルトの値は 1 になります。

バージョン 3.3 で変更: ファクトリ関数からクラスに変更されました。

Semaphore の例

セマフォはしばしば、容量に限りのある資源、例えばデータベースサーバなどを保護するために使われます。リソースが固定の状況では、常に有限セマフォを使わなければなりません。主スレッドは、作業スレッドを立ち上げる前にセマフォを初期化します:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

作業スレッドは、ひとたび立ち上がると、サーバへ接続する必要が生じたときにセマフォの acquire() および release() メソッドを呼び出します:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

有限セマフォを使うと、セマフォを獲得回数以上に解放してしまうというプログラム上の間違いを見逃しにくくします。

Event オブジェクト

イベントは、あるスレッドがイベントを発信し、他のスレッドはそれを待つという、スレッド間で通信を行うための最も単純なメカニズムの一つです。

イベントオブジェクトは内部フラグを管理します。このフラグは set() メソッドで値を true に、 clear() メソッドで値を false にリセットします。 wait() メソッドはフラグが true になるまでブロックします。

class threading.Event

イベントオブジェクトを実装しているクラスです。イベントは set() メソッドを使うと True に、 clear() メソッドを使うと False にセットされるようなフラグを管理します。 wait() メソッドは、全てのフラグが true になるまでブロックするようになっています。フラグの初期値は false です。

バージョン 3.3 で変更: ファクトリ関数からクラスに変更されました。

is_set()

内部フラグが真のとき True を返します。

メソッド isSet はこのメソッドの非推奨エイリアスです。

set()

内部フラグの値を true にセットします。フラグの値が true になるのを待っている全てのスレッドを起こします。一旦フラグが true になると、スレッドが wait() を呼び出しても全くブロックしなくなります。

clear()

内部フラグの値を false にリセットします。以降は、 set() を呼び出して再び内部フラグの値を true にセットするまで、 wait() を呼び出したスレッドはブロックするようになります。

wait(timeout=None)

Block as long as the internal flag is false and the timeout, if given, has not expired. The return value represents the reason that this blocking method returned; True if returning because the internal flag is set to true, or False if a timeout is given and the internal flag did not become true within the given wait time.

When the timeout argument is present and not None, it should be a floating-point number specifying a timeout for the operation in seconds, or fractions thereof.

バージョン 3.1 で変更: 以前は、このメソッドは常に None を返していました。

Timer オブジェクト

このクラスは、一定時間経過後に実行される活動、すなわちタイマ活動を表現します。 TimerThread のサブクラスであり、自作のスレッドを構築した一例でもあります。

Timers are started, as with threads, by calling their Timer.start method. The timer can be stopped (before its action has begun) by calling the cancel() method. The interval the timer will wait before executing its action may not be exactly the same as the interval specified by the user.

例えば:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)

interval 秒後に引数 args キーワード引数 kwargsfunction を実行するようなタイマを生成します。args*が ``None`` (デフォルト) なら空のリストが使用されます。*kwargsNone (デフォルト) なら空の辞書が使用されます。

バージョン 3.3 で変更: ファクトリ関数からクラスに変更されました。

cancel()

タイマをストップして、その動作の実行をキャンセルします。このメソッドはタイマがまだ活動待ち状態にある場合にのみ動作します。

バリアオブジェクト

Added in version 3.2.

このクラスは、互いを待つ必要のある固定の数のスレッドで使用するための単純な同期プリミティブを提供します。それぞれのスレッドは wait() メソッドを呼ぶことによりバリアを通ろうとしてブロックします。すべてのスレッドがそれぞれの wait() メソッドを呼び出した時点で、すべてのスレッドが同時に解放されます。

バリアは同じ数のスレッドに対して何度でも再利用することができます。

例として、クライアントとサーバの間でスレッドを同期させる単純な方法を紹介します:

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)

parties 個のスレッドのためのバリアオブジェクトを作成します。 action は、もし提供されるなら呼び出し可能オブジェクトで、スレッドが解放される時にそのうちの1つによって呼ばれます。 timeout は、 wait() メソッドに対して none が指定された場合のデフォルトのタイムアウト値です。

wait(timeout=None)

バリアを通ります。バリアに対するすべてのスレッドがこの関数を呼んだ時に、それらは同時にすべて解放されます。timeout が提供される場合、それはクラスコンストラクタに渡された値に優先して使用されます。

返り値は 0 から parties -- 1 の範囲の整数で、それぞれのスレッドに対して異なります。これは、特別な後始末 (housekeeping) を行うスレッドを選択するために使用することができます。例えば:

i = barrier.wait()
if i == 0:
    # Only one thread needs to print this
    print("passed the barrier")

action がコンストラクタに渡されていれば、スレッドのうちの1つが解放される前にそれを呼び出します。万一この呼び出しでエラーが発生した場合、バリアは broken な状態に陥ります。

この呼び出しがタイムアウトする場合、バリアは broken な状態に陥ります。

スレッドが待っている間にバリアが broken になるかリセットされた場合、このメソッドは BrokenBarrierError 例外を送出するかもしれません。

reset()

バリアをデフォルトの空の状態に戻します。そのバリアの上で待っているすべてのスレッドは BrokenBarrierError 例外を受け取ります。

状態が未知の他のスレッドがある場合、この関数を使用するのに何らかの外部同期を必要とするかもしれないことに注意してください。バリアが broken な場合、単にそれをそのままにして新しいものを作成する方がよいでしょう。

abort()

バリアを broken な状態にします。これによって、現在または将来の wait() 呼び出しが BrokenBarrierError とともに失敗するようになります。これを使うと、例えばスレッドが異常終了する必要がある場合にアプリケーションがデッドロックするのを避けることができます。

スレッドのうちの1つが返ってこないことに対して自動的に保護するように、単純に常識的な timeout 値でバリアを作成することは望ましいかもしれません。

parties

バリアを通るために要求されるスレッドの数。

n_waiting

現在バリアの中で待っているスレッドの数。

broken

バリアが broken な状態である場合に True となるブール値。

exception threading.BrokenBarrierError

Barrier オブジェクトがリセットされるか broken な場合に、この例外 (RuntimeError のサブクラス) が送出されます。

with 文でのロック・条件変数・セマフォの使い方

All of the objects provided by this module that have acquire and release methods can be used as context managers for a with statement. The acquire method will be called when the block is entered, and release will be called when the block is exited. Hence, the following snippet:

with some_lock:
    # do something...

は、以下と同じです

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

現在のところ、 LockRLockConditionSemaphoreBoundedSemaphorewith 文のコンテキストマネージャとして使うことができます。