multiprocessing --- プロセスベースの並列処理¶
ソースコード: Lib/multiprocessing/
はじめに¶
multiprocessing は、 threading と似た API で複数のプロセスの生成をサポートするパッケージです。 multiprocessing パッケージは、ローカルとリモート両方の並行処理を提供します。また、このパッケージはスレッドの代わりにサブプロセスを使用することにより、グローバルインタープリタロック の問題を避ける工夫が行われています。このような特徴があるため multiprocessing モジュールを使うことで、マルチプロセッサーマシンの性能を最大限に活用することができるでしょう。なお、このモジュールは Unix と Windows の両方で動作します。
multiprocessing モジュールでは、threading モジュールには似たものが存在しない API も導入されています。その最たるものが Pool オブジェクトです。これは複数の入力データに対して、サブプロセス群に入力データを分配 (データ並列) して関数を並列実行するのに便利な手段を提供します。以下の例では、モジュール内で関数を定義して、子プロセスがそのモジュールを正常にインポートできるようにする一般的な方法を示します。 Pool を用いたデータ並列の基礎的な例は次の通りです:
from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))
標準出力に以下が出力されます:
[1, 4, 9]
Process クラス¶
multiprocessing モジュールでは、プロセスは以下の手順によって生成されます。はじめに Process のオブジェクトを作成し、続いて start() メソッドを呼び出します。この Process クラスは threading.Thread クラスと同様の API を持っています。まずは、簡単な例をもとにマルチプロセスを使用したプログラムについてみていきましょう
from multiprocessing import Process
def f(name):
    print('hello', name)
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
実行された個々のプロセス ID を表示するために拡張したサンプルコードを以下に示します:
from multiprocessing import Process
import os
def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
def f(name):
    info('function f')
    print('hello', name)
if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
なぜ if __name__ == '__main__' という記述が必要かは プログラミングガイドライン を参照してください。
コンテキストと開始方式¶
プラットフォームにもよりますが、multiprocessing はプロセスを開始するために 3 つの方法をサポートしています。それら 開始方式 は以下のとおりです
- spawn
親プロセスは新たに python インタープリタープロセスを開始します。子プロセスはプロセスオブジェクトの
run()メソッドの実行に必要なリソースのみ継承します。特に、親プロセスからの不要なファイル記述子とハンドルは継承されません。この方式を使用したプロセスの開始は fork や forkserver に比べ遅くなります。Unix と Windows で利用可能。Windows と macOS でのデフォルト。
- fork
親プロセスは
os.fork()を使用して Python インタープリターをフォークします。子プロセスはそれが開始されるとき、事実上親プロセスと同一になります。親プロセスのリソースはすべて子プロセスに継承されます。マルチスレッドプロセスのフォークは安全性に問題があることに注意してください。Unix でのみ利用可能。Unix でのデフォルト。
- forkserver
プログラムを開始するとき forkserver 方式を選択した場合、サーバープロセスが開始されます。それ以降、新しいプロセスが必要になったときはいつでも、親プロセスはサーバーに接続し、新しいプロセスのフォークを要求します。フォークサーバープロセスはシングルスレッドなので
os.fork()の使用に関しても安全です。不要なリソースは継承されません。Unix パイプを経由したファイル記述子の受け渡しをサポートする Unix で利用可能。
バージョン 3.8 で変更: macOS では、 spawn 開始方式がデフォルトになりました。 fork 開始方法は、サブプロセスのクラッシュを引き起こす可能性があるため、安全ではありません。 bpo-33725 を参照。
バージョン 3.4 で変更: すべての Unix プラットフォームで spawn が、一部のプラットフォームで forkserver が追加されました。Windows では親プロセスの継承可能な全ハンドルが子プロセスに継承されることがなくなりました。
Unix で開始方式に spawn あるいは forkserver を使用した場合は、プログラムのプロセスによって作成されたリンクされていない名前付きシステムリソース (名前付きセマフォや、SharedMemory オブジェクト) を追跡する リソーストラッカー プロセスも開始されます。全プロセスが終了したときに、リソーストラッカーは残っているすべての追跡していたオブジェクトのリンクを解除します。通常そういったことはないのですが、プロセスがシグナルによって kill されたときに "漏れた" リソースが発生する場合があります。(この場合、名前付きセマフォと共有メモリセグメントは、システムが再起動されるまでリンク解除されません。名前付きセマフォの個数はシステムによって制限されており、共有メモリセグメントはメインメモリを占有するため、これらは問題になる可能性があります。)
開始方式はメインモジュールの if __name__ == '__main__' 節内で、関数 set_start_method() によって指定します。以下に例を示します:
import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
関数 set_start_method() はプログラム内で複数回使用してはいけません。
もうひとつの方法として、get_context() を使用してコンテキストオブジェクトを取得することができます。コンテキストオブジェクトは multiprocessing モジュールと同じ API を持ち、同じプログラム内で複数の開始方式を使用できます。
import multiprocessing as mp
def foo(q):
    q.put('hello')
if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()
あるコンテキストに関連したオブジェクトは、異なるコンテキストのプロセスとは互換性がない場合があることに注意してください。特に、fork コンテキストを使用して作成されたロックは、spawn あるいは forkserver を使用して開始されたプロセスに渡すことはできません。
特定の開始方式の使用を要求するライブラリは get_context() を使用してライブラリ利用者の選択を阻害しないようにするべきです。
警告
Unix において、'spawn' あるいは 'forkserver' で開始された場合、 "frozen" な実行可能形式 (PyInstaller や cx_Freeze で作成されたバイナリなど) は使用できません。'fork' で開始した場合は動作します。
プロセス間でのオブジェクト交換¶
multiprocessing モジュールでは、プロセス間通信の手段が2つ用意されています。それぞれ以下に詳細を示します:
キュー (Queue)
Queueクラスはqueue.Queueクラスとほとんど同じように使うことができます。以下に例を示します:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()キューはスレッドセーフであり、プロセスセーフです。
パイプ (Pipe)
Pipe()関数はパイプで繋がれたコネクションオブジェクトのペアを返します。デフォルトでは双方向性パイプを返します。以下に例を示します:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()パイプのそれぞれの端を表す2つのコネクションオブジェクトが
Pipe()関数から返されます。各コネクションオブジェクトには、send()、recv()、その他のメソッドがあります。2つのプロセス (またはスレッド) がパイプの 同じ 端で同時に読み込みや書き込みを行うと、パイプ内のデータが破損してしまうかもしれないことに注意してください。もちろん、各プロセスがパイプの別々の端を同時に使用するならば、データが破壊される危険性はありません。
プロセス間の同期¶
multiprocessing は threading モジュールと等価な同期プリミティブを備えています。以下の例では、ロックを使用して、一度に1つのプロセスしか標準出力に書き込まないようにしています:
from multiprocessing import Process, Lock
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
ロックを使用しないで標準出力に書き込んだ場合は、各プロセスからの出力がごちゃまぜになってしまいます。
ワーカープロセスのプールを使用¶
Pool クラスは、ワーカープロセスをプールする機能を備えています。このクラスには、異なる方法でワーカープロセスへタスクを割り当てるいくつかのメソッドがあります。
例えば:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
    return x*x
if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:
        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))
        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)
        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"
        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process
        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])
        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")
        print("For the moment, the pool remains available for more work")
    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")
プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。
注釈
このパッケージに含まれる機能を使用するためには、子プロセスから __main__ モジュールをインポートできる必要があります。このことについては プログラミングガイドライン で触れていますが、ここであらためて強調しておきます。なぜかというと、いくつかのサンプルコード、例えば multiprocessing.pool.Pool のサンプルはインタラクティブシェル上では動作しないからです。以下に例を示します:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(もしこのコードを試すなら、実際には3つの完全なトレースバックがばらばらの順番で出力されますし、親プロセスを何らかの方法で止める必要があります。)
リファレンス¶
multiprocessing パッケージは threading モジュールの API とほとんど同じです。
Process クラスと例外¶
- 
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
- Process オブジェクトは各プロセスの処理を表します。 - Processクラスは- threading.Threadクラスのすべてのメソッドと同じインターフェースを提供します。- コンストラクターは必ずキーワード引数で呼び出すべきです。引数 group には必ず - Noneを渡してください。 この引数は- threading.Threadクラスとの互換性のためだけに残されています。引数 target には、- run()メソッドから呼び出される callable オブジェクトを渡します。この引数はデフォルトで- Noneとなっており、何も呼び出されません。引数 name にはプロセス名を渡します (詳細は- nameを見てください)。 args は対象の呼び出しに対する引数のタプルを渡します。 kwargs は対象の呼び出しに対するキーワード引数の辞書を渡します。もし提供されれば、キーワード専用の daemon 引数はプロセスの- daemonフラグを- Trueまたは- Falseにセットします。- Noneの場合 (デフォルト)、このフラグは作成するプロセスから継承されます。- デフォルトでは、target には引数が渡されないようになっています。 - サブクラスがコンストラクターをオーバーライドする場合は、そのプロセスに対する処理を行う前に基底クラスのコンストラクター ( - Process.__init__()) を実行しなければなりません。- バージョン 3.3 で変更: daemon 引数が追加されました。 - 
run()¶
- プロセスが実行する処理を表すメソッドです。 - このメソッドはサブクラスでオーバーライドすることができます。標準の - run()メソッドは、コンストラクターの target 引数として渡された呼び出し可能オブジェクトを呼び出します。もしコンストラクターに args もしくは kwargs 引数が渡されていれば、呼び出すオブジェクトにこれらの引数を渡します。
 - 
start()¶
- プロセスの処理を開始するためのメソッドです。 - 各 Process オブジェクトに対し、このメソッドが2回以上呼び出されてはいけません。各プロセスでオブジェクトの - run()メソッドを呼び出す準備を行います。
 - 
join([timeout])¶
- オプションの引数 timeout が - None(デフォルト) の場合、- join()メソッドが呼ばれたプロセスは処理が終了するまでブロックします。 timeout が正の数である場合、最大 timeout 秒ブロックします。 プロセスが終了あるいはタイムアウトした場合、メソッドは- Noneを返すことに注意してください。 プロセスの- exitcodeを確認し終了したかどうかを判断してください。- 1つのプロセスは何回も join されることができます。 - プロセスは自分自身を join することはできません。それはデッドロックを引き起こすことがあるからです。プロセスが start される前に join しようとするとエラーが発生します。 
 - 
name¶
- プロセスの名前。名前は識別のためだけに使用される文字列です。それ自体には特別な意味はありません。複数のプロセスに同じ名前が与えられても構いません。 - 最初の名前はコンストラクターによってセットされます。コンストラクターに明示的な名前が渡されない場合、 'Process-N1:N2:...:Nk' 形式の名前が構築されます。ここでそれぞれの Nk はその親のN番目の子供です。 
 - 
daemon¶
- デーモンプロセスであるかのフラグであり、ブール値です。この属性は - start()が呼び出される前に設定されている必要があります。- 初期値は作成するプロセスから継承します。 - あるプロセスが終了するとき、そのプロセスはその子プロセスであるデーモンプロセスすべてを終了させようとします。 - デーモンプロセスは子プロセスを作成できないことに注意してください。もし作成できてしまうと、そのデーモンプロセスの親プロセスが終了したときにデーモンプロセスの子プロセスが孤児になってしまう場合があるからです。さらに言えば、デーモンプロセスはUnix デーモンやサービスでは なく 通常のプロセスであり、非デーモンプロセスが終了すると終了されます (そして join されません)。 
 - threading.Threadクラスの API に加えて- Processクラスのオブジェクトには以下の属性およびメソッドがあります:- 
pid¶
- プロセスIDを返します。プロセスの生成前は - Noneが設定されています。
 - 
exitcode¶
- The child's exit code. This will be - Noneif the process has not yet terminated.- If the child's - run()method returned normally, the exit code will be 0. If it terminated via- sys.exit()with an integer argument N, the exit code will be N.- If the child terminated due to an exception not caught within - run(), the exit code will be 1. If it was terminated by signal N, the exit code will be the negative value -N.
 - 
authkey¶
- プロセスの認証キーです (バイト文字列です)。 - multiprocessingモジュールがメインプロセスにより初期化される場合には、- os.urandom()関数を使用してランダムな値が設定されます。- Processクラスのオブジェクトの作成時にその親プロセスから認証キーを継承します。もしくは- authkeyに別のバイト文字列を設定することもできます。- 詳細は 認証キー を参照してください。 
 - 
sentinel¶
- プロセスが終了するときに "ready" となるシステムオブジェクトの数値ハンドル。 - multiprocessing.connection.wait()を使用していくつかのイベントを同時に wait したい場合はこの値を使うことができます。それ以外の場合は- join()を呼ぶ方がより単純です。- Windows においては、これは - WaitForSingleObjectおよび- WaitForMultipleObjectsファミリーの API 呼び出しで使用可能な OS ハンドルです。 Unix においては、これは- selectモジュールのプリミティブで使用可能なファイル記述子です。- バージョン 3.3 で追加. 
 - 
terminate()¶
- プロセスを終了します。Unix 環境では - SIGTERMシグナルを、 Windows 環境では- TerminateProcess()を使用して終了させます。終了ハンドラーや finally 節などは、実行されないことに注意してください。- このメソッドにより終了するプロセスの子孫プロセスは、終了 しません 。そういった子孫プロセスは単純に孤児になります。 - 警告 - このメソッドの使用時に、関連付けられたプロセスがパイプやキューを使用している場合には、使用中のパイプやキューが破損して他のプロセスから使用できなくなる可能性があります。同様に、プロセスがロックやセマフォなどを取得している場合には、このプロセスが終了してしまうと他のプロセスのデッドロックの原因になるでしょう。 
 - 
kill()¶
- meth:terminate() と同様の動作をしますが、Unix では``SIGKILL`` シグナルを使用します。 - バージョン 3.7 で追加. 
 - 
close()¶
- Processオブジェクトを閉じ、関連付けられていたすべてのリソースを開放します。中のプロセスが実行中であった場合、- ValueErrorを送出します。- close()が成功した場合、class:Process オブジェクトの他のメソッドや属性は、ほとんどが- ValueErrorを送出します。- バージョン 3.7 で追加. 
 - プロセスオブジェクトが作成したプロセスのみが - start(),- join(),- is_alive(),- terminate()と- exitcodeのメソッドを呼び出すべきです。- 以下の例では - Processのメソッドの使い方を示しています:- >>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True 
- 
- 
exception multiprocessing.ProcessError¶
- すべての - multiprocessing例外の基底クラスです。
- 
exception multiprocessing.BufferTooShort¶
- この例外は - Connection.recv_bytes_into()によって発生し、バッファーオブジェクトが小さすぎてメッセージが読み込めないことを示します。- eが- BufferTooShortのインスタンスであるとすると、- e.args[0]はそのメッセージをバイト文字列で与えるものです。
- 
exception multiprocessing.AuthenticationError¶
- 認証エラーがあった場合に送出されます。 
- 
exception multiprocessing.TimeoutError¶
- タイムアウトをサポートするメソッドでタイムアウトが過ぎたときに送出されます。 
パイプ (Pipe) とキュー (Queue)¶
複数のプロセスを使う場合、一般的にはメッセージパッシングをプロセス間通信に使用し、ロックのような同期プリミティブを使用しないようにします。
メッセージのやりとりのために Pipe() (2つのプロセス間の通信用)、もしくはキュー (複数のメッセージ生成プロセス (producer)、消費プロセス (consumer) の実現用) を使うことができます。
Queue, SimpleQueue と JoinableQueue 型は複数プロセスから生成/消費を行う FIFO キューです。これらのキューは標準ライブラリの queue.Queue を模倣しています。 Queue には Python 2.5 の queue.Queue クラスで導入された task_done() と join() メソッドがないことが違う点です。
もし JoinableQueue を使用するなら、キューから削除される各タスクのために JoinableQueue.task_done() を呼び出さなければ なりません 。さもないと、いつか完了していないタスクを数えるためのセマフォがオーバーフローし、例外を発生させるでしょう。
管理オブジェクトを使用することで共有キューを作成できることも覚えておいてください。詳細は マネージャー を参照してください。
注釈
multiprocessing は、タイムアウトを伝えるために、通常の queue.Empty と queue.Full 例外を使用します。それらは multiprocessing の名前空間では利用できないため、queue からインポートする必要があります。
注釈
オブジェクトがキューに追加される際、そのオブジェクトは pickle 化されています。そのため、バックグラウンドのスレッドが後になって下位層のパイプに pickle 化されたデータをフラッシュすることがあります。これにより、少し驚くような結果になりますが、実際に問題になることはないはずです。これが問題になるような状況では、かわりに manager を使ってキューを作成することができるからです。
- 空のキューの中にオブジェクトを追加した後、キューの - empty()メソッドが- Falseを返すまでの間にごくわずかな遅延が起きることがあり、- get_nowait()が- queue.Emptyを発生させることなく制御が呼び出し元に返ってしまうことがあります。
- 複数のプロセスがオブジェクトをキューに詰めている場合、キューの反対側ではオブジェクトが詰められたのとは違う順序で取得される可能性があります。ただし、同一のプロセスから詰め込まれたオブジェクトは、それらのオブジェクト間では、必ず期待どおりの順序になります。 
警告
Queue を利用しようとしている最中にプロセスを Process.terminate() や os.kill() で終了させる場合、キューにあるデータは破損し易くなります。終了した後で他のプロセスがキューを利用しようとすると、例外を発生させる可能性があります。
警告
上述したように、もし子プロセスがキューへ要素を追加するなら (かつ JoinableQueue.cancel_join_thread を使用しないなら) そのプロセスはバッファーされたすべての要素がパイプへフラッシュされるまで終了しません。
これは、そのプロセスを join しようとする場合、キューに追加されたすべての要素が消費されたことが確実でないかぎり、デッドロックを発生させる可能性があることを意味します。似たような現象で、子プロセスが非デーモンプロセスの場合、親プロセスは終了時に非デーモンのすべての子プロセスを join しようとしてハングアップする可能性があります。
マネージャーを使用して作成されたキューではこの問題はありません。詳細は プログラミングガイドライン を参照してください。
プロセス間通信におけるキューの使用例を知りたいなら 使用例 を参照してください。
- 
multiprocessing.Pipe([duplex])¶
- パイプの両端を表す - Connectionオブジェクトのペア- (conn1, conn2)を返します。- duplex が - True(デフォルト) ならパイプは双方向性です。duplex が- Falseならパイプは一方向性で、- conn1はメッセージの受信専用、- conn2はメッセージの送信専用になります。
- 
class multiprocessing.Queue([maxsize])¶
- パイプや2~3個のロック/セマフォを使用して実装されたプロセス共有キューを返します。あるプロセスが最初に要素をキューへ追加するとき、バッファーからパイプの中へオブジェクトを転送する供給スレッドが開始されます。 - 標準ライブラリの - queueモジュールの通常の- queue.Emptyや- queue.Full例外がタイムアウトを伝えるために送出されます。- Queueは- task_done()や- join()を除く- queue.Queueのすべてのメソッドを実装します。- 
qsize()¶
- おおよそのキューのサイズを返します。マルチスレッディング/マルチプロセスの特性上、この数値は信用できません。 - Note that this may raise - NotImplementedErroron Unix platforms like macOS where- sem_getvalue()is not implemented.
 - 
empty()¶
- キューが空っぽなら - Trueを、そうでなければ- Falseを返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。
 - 
full()¶
- キューがいっぱいなら - Trueを、そうでなければ- Falseを返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。
 - 
put(obj[, block[, timeout]])¶
- キューの中へ obj を追加します。オプションの引数 block が - True(デフォルト) 且つ timeout が- None(デフォルト) なら、空きスロットが利用可能になるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に空きスロットが利用できなかったら- queue.Full例外を発生させます。それ以外 (block が- False) で、空きスロットがすぐに利用可能な場合はキューに要素を追加します。そうでなければ- queue.Full例外が発生します(その場合 timeout は無視されます)。- バージョン 3.8 で変更: If the queue is closed, - ValueErroris raised instead of- AssertionError.
 - 
put_nowait(obj)¶
- put(obj, False)と等価です。
 - 
get([block[, timeout]])¶
- キューから要素を取り出して削除します。オプションの引数 block が - True(デフォルト) 且つ timeout が- None(デフォルト) なら、要素が取り出せるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に要素が取り出せなかったら- queue.Empty例外を発生させます。それ以外 (block が- False) で、要素がすぐに取り出せる場合は要素を返します。そうでなければ- queue.Empty例外が発生します(その場合 timeout は無視されます)。- バージョン 3.8 で変更: If the queue is closed, - ValueErroris raised instead of- OSError.
 - 
get_nowait()¶
- get(False)と等価です。
 - multiprocessing.Queueは- queue.Queueにはない追加メソッドがあります。 これらのメソッドは通常、ほとんどのコードに必要ありません:- 
close()¶
- カレントプロセスからこのキューへそれ以上データが追加されないことを表します。バックグラウンドスレッドはパイプへバッファーされたすべてのデータをフラッシュするとすぐに終了します。これはキューがガベージコレクトされるときに自動的に呼び出されます。 
 - 
join_thread()¶
- バックグラウンドスレッドを join します。このメソッドは - close()が呼び出された後でのみ使用されます。バッファーされたすべてのデータがパイプへフラッシュされるのを保証するため、バックグラウンドスレッドが終了するまでブロックします。- デフォルトでは、あるプロセスがキューを作成していない場合、終了時にキューのバックグラウンドスレッドを join しようとします。そのプロセスは - join_thread()が何もしないように- cancel_join_thread()を呼び出すことができます。
 - 
cancel_join_thread()¶
- join_thread()がブロッキングするのを防ぎます。特にこれはバックグラウンドスレッドがそのプロセスの終了時に自動的に join されるのを防ぎます。詳細は- join_thread()を参照してください。- このメソッドは - allow_exit_without_flush()という名前のほうがよかったかもしれません。キューに追加されたデータが失われてしまいがちなため、このメソッドを使う必要はほぼ確実にないでしょう。本当にこれが必要になるのは、キューに追加されたデータを下位層のパイプにフラッシュすることなくカレントプロセスを直ちに終了する必要があり、かつ失われるデータに関心がない場合です。
 - 注釈 - このクラスに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォ (shared semaphore) を使用しているものがあります。これが使用できない場合には、このクラスが無効になり、 - Queueをインスタンス化する時に- ImportErrorが発生します。詳細は bpo-3770 を参照してください。同様のことが、以下に列挙されている特殊なキューでも成り立ちます。
- 
- 
class multiprocessing.SimpleQueue¶
- 単純化された - Queue型です。ロックされた- Pipeと非常に似ています。- 
close()¶
- Close the queue: release internal resources. - A queue must not be used anymore after it is closed. For example, - get(),- put()and- empty()methods must no longer be called.- バージョン 3.9 で追加. 
 - 
empty()¶
- キューが空ならば - Trueを、そうでなければ- Falseを返します。
 - 
get()¶
- キューから要素を削除して返します。 
 - 
put(item)¶
- item をキューに追加します。 
 
- 
- 
class multiprocessing.JoinableQueue([maxsize])¶
- JoinableQueueは- Queueのサブクラスであり、- task_done()や- join()メソッドが追加されているキューです。- 
task_done()¶
- 以前にキューへ追加されたタスクが完了したことを表します。キューのコンシューマによって使用されます。 タスクをフェッチするために使用されるそれぞれの - get()に対して、 後続の- task_done()呼び出しはタスクの処理が完了したことをキューへ伝えます。- もし - join()がブロッキング状態なら、 すべての要素が処理されたときに復帰します(- task_done()呼び出しが すべての要素からキュー内へ- put()されたと受け取ったことを意味します)。- キューにある要素より多く呼び出された場合 - ValueErrorが発生します。
 - 
join()¶
- キューにあるすべてのアイテムが取り出されて処理されるまでブロックします。 - キューに要素が追加されると未完了タスク数が増えます。コンシューマがキューの要素が取り出されてすべての処理が完了したことを表す - task_done()を呼び出すと数が減ります。 未完了タスク数がゼロになると- join()はブロッキングを解除します。
 
- 
その他¶
- 
multiprocessing.active_children()¶
- カレントプロセスのすべてのアクティブな子プロセスのリストを返します。 - これを呼び出すと "join" してすでに終了しているプロセスには副作用があります。 
- 
multiprocessing.cpu_count()¶
- システムの CPU 数を返します。 - この数は現在のプロセスが使える CPU 数と同じものではありません。 使用可能な CPU 数は - len(os.sched_getaffinity(0))で取得できます。- When the number of CPUs cannot be determined a - NotImplementedErroris raised.
- 
multiprocessing.current_process()¶
- カレントプロセスに対応する - Processオブジェクトを返します。- threading.current_thread()とよく似た関数です。
- 
multiprocessing.parent_process()¶
- Return the - Processobject corresponding to the parent process of the- current_process(). For the main process,- parent_processwill be- None.- バージョン 3.8 で追加. 
- 
multiprocessing.freeze_support()¶
- multiprocessingを使用しているプログラムをフリーズして Windows の実行可能形式を生成するためのサポートを追加します。(py2exe , PyInstaller や cx_Freeze でテストされています。)- メインモジュールの - if __name__ == '__main__'の直後にこの関数を呼び出す必要があります。以下に例を示します:- from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start() - もし - freeze_support()の行がない場合、フリーズされた実行可能形式を実行しようとすると- RuntimeErrorを発生させます。- freeze_support()の呼び出しは Windows 以外の OS では効果がありません。さらに、もしモジュールが Windows の通常の Python インタプリタによって実行されているならば(プログラムがフリーズされていなければ)- freeze_support()は効果がありません。
- 
multiprocessing.get_all_start_methods()¶
- サポートしている開始方式のリストを返します。先頭の要素がデフォルトを意味します。利用可能な開始方式には - 'fork'、- 'spawn'および- 'forkserver'があります。Windows では- 'spawn'のみが利用可能です。Unix では- 'fork'および- 'spawn'は常にサポートされており、- 'fork'がデフォルトになります。- バージョン 3.4 で追加. 
- 
multiprocessing.get_context(method=None)¶
- multiprocessingモジュールと同じ属性を持つコンテキストオブジェクトを返します。- method が - Noneの場合、デフォルトのコンテキストが返されます。その他の場合 method は- 'fork'、- 'spawn'あるいは- 'forkserver'でなければなりません。指定された開始方式が利用できない場合は- ValueErrorが送出されます。- バージョン 3.4 で追加. 
- 
multiprocessing.get_start_method(allow_none=False)¶
- 開始するプロセスで使用する開始方式名を返します。 - 開始方式がまだ確定しておらず、allow_none の値が偽の場合、開始方式はデフォルトに確定され、その名前が返されます。開始方式が確定しておらず、allow_none の値が真の場合、 - Noneが返されます。- The return value can be - 'fork',- 'spawn',- 'forkserver'or- None.- 'fork'is the default on Unix, while- 'spawn'is the default on Windows and macOS.
バージョン 3.8 で変更: macOS では、 spawn 開始方式がデフォルトになりました。 fork 開始方法は、サブプロセスのクラッシュを引き起こす可能性があるため、安全ではありません。 bpo-33725 を参照。
バージョン 3.4 で追加.
- 
multiprocessing.set_executable(executable)¶
- Set the path of the Python interpreter to use when starting a child process. (By default - sys.executableis used). Embedders will probably need to do some thing like- set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe')) - 子プロセスを作成する前に行ってください。 - バージョン 3.4 で変更: Unix で開始方式に - 'spawn'を使用している場合にサポートされました。
- 
multiprocessing.set_start_method(method)¶
- 子プロセスの開始方式を指定します。method には - 'fork'、- 'spawn'あるいは- 'forkserver'を指定できます。- これは一度しか呼び出すことができず、その場所もメインモジュールの - if __name__ == '__main__'節内で保護された状態でなければなりません。- バージョン 3.4 で追加. 
Connection オブジェクト¶
Connection オブジェクトは pickle でシリアライズ可能なオブジェクトか文字列を送ったり、受け取ったりします。そういったオブジェクトはメッセージ指向の接続ソケットと考えられます。
Connection オブジェクトは通常は Pipe を使用して作成されます。 詳細は リスナーとクライアント も参照してください。
- 
class multiprocessing.connection.Connection¶
- 
send(obj)¶
- コネクションの相手側へ - recv()を使用して読み込むオブジェクトを送ります。- オブジェクトは pickle でシリアライズ可能でなければなりません。 pickle が極端に大きすぎる (OS にも依りますが、およそ 32 MiB+) と、 - ValueError例外が送出されることがあります。
 - 
recv()¶
- コネクションの相手側から - send()を使用して送られたオブジェクトを返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合- EOFErrorが発生します。
 - 
fileno()¶
- コネクションが使用するハンドラーか、ファイル記述子を返します。 
 - 
close()¶
- コネクションをクローズします。 - コネクションがガベージコレクトされるときに自動的に呼び出されます。 
 - 
poll([timeout])¶
- 読み込み可能なデータがあるかどうかを返します。 - timeout が指定されていなければすぐに返します。 timeout に数値を指定すると、最大指定した秒数をブロッキングします。 timeout に - Noneを指定するとタイムアウトせずにずっとブロッキングします。- multiprocessing.connection.wait()を使って複数のコネクションオブジェクトを同時にポーリングできることに注意してください。
 - 
send_bytes(buffer[, offset[, size]])¶
- bytes-like object から完全なメッセージとしてバイトデータを送ります。 - offset が指定されると buffer のその位置からデータが読み込まれます。 size が指定されるとバッファーからその量のデータが読み込まれます。非常に大きなバッファー (OS に依存しますが、およそ 32MiB+) を指定すると、 - ValueError例外が発生するかもしれません。
 - 
recv_bytes([maxlength])¶
- コネクションの相手側から送られたバイトデータの完全なメッセージを文字列として返します。何か受け取るまでブロックします。受け取るデータが何も残っておらず、相手側がコネクションを閉じていた場合、 - EOFErrorが送出されます。- maxlength を指定していて、かつメッセージが maxlength より長い場合、 - OSErrorが発生してコネクションからそれ以上読めなくなります。
 - 
recv_bytes_into(buffer[, offset])¶
- コネクションの相手側から送られたバイトデータを buffer に読み込み、メッセージのバイト数を返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合 - EOFErrorが発生します。- buffer は書き込み可能な bytes-like object でなければなりません。 offset が与えられたら、その位置からバッファーへメッセージが書き込まれます。 オフセットは buffer バイトよりも小さい正の数でなければなりません。 - バッファーがあまりに小さいと - BufferTooShort例外が発生します。- eが例外インスタンスとすると完全なメッセージは- e.args[0]で確認できます。
 - バージョン 3.3 で変更: - Connection.send()と- Connection.recv()を使用して Connection オブジェクト自体をプロセス間で転送できるようになりました。- バージョン 3.3 で追加: Connection オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 -- コンテキストマネージャ型 を参照してください。 - __enter__()は Connection オブジェクトを返します。また- __exit__()は- close()を呼び出します。
- 
例えば:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv() メソッドは受信したデータを自動的に unpickle 化します。それはメッセージを送ったプロセスが信頼できる場合を除いてセキュリティリスクになります。
そのため Pipe() を使用してコネクションオブジェクトを生成する場合を除いて、何らかの認証処理を実行した後で recv() や send() メソッドのみを使用すべきです。詳細は 認証キー を参照してください。
警告
もしプロセスがパイプの読み込みまたは書き込み中に kill されると、メッセージの境界がどこなのか分からなくなってしまうので、そのパイプ内のデータは破損してしまいがちです。
同期プリミティブ¶
一般的にマルチプロセスプログラムは、マルチスレッドプログラムほどは同期プリミティブを必要としません。詳細は threading モジュールのドキュメントを参照してください。
マネージャーオブジェクトを使用して同期プリミティブを作成できることも覚えておいてください。詳細は マネージャー を参照してください。
- 
class multiprocessing.Barrier(parties[, action[, timeout]])¶
- バリアーオブジェクト: - threading.Barrierのクローンです。- バージョン 3.3 で追加. 
- 
class multiprocessing.BoundedSemaphore([value])¶
- 有限セマフォオブジェクト: - threading.BoundedSemaphoreの類似物です。- よく似た - threading.BoundedSemaphoreとは、次の一点だけ異なります。- acquireメソッドの第一引数名は block で、- Lock.acquire()と一致しています。- 注釈 - On macOS, this is indistinguishable from - Semaphorebecause- sem_getvalue()is not implemented on that platform.
- 
class multiprocessing.Condition([lock])¶
- 状態変数: - threading.Conditionの別名です。- lock を指定するなら - multiprocessingの- Lockか- RLockオブジェクトにすべきです。- バージョン 3.3 で変更: - wait_for()メソッドが追加されました。
- 
class multiprocessing.Event¶
- threading.Eventのクローンです。
- 
class multiprocessing.Lock¶
- 再帰しないロックオブジェクトで、 - threading.Lock相当のものです。プロセスやスレッドがロックをいったん獲得 (acquire) すると、それに続くほかのプロセスやスレッドが獲得しようとする際、それが解放 (release) されるまではブロックされます。解放はどのプロセス、スレッドからも行えます。スレッドに対して適用される- threading.Lockのコンセプトと振る舞いは、特筆すべきものがない限り、プロセスとスレッドに適用される- multiprocessing.Lockに引き継がれています。- Lockは実際にはファクトリ関数で、デフォルトコンテキストで初期化された- multiprocessing.synchronize.Lockのインスタンスを返すことに注意してください。- Lockは context manager プロトコルをサポートしています。つまり- with文で使うことができます。- 
acquire(block=True, timeout=None)¶
- ブロックあり、またはブロックなしでロックを獲得します。 - 引数 block を - True(デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。ブロックから抜けるとそれをロック状態にしてから- Trueを返します。- threading.Lock.acquire()の最初の引数とは名前が違っているので注意してください。- 引数 block の値を - Falseにして呼び出すとブロックしません。 現在ロック状態であれば、直ちに- Falseを返します。それ以外の場合には、ロックをロック状態にして- Trueを返します。- timeout として正の浮動小数点数を与えて呼び出すと、ロックが獲得できない限り、指定された秒数だけブロックします。 timeout 値に負数を与えると、ゼロを与えた場合と同じになります。 timeout 値の - None(デフォルト) を与えると、無限にブロックします。 timeout 引数の負数と- Noneの扱いは、- threading.Lock.acquire()に実装された動作と異なるので注意してください。 block が- Falseの場合、 timeout は実際的な意味を持たなくなるので無視されます。ロックを獲得した場合は- True、タイムアウトした場合は- Falseで戻ります。
 - 
release()¶
- ロックを解放します。これはロックを獲得したプロセスやスレッドだけでなく、任意のプロセスやスレッドから呼ぶことができます。 - threading.Lock.release()と同じように振舞いますが、ロックされていない場合に呼び出すと- ValueErrorとなる点だけが違います。
 
- 
- 
class multiprocessing.RLock¶
- 再帰ロックオブジェクトで、 - threading.RLock相当のものです。再帰ロックオブジェクトはそれを獲得 (acquire) したプロセスやスレッドが解放 (release) しなければなりません。プロセスやスレッドがロックをいったん獲得すると、同じプロセスやスレッドはブロックされずに再度獲得出来ます。そのプロセスやスレッドは獲得した回数ぶん解放しなければなりません。- RLockは実際にはファクトリ関数で、デフォルトコンテキストで初期化された- multiprocessing.synchronize.Lockのインスタンスを返すことに注意してください。- RLockは context manager プロトコルをサポートしています。つまり- with文で使うことができます。- 
acquire(block=True, timeout=None)¶
- ブロックあり、またはブロックなしでロックを獲得します。 - block 引数を - Trueにして呼び出した場合、ロックが既にカレントプロセスもしくはカレントスレッドが既に所有していない限りは、アンロック状態 (どのプロセス、スレッドも所有していない状態) になるまでブロックします。ブロックから抜けるとカレントプロセスもしくはカレントスレッドが (既に持っていなければ) 所有権を得て、再帰レベルをインクリメントし、- Trueで戻ります。- threading.RLock.acquire()の実装とはこの最初の引数の振る舞いが、その名前自身を始めとしていくつか違うので注意してください。- block 引数を - Falseにして呼び出した場合、ブロックしません。ロックが他のプロセスもしくはスレッドにより獲得済み (つまり所有されている) であれば、カレントプロセスまたはカレントスレッドは所有権を得ず、再帰レベルも変更せずに、- Falseで戻ります。ロックがアンロック状態の場合、カレントプロセスもしくはカレントスレッドは所有権を得て再帰レベルがインクリメントされ、- Trueで戻ります。(---訳注: block の True/False 関係なくここでの説明では「所有権を持っている場合の2度目以降の aquire」の説明が欠けています。2度目以降の acquire では再帰レベルがインクリメントされて即座に返ります。全体読めばわかるとは思いますが一応。---)- timeout 引数の使い方と振る舞いは - Lock.acquire()と同じです。 timeout 引数の振る舞いがいくつかの点で- threading.RLock.acquire()と異なるので注意してください。
 - 
release()¶
- 再帰レベルをデクリメントしてロックを解放します。デクリメント後に再帰レベルがゼロになった場合、ロックの状態をアンロック (いかなるプロセス、いかなるスレッドにも所有されていない状態) にリセットし、ロックの状態がアンロックになるのを待ってブロックしているプロセスもしくはスレッドがある場合にはその中のただ一つだけが処理を進行できるようにします。デクリメント後も再帰レベルがゼロでない場合、ロックの状態はロックのままで、呼び出し側のプロセスもしくはスレッドに所有されたままになります。 - このメソッドは呼び出しプロセスあるいはスレッドがロックを所有している場合に限り呼び出してください。所有者でないプロセスもしくはスレッドによって呼ばれるか、あるいはアンロック (未所有) 状態で呼ばれた場合、 - AssertionErrorが送出されます。同じ状況での- threading.RLock.release()実装とは例外の型が異なるので注意してください。
 
- 
- 
class multiprocessing.Semaphore([value])¶
- セマフォオブジェクト: - threading.Semaphoreのクローンです。- よく似た - threading.BoundedSemaphoreとは、次の一点だけ異なります。- acquireメソッドの第一引数名は block で、- Lock.acquire()と一致しています。
注釈
On macOS, sem_timedwait is unsupported, so calling acquire() with
a timeout will emulate that function's behavior using a sleeping loop.
注釈
メインスレッドが BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() 又は Condition.wait() を呼び出してブロッキング状態のときに Ctrl-C で生成される SIGINT シグナルを受け取ると、その呼び出しはすぐに中断されて KeyboardInterrupt が発生します。
これは同等のブロッキング呼び出しが実行中のときに SIGINT が無視される threading の振る舞いとは違っています。
注釈
このパッケージに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォを使用しているものがあります。これが使用できない場合には、multiprocessing.synchronize モジュールが無効になり、このモジュールのインポート時に ImportError が発生します。詳細は bpo-3770 を参照してください。
マネージャー¶
マネージャーは異なるプロセス間で共有されるデータの作成方法を提供します。これには別のマシン上で走るプロセス間のネットワーク越しの共有も含まれます。マネージャーオブジェクトは 共有オブジェクト を管理するサーバープロセスを制御します。他のプロセスはプロキシ経由で共有オブジェクトへアクセスすることができます。
- 
multiprocessing.Manager()¶
- プロセス間でオブジェクトを共有するために使用される - SyncManagerオブジェクトを返します。返されたマネージャーオブジェクトは生成される子プロセスに対応付けられ、共有オブジェクトを作成するメソッドや、共有オブジェクトに対応するプロキシを返すメソッドを持ちます。
マネージャープロセスは親プロセスが終了するか、ガベージコレクトされると停止します。マネージャークラスは multiprocessing.managers モジュールで定義されています:
- 
class multiprocessing.managers.BaseManager([address[, authkey]])¶
- BaseManager オブジェクトを作成します。 - 作成後、 - start()または- get_server().serve_forever()を呼び出して、マネージャーオブジェクトが、開始されたマネージャープロセスを確実に参照するようにしてください。- address はマネージャープロセスが新たなコネクションを待ち受けるアドレスです。address が - Noneの場合、任意のアドレスが設定されます。- authkey はサーバープロセスへ接続しようとするコネクションの正当性を検証するために 使用される認証キーです。authkey が - Noneの場合- current_process().authkeyが使用されます。authkey を使用する場合はバイト文字列でなければなりません。- 
start([initializer[, initargs]])¶
- マネージャーを開始するためにサブプロセスを開始します。initializer が - Noneでなければ、サブプロセスは開始時に- initializer(*initargs)を呼び出します。
 - 
get_server()¶
- マネージャーの制御下にある実際のサーバーを表す - Serverオブジェクトを返します。- Serverオブジェクトは- serve_forever()メソッドをサポートします:- >>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever() - Serverはさらに- address属性も持っています。
 - 
connect()¶
- ローカルからリモートのマネージャーオブジェクトへ接続します: - >>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect() 
 - 
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
- マネージャークラスで呼び出し可能オブジェクト(callable)や型を登録するために使用されるクラスメソッドです。 - typeid は特に共有オブジェクトの型を識別するために使用される "型識別子" です。これは文字列でなければなりません。 - callable はこの型識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。マネージャーインスタンスが - connect()メソッドを使ってサーバーに接続されているか、 create_method 引数が- Falseの場合は、- Noneでも構いません。- proxytype はこの typeid で共有オブジェクトのプロキシを作成するために使用される - BaseProxyのサブクラスです。- Noneの場合、プロキシクラスは自動的に作成されます。- exposed は - BaseProxy._callmethod()を使用したアクセスが許されるべき typeid をプロキシするメソッド名のシーケンスを指定するために使用されます (exposed が- Noneの場合- proxytype._exposed_が存在すればそれが代わりに使用されます)。exposed リストが指定されない場合、共有オブジェクトのすべての "パブリックメソッド" がアクセス可能になります。 (ここでいう "パブリックメソッド" とは- __call__()メソッドを持つものと名前が- '_'で始まらないあらゆる属性を意味します。)- method_to_typeid はプロキシが返す exposed メソッドの返り値の型を指定するために使用されるマッピングで、メソッド名を typeid 文字列にマップします。 (method_to_typeid が - Noneの場合- proxytype._method_to_typeid_が存在すれば、それが代わりに使用されます。) メソッド名がこのマッピングのキーではないか、マッピングが- Noneの場合、そのメソッドによって返されるオブジェクトが値として (by value) コピーされます。- create_method は、共有オブジェクトを作成し、それに対するプロキシを返すようサーバープロセスに伝える、名前 typeid のメソッドを作成するかを決定します。デフォルトでは - Trueです。
 - BaseManagerインスタンスも読み出し専用属性を1つ持っています:- 
address¶
- マネージャーが使用するアドレスです。 
 - バージョン 3.3 で変更: マネージャーオブジェクトはコンテキストマネージメント・プロトコルをサポートします -- コンテキストマネージャ型 を参照してください。 - __enter__()は (まだ開始していない場合) サーバープロセスを開始してから、マネージャーオブジェクトを返します。- __exit__()は- shutdown()を呼び出します。- 旧バージョンでは、 - __enter__()はマネージャーのサーバープロセスがまだ開始していなかった場合でもプロセスを開始しませんでした。
- 
- 
class multiprocessing.managers.SyncManager¶
- プロセス間の同期のために使用される - BaseManagerのサブクラスです。- multiprocessing.Manager()はこの型のオブジェクトを返します。- Its methods create and return Proxy オブジェクト for a number of commonly used data types to be synchronized across processes. This notably includes shared lists and dictionaries. - 
Barrier(parties[, action[, timeout]])¶
- 共有 - threading.Barrierオブジェクトを作成して、そのプロキシを返します。- バージョン 3.3 で追加. 
 - 
BoundedSemaphore([value])¶
- 共有 - threading.BoundedSemaphoreオブジェクトを作成して、そのプロキシを返します。
 - 
Condition([lock])¶
- 共有 - threading.Conditionオブジェクトを作成して、そのプロキシを返します。- lock が提供される場合 - threading.Lockか- threading.RLockオブジェクトのためのプロキシになります。- バージョン 3.3 で変更: - wait_for()メソッドが追加されました。
 - 
Event()¶
- 共有 - threading.Eventオブジェクトを作成して、そのプロキシを返します。
 - 
Lock()¶
- 共有 - threading.Lockオブジェクトを作成して、そのプロキシを返します。
 - 
Queue([maxsize])¶
- 共有 - queue.Queueオブジェクトを作成して、そのプロキシを返します。
 - 
RLock()¶
- 共有 - threading.RLockオブジェクトを作成して、そのプロキシを返します。
 - 
Semaphore([value])¶
- 共有 - threading.Semaphoreオブジェクトを作成して、そのプロキシを返します。
 - 
Array(typecode, sequence)¶
- 配列を作成して、そのプロキシを返します。 
 - 
Value(typecode, value)¶
- 書き込み可能な - value属性を作成して、そのプロキシを返します。
 - バージョン 3.6 で変更: 共有オブジェクトは入れ子もできます。 例えば、共有リストのような共有コンテナオブジェクトは、 - SyncManagerがまとめて管理し同期を取っている他の共有オブジェクトを保持できます。
- 
- 
class multiprocessing.managers.Namespace¶
- SyncManagerに登録することのできる型です。- Namespace オブジェクトにはパブリックなメソッドはありませんが、書き込み可能な属性を持ちます。そのオブジェクト表現はその属性の値を表示します。 - しかし、Namespace オブジェクトのためにプロキシを使用するとき - '_'が先頭に付く属性はプロキシの属性になり、参照対象の属性にはなりません:- >>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello') 
カスタマイズされたマネージャー¶
独自のマネージャーを作成するには、BaseManager のサブクラスを作成して、 マネージャークラスで呼び出し可能なオブジェクトか新たな型を登録するために register() クラスメソッドを使用します。例えば:
from multiprocessing.managers import BaseManager
class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y
class MyManager(BaseManager):
    pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56
リモートマネージャーを使用する¶
あるマシン上でマネージャーサーバーを実行して、他のマシンからそのサーバーを使用するクライアントを持つことができます(ファイアウォールを通過できることが前提)。
次のコマンドを実行することでリモートクライアントからアクセスを受け付ける1つの共有キューのためにサーバーを作成します:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
あるクライアントからサーバーへのアクセスは次のようになります:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
別のクライアントもそれを使用することができます:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
ローカルプロセスもそのキューへアクセスすることができます。クライアント上で上述のコードを使用してアクセスします:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Proxy オブジェクト¶
プロキシは別のプロセスで(おそらく)有効な共有オブジェクトを 参照する オブジェクトです。共有オブジェクトはプロキシの 参照対象 になるということができます。複数のプロキシオブジェクトが同じ参照対象を持つ可能性もあります。
プロキシオブジェクトはその参照対象の対応するメソッドを呼び出すメソッドを持ちます (そうは言っても、参照対象のすべてのメソッドが必ずしもプロキシ経由で利用可能なわけではありません)。 この方法で、プロキシオブジェクトはまるでその参照先と同じように使えます:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
プロキシに str() を適用すると参照対象のオブジェクト表現を返すのに対して、 repr() を適用するとプロキシのオブジェクト表現を返すことに注意してください。
プロキシオブジェクトの重要な機能は pickle 化ができることで、これによりプロセス間での受け渡しができます。 そのため、参照対象が Proxy オブジェクト を持てます。 これによって管理されたリスト、辞書、その他 Proxy オブジェクト をネストできます:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
同様に、辞書とリストのプロキシも他のプロキシの内部に入れてネストできます:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
(プロキシでない) 標準の list オブジェクトや dict オブジェクトが参照対象に含まれていた場合、それらの可変な値の変更はマネージャーからは伝搬されません。
というのも、プロキシには参照対象の中に含まれる値がいつ変更されたかを知る術が無いのです。
しかし、コンテナプロキシに値を保存する (これはプロキシオブジェクトの __setitem__ を起動します) 場合はマネージャーを通して変更が伝搬され、その要素を実際に変更するために、コンテナプロキシに変更後の値が再代入されます:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
This approach is perhaps less convenient than employing nested Proxy オブジェクト for most use cases but also demonstrates a level of control over the synchronization.
注釈
multiprocessing のプロキシ型は値による比較に対して何もサポートしません。そのため、例えば以下のようになります:
>>> manager.list([1,2,3]) == [1,2,3]
False
比較を行いたいときは参照対象のコピーを使用してください。
- 
class multiprocessing.managers.BaseProxy¶
- プロキシオブジェクトは - BaseProxyのサブクラスのインスタンスです。- 
_callmethod(methodname[, args[, kwds]])¶
- プロキシの参照対象のメソッドの実行結果を返します。 - proxyがプロキシで、プロキシ内の参照対象が- objならこの式- proxy._callmethod(methodname, args, kwds) - はこの式を評価します - getattr(obj, methodname)(*args, **kwds) - (マネージャープロセス内の)。 - 返される値はその呼び出し結果のコピーか、新たな共有オブジェクトに対するプロキシになります。詳細は - BaseManager.register()の method_to_typeid 引数のドキュメントを参照してください。- その呼び出しによって例外が発生した場合、 - _callmethod()によってその例外は再送出されます。他の例外がマネージャープロセスで発生したなら、- RemoteError例外に変換されたものが- _callmethod()によって送出されます。- 特に methodname が 公開 されていない場合は例外が発生することに注意してください。 - _callmethod()の使用例になります:- >>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range 
 - 
_getvalue()¶
- 参照対象のコピーを返します。 - 参照対象が unpickle 化できるなら例外を発生します。 
 - 
__repr__()¶
- プロキシオブジェクトのオブジェクト表現を返します。 
 - 
__str__()¶
- 参照対象のオブジェクト表現を返します。 
 
- 
クリーンアップ¶
プロキシオブジェクトは弱参照(weakref)コールバックを使用します。プロキシオブジェクトがガベージコレクトされるときにその参照対象が所有するマネージャーからその登録を取り消せるようにするためです。
共有オブジェクトはプロキシが参照しなくなったときにマネージャープロセスから削除されます。
プロセスプール¶
Pool クラスでタスクを実行するプロセスのプールを作成することができます。
- 
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
- プロセスプールオブジェクトは、ジョブを送り込めるワーカープロセスのプールを制御します。タイムアウトやコールバックのある非同期の実行をサポートし、並列 map 実装を持ちます。 - processes は使用するワーカープロセスの数です。processes が - Noneの場合- os.cpu_count()が返す値を使用します。- initializer が - Noneではない場合、各ワーカープロセスは開始時に- initializer(*initargs)を呼び出します。- maxtasksperchild は、ワーカープロセスが exit して新たなワーカープロセスと置き替えられるまでの間に、ワーカープロセスが完了することのできるタスクの数です。この設定により未利用のリソースが解放されるようなります。デフォルトの maxtasksperchild は - Noneで、これはワーカープロセスがプールと同じ期間だけ生き続けるということを意味します。- context はワーカープロセスを開始するために使用されるコンテキストの指定に使用できます。通常プールは関数 - multiprocessing.Pool()かコンテキストオブジェクトの- Pool()メソッドを使用して作成されます。どちらの場合でも context は適切に設定されます。- プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。 - 警告 - multiprocessing.poolobjects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling- close()and- terminate()manually. Failure to do this can lead to the process hanging on finalization.- Note that is not correct to rely on the garbage colletor to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see - object.__del__()for more information).- バージョン 3.2 で追加: maxtasksperchild - バージョン 3.4 で追加: context - 注釈 - Pool中のワーカープロセスは、典型的にはプールのワークキューの存続期間とちょうど同じだけ生き続けます。ワーカーに確保されたリソースを解放するために (Apache, mod_wsgi, などのような) 他のシステムによく見られるパターンは、プール内のワーカーが設定された量だけの仕事を完了したら exit とクリーンアップを行い、古いプロセスを置き換えるために新しいプロセスを生成するというものです。- Poolの maxtasksperchild 引数は、この能力をエンドユーザーに提供します。- 
apply(func[, args[, kwds]])¶
- 引数 args とキーワード引数 kwds を伴って func を呼びます。結果が準備できるまでブロックします。このブロックがあるため、 - apply_async()の方が並行作業により適しています。加えて、 func は、プール内の1つのワーカーだけで実行されます。
 - 
apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
- apply()メソッドの派生版で- AsyncResultオブジェクトを返します。- callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。結果を返せるようになったときに callback が結果オブジェクトに対して適用されます。ただし呼び出しが失敗した場合は、代わりに error_callback が適用されます。 - error_callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。対象の関数が失敗した場合、例外インスタンスを伴って error_callback が呼ばれます。 - コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。 
 - 
map(func, iterable[, chunksize])¶
- map()組み込み関数の並列版です (iterable な引数を1つだけサポートするという違いはあります。もしも複数のイテラブルを使いたいのならば:meth:`starmap`を参照)。結果が出るまでブロックします。- このメソッドはイテラブルをいくつものチャンクに分割し、プロセスプールにそれぞれ独立したタスクとして送ります。(概算の) チャンクサイズは chunksize を正の整数に設定することで指定できます。 - Note that it may cause high memory usage for very long iterables. Consider using - imap()or- imap_unordered()with explicit chunksize option for better efficiency.
 - 
map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
- map()メソッドの派生版で- AsyncResultオブジェクトを返します。- callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。結果を返せるようになったときに callback が結果オブジェクトに対して適用されます。ただし呼び出しが失敗した場合は、代わりに error_callback が適用されます。 - error_callback が指定された場合、それは単一の引数を受け取る呼び出し可能オブジェクトでなければなりません。対象の関数が失敗した場合、例外インスタンスを伴って error_callback が呼ばれます。 - コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。 
 - 
imap(func, iterable[, chunksize])¶
- map()の遅延評価版です。- chunksize 引数は - map()メソッドで使用されるものと同じです。 引数 iterable がとても長いなら chunksize に大きな値を指定して使用する方がデフォルト値の- 1を使用するよりもジョブの完了が かなり 速くなります。- また chunksize が - 1の場合- imap()メソッドが返すイテレーターの- next()メソッドはオプションで timeout パラメーターを持ちます。- next(timeout)は、その結果が timeout 秒以内に返されないときに- multiprocessing.TimeoutErrorを発生させます。
 - 
imap_unordered(func, iterable[, chunksize])¶
- イテレーターが返す結果の順番が任意の順番で良いと見なされることを除けば - imap()と同じです。 (ワーカープロセスが1つしかない場合のみ "正しい" 順番になることが保証されます。)
 - 
starmap(func, iterable[, chunksize])¶
- Like - map()except that the elements of the iterable are expected to be iterables that are unpacked as arguments.- そのため、iterable が - [(1,2), (3, 4)]なら、結果は- [func(1,2), func(3,4)]になります。- バージョン 3.3 で追加. 
 - 
starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
- starmap()と- map_async()の組み合わせです。 イテレート可能オブジェクトの iterable をイテレートして、 unpack したイテレート可能オブジェクトを伴って func を呼び出します。結果オブジェクトを返します。- バージョン 3.3 で追加. 
 - 
close()¶
- これ以上プールでタスクが実行されないようにします。すべてのタスクが完了した後でワーカープロセスが終了します。 
 - 
terminate()¶
- 実行中の処理を完了させずにワーカープロセスをすぐに停止します。プールオブジェクトがガベージコレクトされるときに - terminate()が呼び出されます。
 - 
join()¶
- ワーカープロセスが終了するのを待ちます。 - join()を使用する前に- close()か- terminate()を呼び出さなければなりません。
 - バージョン 3.3 で追加: Pool オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 -- コンテキストマネージャ型 を参照してください。 - __enter__()は Pool オブジェクトを返します。また- __exit__()は- terminate()を呼び出します。
- 
- 
class multiprocessing.pool.AsyncResult¶
- Pool.apply_async()や- Pool.map_async()で返される結果のクラスです。- 
get([timeout])¶
- 結果を受け取ったときに返します。 timeout が - Noneではなくて、その結果が timeout 秒以内に受け取れない場合- multiprocessing.TimeoutErrorが発生します。リモートの呼び出しが例外を発生させる場合、その例外は- get()が再発生させます。
 - 
wait([timeout])¶
- その結果が有効になるか timeout 秒経つまで待ちます。 
 - 
ready()¶
- その呼び出しが完了しているかどうかを返します。 
 - 
successful()¶
- その呼び出しが例外を発生させることなく完了したかどうかを返します。その結果が返せる状態でない場合 - ValueErrorが発生します。- バージョン 3.7 で変更: If the result is not ready, - ValueErroris raised instead of- AssertionError.
 
- 
次の例はプールの使用例を紹介します:
from multiprocessing import Pool
import time
def f(x):
    return x*x
if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow
        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"
        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow
        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError
リスナーとクライアント¶
通常、プロセス間でメッセージを渡すにはキューを使用するか Pipe() が返す Connection オブジェクトを使用します。
しかし multiprocessing.connection モジュールにはさらに柔軟な仕組みがあります。 このモジュールは、基本的にはソケットもしくは Windows の名前付きパイプを扱う高レベルのメッセージ指向 API を提供します。また、 hmac モジュールを使用した ダイジェスト認証 や同時の複数接続のポーリングもサポートします。
- 
multiprocessing.connection.deliver_challenge(connection, authkey)¶
- ランダム生成したメッセージをコネクションの相手側へ送信して応答を待ちます。 - その応答がキーとして authkey を使用するメッセージのダイジェストと一致する場合、 コネクションの相手側へ歓迎メッセージを送信します。 そうでなければ - AuthenticationErrorを発生させます。
- 
multiprocessing.connection.answer_challenge(connection, authkey)¶
- メッセージを受信して、そのキーとして authkey を使用するメッセージのダイジェストを計算し、ダイジェストを送り返します。 - 歓迎メッセージを受け取れない場合 - AuthenticationErrorが発生します。
- 
multiprocessing.connection.Client(address[, family[, authkey]])¶
- address で渡したアドレスを使用するリスナーに対してコネクションを確立しようとして - Connectionを返します。- コネクション種別は family 引数で決定しますが、一般的には address のフォーマットから推測できるので、これは指定されません。 (アドレスフォーマット を参照してください) - If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. - AuthenticationErroris raised if authentication fails. See 認証キー.
- 
class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
- コネクションを '待ち受ける' 束縛されたソケットか Windows の名前付きパイプのラッパーです。 - address はリスナーオブジェクトの束縛されたソケットか名前付きパイプが使用するアドレスです。 - 注釈 - '0.0.0.0' のアドレスを使用する場合、Windows 上の終点へ接続することができません。終点へ接続したい場合は '127.0.0.1' を使用すべきです。 - family は使用するソケット(名前付きパイプ)の種別です。これは - 'AF_INET'(TCP ソケット),- 'AF_UNIX'(Unix ドメインソケット) または- 'AF_PIPE'(Windows 名前付きパイプ) という文字列のどれか1つになります。これらのうち- 'AF_INET'のみが利用可能であることが保証されています。 family が- Noneの場合 address のフォーマットから推測されたものが使用されます。 address も- Noneの場合はデフォルトが選択されます。詳細は アドレスフォーマット を参照してください。 family が- 'AF_UNIX'で address が- Noneの場合- tempfile.mkstemp()を使用して作成されたプライベートな一時ディレクトリにソケットが作成されます。- リスナーオブジェクトがソケットを使用する場合、ソケットに束縛されるときに backlog (デフォルトでは1つ) がソケットの - listen()メソッドに対して渡されます。- If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. - AuthenticationErroris raised if authentication fails. See 認証キー.- 
accept()¶
- リスナーオブジェクトの名前付きパイプか束縛されたソケット上でコネクションを 受け付けて - Connectionオブジェクトを返します。 認証が失敗した場合- AuthenticationErrorが発生します。
 - 
close()¶
- リスナーオブジェクトの名前付きパイプか束縛されたソケットをクローズします。これはリスナーがガベージコレクトされるときに自動的に呼ばれます。そうは言っても、明示的に close() を呼び出す方が望ましいです。 
 - リスナーオブジェクトは次の読み出し専用属性を持っています: - 
address¶
- リスナーオブジェクトが使用中のアドレスです。 
 - 
last_accepted¶
- 最後にコネクションを受け付けたアドレスです。有効なアドレスがない場合は - Noneになります。
 - バージョン 3.3 で追加: Listener オブジェクトがコンテキストマネージメント・プロトコルをサポートするようになりました。 -- コンテキストマネージャ型 を参照してください。 - __enter__()はリスナーオブジェクトを返します。また- __exit__()は- close()を呼び出します。
- 
- 
multiprocessing.connection.wait(object_list, timeout=None)¶
- object_list 中のオブジェクトが準備ができるまで待機します。準備ができた object_list 中のオブジェクトのリストを返します。timeout が浮動小数点なら、最大でその秒数だけ呼び出しがブロックします。timeout が - Noneの場合、無制限の期間ブロックします。負のタイムアウトは0と等価です。- Unix と Windows の両方で、 object_list には以下のオブジェクトを含めることが出来ます - 読み取り可能な - Connectionオブジェクト;
- 接続された読み取り可能な - socket.socketオブジェクト; または
 - 読み取ることのできるデータがある場合、あるいは相手側の端が閉じられている場合、コネクションまたはソケットオブジェクトは準備ができています。 - Unix: - wait(object_list, timeout)は- select.select(object_list, [], [], timeout)とほとんど等価です。違いは、- select.select()がシグナルによって中断される場合、- EINTRのエラー番号付きで- OSErrorを上げるということです。- wait()はそのようなことは行いません。- Windows: object_list の要素は、 (Win32 関数 - WaitForMultipleObjects()のドキュメントで使われている定義から) wait 可能な整数ハンドルか、ソケットハンドルまたはパイプハンドルを返す- fileno()メソッドを持つオブジェクトのどちらかでなければなりません。 (パイプハンドルとソケットハンドラーは wait 可能なハンドルでは ない ことに注意してください。)- バージョン 3.3 で追加. 
例
次のサーバーコードは認証キーとして 'secret password' を使用するリスナーを作成します。このサーバーはコネクションを待ってクライアントへデータを送信します:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)
        conn.send([2.25, None, 'junk', float])
        conn.send_bytes(b'hello')
        conn.send_bytes(array('i', [42, 1729]))
次のコードはサーバーへ接続して、サーバーからデータを受信します:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]
    print(conn.recv_bytes())            # => 'hello'
    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])
次のコードは wait() を使って複数のプロセスからのメッセージを同時に待ちます:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()
if __name__ == '__main__':
    readers = []
    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()
    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)
アドレスフォーマット¶
- 'AF_INET'アドレスは- (hostname, port)のタプルになります。 hostname は文字列で port は整数です。
- 'AF_UNIX'アドレスはファイルシステム上のファイル名の文字列です。
- An - 'AF_PIPE'address is a string of the form- r'\.\pipe{PipeName}'. To use- Client()to connect to a named pipe on a remote computer called ServerName one should use an address of the form- r'\ServerName\pipe{PipeName}'instead.
デフォルトでは、2つのバックスラッシュで始まる文字列は 'AF_UNIX' よりも 'AF_PIPE' として推測されることに注意してください。
認証キー¶
Connection.recv を使用するとき、データは自動的に unpickle されて受信します。 信頼できない接続元からのデータを unpickle することはセキュリティリスクがあります。 そのため Listener や Client() はダイジェスト認証を提供するために hmac モジュールを使用します。
認証キーはパスワードとして見なされるバイト文字列です。コネクションが確立すると、双方の終点で正しい接続先であることを証明するために 知っているお互いの認証キーを要求します。(双方の終点が同じキーを使用して通信しようとしても、コネクション上でそのキーを送信することは できません。)
認証が要求されているにもかかわらず認証キーが指定されていない場合 current_process().authkey の返す値が使用されます。 (詳細は Process を参照してください。) この値はカレントプロセスを作成する Process オブジェクトによって自動的に継承されます。 これは(デフォルトでは)複数プロセスのプログラムの全プロセスが相互にコネクションを 確立するときに使用される1つの認証キーを共有することを意味します。
適当な認証キーを os.urandom() を使用して生成することもできます。
ログ記録¶
ロギングのためにいくつかの機能が利用可能です。しかし logging パッケージは、 (ハンドラー種別に依存して)違うプロセスからのメッセージがごちゃ混ぜになるので、プロセスの共有ロックを使用しないことに注意してください。
- 
multiprocessing.get_logger()¶
- multiprocessingが使用するロガーを返します。必要に応じて新たなロガーを作成します。- 最初に作成するとき、ロガーはレベルに - logging.NOTSETが設定されていてデフォルトハンドラーがありません。このロガーへ送られるメッセージはデフォルトではルートロガーへ伝播されません。- Windows 上では子プロセスが親プロセスのロガーレベルを継承しないことに注意してください。さらにその他のロガーのカスタマイズ内容もすべて継承されません。 
- 
multiprocessing.log_to_stderr(level=None)¶
- This function performs a call to - get_logger()but in addition to returning the logger created by get_logger, it adds a handler which sends output to- sys.stderrusing format- '[%(levelname)s/%(processName)s] %(message)s'. You can modify- levelnameof the logger by passing a- levelargument.
以下にロギングを有効にした例を紹介します:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
完全なロギングレベルの表については logging モジュールを参照してください。
multiprocessing.dummy モジュール¶
multiprocessing.dummy は multiprocessing の API を複製しますが threading モジュールのラッパーでしかありません。
In particular, the Pool function provided by multiprocessing.dummy
returns an instance of ThreadPool, which is a subclass of
Pool that supports all the same method calls but uses a pool of
worker threads rather than worker processes.
- 
class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
- A thread pool object which controls a pool of worker threads to which jobs can be submitted. - ThreadPoolinstances are fully interface compatible with- Poolinstances, and their resources must also be properly managed, either by using the pool as a context manager or by calling- close()and- terminate()manually.- processes は使用するワーカースレッドの数です。processes が - Noneの場合- os.cpu_count()が返す値を使用します。- initializer が - Noneではない場合、各ワーカープロセスは開始時に- initializer(*initargs)を呼び出します。- Unlike - Pool, maxtasksperchild and context cannot be provided.- 注釈 - A - ThreadPoolshares the same interface as- Pool, which is designed around a pool of processes and predates the introduction of the- concurrent.futuresmodule. As such, it inherits some operations that don't make sense for a pool backed by threads, and it has its own type for representing the status of asynchronous jobs,- AsyncResult, that is not understood by any other libraries.- Users should generally prefer to use - concurrent.futures.ThreadPoolExecutor, which has a simpler interface that was designed around threads from the start, and which returns- concurrent.futures.Futureinstances that are compatible with many other libraries, including- asyncio.
プログラミングガイドライン¶
multiprocessing を使用するときに守るべき一定のガイドラインとイディオムを挙げます。
すべての開始方式について¶
以下はすべての開始方式に当てはまります。
共有状態を避ける
できるだけプロセス間で巨大なデータを移動することは避けるようにすべきです。
プロセス間の通信には、
threadingモジュールの低レベルな同期プリミティブを使うのではなく、キューやパイプを使うのが良いでしょう。
pickle 化の可能性
プロキシのメソッドへの引数は、 pickle 化できるものにしてください。
プロキシのスレッドセーフ性
1 つのプロキシオブジェクトは、ロックで保護しないかぎり、2 つ以上のスレッドから使用してはいけません。
(異なるプロセスで 同じ プロキシを使用することは問題ではありません。)
ゾンビプロセスを join する
Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (または
active_children()が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスのProcess.is_aliveはそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。
pickle/unpickle より継承する方が良い
開始方式に spawn あるいは forkserver を使用している場合、
multiprocessingから多くの型を pickle 化する必要があるため子プロセスはそれらを使うことができます。しかし、一般にパイプやキューを使用して共有オブジェクトを他のプロセスに送信することは避けるべきです。代わりに、共有リソースにアクセスする必要のあるプロセスは上位プロセスからそれらを継承するようにすべきです。
プロセスの強制終了を避ける
あるプロセスを停止するために
Process.terminateメソッドを使用すると、そのプロセスが現在使用されている (ロック、セマフォ、パイプやキューのような) 共有リソースを破壊したり他のプロセスから利用できない状態を引き起こし易いです。そのため、共有リソースを使用しないプロセスでのみ
Process.terminateを使用することを考慮することがおそらく最善の方法です。
キューを使用するプロセスを join する
キューに要素を追加するプロセスは、すべてのバッファーされた要素が "feeder" スレッドによって下位層のパイプに対してフィードされるまで終了を待つということを覚えておいてください。 (子プロセスはこの動作を避けるためにキューの
Queue.cancel_join_threadメソッドを呼ぶことができます。)これはキューを使用するときに、キューに追加されたすべての要素が最終的にそのプロセスが join される前に削除されていることを確認する必要があることを意味します。そうしないと、そのキューに要素が追加したプロセスの終了を保証できません。デーモンではないプロセスは自動的に join されることも覚えておいてください。
次の例はデッドロックを引き起こします:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()修正するには最後の2行を入れ替えます(または単純に
p.join()の行を削除します)。
明示的に子プロセスへリソースを渡す
Unix で開始方式に fork を使用している場合、子プロセスはグローバルリソースを使用した親プロセス内で作成された共有リソースを使用できます。しかし、オブジェクトを子プロセスのコンストラクターに引数として渡すべきです。
Windows や他の開始方式と (将来的にでも) 互換性のあるコードを書く場合は別として、これは子プロセスが実行中である限りは親プロセス内でオブジェクトがガベージコレクトされないことも保証します。これは親プロセス内でオブジェクトがガベージコレクトされたときに一部のリソースが開放されてしまう場合に重要かもしれません。
そのため、例えば
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()は、次のように書き直すべきです
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
sys.stdin を file-like オブジェクトに置き換えることに注意する
multiprocessingは元々無条件に:os.close(sys.stdin.fileno())を
multiprocessing.Process._bootstrap()メソッドの中で呼び出していました --- これはプロセス内プロセス (processes-in-processes) で問題が起こしてしまいます。そこで、これは以下のように変更されました:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)これによってプロセス同士が衝突して bad file descripter エラーを起こすという根本的な問題は解決しましたが、アプリケーションの出力バッファーを
sys.stdin()から "file-like オブジェクト" に置き換えるという潜在的危険を持ち込んでしまいました。危険というのは、複数のプロセスが file-like オブジェクトのclose()を呼び出すと、オブジェクトに同じデータが何度もフラッシュされ、破損してしまう可能性がある、というものです。もし file-like オブジェクトを書いて独自のキャッシュを実装するなら、キャッシュするときに常に pid を記録しておき、pid が変わったらキュッシュを捨てることで、フォークセーフにできます。例:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
開始方式が spawn および forkserver の場合¶
開始方式に fork を適用しない場合にいくつかの追加の制限事項があります。
さらなる pickle 化の可能性
Process.__init__()へのすべての引数は pickle 化できることを確認してください。またProcessをサブクラス化する場合、そのインスタンスがProcess.startメソッドが呼ばれたときに pickle 化できるようにしてください。
グローバル変数
子プロセスで実行されるコードがグローバル変数にアクセスしようとする場合、子プロセスが見るその値は
Process.startが呼ばれたときの親プロセスの値と同じではない可能性があります。しかし、単にモジュールレベルの定数であるグローバル変数なら問題にはなりません。
メインモジュールの安全なインポート
新たな Python インタプリタによるメインモジュールのインポートが、意図しない副作用 (新たなプロセスを開始する等) を起こさずできるようにしてください。
例えば、開始方式に spawn あるいは forkserver を使用した場合に以下のモジュールを実行すると
RuntimeErrorで失敗します:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()代わりに、次のように
if __name__ == '__main__':を使用してプログラムの "エントリポイント" を保護すべきです:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(プログラムをフリーズせずに通常通り実行するなら
freeze_support()行は取り除けます。)これは新たに生成された Python インタープリターがそのモジュールを安全にインポートして、モジュールの
foo()関数を実行します。プールまたはマネージャーがメインモジュールで作成される場合に似たような制限が適用されます。
使用例¶
カスタマイズされたマネージャーやプロキシの作成方法と使用方法を紹介します:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')
# A simple generator function
def baz():
    for i in range(10):
        yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
    return operator
##
class MyManager(BaseManager):
    pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
    manager = MyManager()
    manager.start()
    print('-' * 20)
    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])
    print('-' * 20)
    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])
    print('-' * 20)
    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()
    print('-' * 20)
    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
    freeze_support()
    test()
Pool を使用する例です:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )
def calculatestar(args):
    return calculate(*args)
def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b
def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b
def f(x):
    return 1.0 / (x - 5.0)
def pow3(x):
    return x ** 3
def noop(x):
    pass
#
# Test code
#
def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)
    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #
        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]
        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()
        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()
        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()
        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()
        #
        # Test error handling
        #
        print('Testing error handling:')
        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')
        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')
        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')
        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')
        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()
        #
        # Testing timeouts
        #
        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()
        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()
if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()
ワーカープロセスのコレクションに対してタスクをフィードしてその結果をまとめるキューの使い方の例を紹介します:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b
def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b
#
#
#
def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]
    # Create queues
    task_queue = Queue()
    done_queue = Queue()
    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)
    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()
    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())
    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)
    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())
    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')
if __name__ == '__main__':
    freeze_support()
    test()