16.6. multiprocessing --- プロセスベースの "並列処理" インタフェース¶
バージョン 2.6 で追加.
16.6.1. はじめに¶
multiprocessing は、 threading と似た API で複数のプロセスを生成をサポートするパッケージです。 multiprocessing パッケージは、ローカルとリモート両方の並行処理を提供します。また、このパッケージはスレッドの代わりにサブプロセスを使用することにより、グローバルインタープリタロック の問題を避ける工夫が行われています。このような特徴があるため multiprocessing モジュールを使うことで、マルチプロセッサーマシンの性能を最大限に活用することができるでしょう。なお、このモジュールは Unix と Windows で動作します。
multiprocessing モジュールでは threading モジュールには似たものがない API も導入しています。その最たるものが Pool オブジェクトです。これは複数の入力データに対して、サブプロセス群に入力データを分配して並列に関数実行する (データ並列) のに便利な手段を提供します。以下の例では、モジュール内で関数を定義するのに、子プロセスがそのモジュールをつつがなくインポート出来るようにする一般的な慣例を使っています。 Pool を用いたデータ並列の基礎的な実例はこのようなものです:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
標準出力に以下が出力されます:
[1, 4, 9]
16.6.1.1. Process クラス¶
multiprocessing モジュールでは、プロセスは以下の手順によって生成されます。はじめに Process のオブジェクトを作成し、続いて start() メソッドを呼び出します。この Process クラスは threading.Thread クラスと同様の API を持っています。まずは、簡単な例をもとにマルチプロセスを使用したプログラムについてみていきましょう
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
実行された個々のプロセス ID を表示するために拡張したサンプルコードを以下に例を示します:
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
(Windows 環境で) if __name__ == '__main__' という記述が必要な理由については、 プログラミングガイドライン を参照してください。
16.6.1.2. プロセス間でのオブジェクト交換¶
multiprocessing モジュールでは、プロセス間通信の手段が2つ用意されています。それぞれ以下に詳細を示します:
キュー (Queue)
QueueクラスはQueue.Queueクラスとほとんど同じように使うことができます。以下に例を示します。from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()キューはスレッドセーフであり、プロセスセーフです。
パイプ (Pipe)
Pipe()関数はパイプで繋がれたコネクションオブジェクトのペアを返します。デフォルトでは双方向性パイプを返します。以下に例を示します:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print parent_conn.recv() # prints "[42, None, 'hello']" p.join()パイプのそれぞれの端を表す2つのコネクションオブジェクトが
Pipe()関数から返されます。各コネクションオブジェクトには、send()、recv()、その他のメソッドがあります。2つのプロセス (またはスレッド) がパイプの 同じ 端で同時に読み込みや書き込みを行うと、パイプ内のデータが破損してしまうかもしれないことに注意してください。もちろん、各プロセスがパイプの別々の端を同時に使用するならば、データが破壊される危険性はありません。
16.6.1.3. プロセス間の同期¶
multiprocessing は threading モジュールと等価な同期プリミティブを備えています。以下の例では、ロックを使用して、一度に1つのプロセスしか標準出力に書き込まないようにしています:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
ロックを使用しないで標準出力に書き込んだ場合は、各プロセスからの出力がごちゃまぜになってしまいます。
16.6.1.4. プロセス間での状態の共有¶
これまでの話の流れで触れたとおり、並行プログラミングを行うときには、できるかぎり状態を共有しないのが定石です。複数のプロセスを使用するときは特にそうでしょう。
しかし、どうしてもプロセス間のデータ共有が必要な場合のために multiprocessing モジュールには2つの方法が用意されています。
共有メモリ (Shared memory)
データを共有メモリ上に保持するために
Valueクラス、もしくはArrayクラスを使用することができます。以下のサンプルコードを使って、この機能についてみていきましょうfrom multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]このサンプルコードを実行すると以下のように表示されます
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
numとarrを生成するときに使用されている、引数'd'と'i'はarrayモジュールにより使用される種別の型コードです。ここで使用されている'd'は倍精度浮動小数、'i'は符号付整数を表します。これらの共有オブジェクトは、プロセスセーフでありスレッドセーフです。共有メモリを使用して、さらに柔軟なプログラミングを行うには
multiprocessing.sharedctypesモジュールを使用します。このモジュールは共有メモリから割り当てられた任意の ctypes オブジェクトの生成をサポートします。
サーバープロセス (Server process)
Manager()関数により生成されたマネージャーオブジェクトはサーバープロセスを管理します。マネージャーオブジェクトは Python のオブジェクトを保持して、他のプロセスがプロキシ経由でその Python オブジェクトを操作することができます。
Manager()関数が返すマネージャはlist、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Queue、Value、Arrayをサポートします。以下にサンプルコードを示します。from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print d print lこのサンプルコードを実行すると以下のように表示されます
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]サーバープロセスのマネージャーオブジェクトは共有メモリのオブジェクトよりも柔軟であるといえます。それは、どのような型のオブジェクトでも使えるからです。また、1つのマネージャーオブジェクトはネットワーク経由で他のコンピューター上のプロセスによって共有することもできます。しかし、共有メモリより動作が遅いという欠点があります。
16.6.1.5. ワーカープロセスのプールを使用¶
Pool クラスは、ワーカープロセスをプールする機能を備えています。このクラスには、異なる方法でワーカープロセスへタスクを割り当てるいくつかのメソッドがあります。
例えば:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。
注釈
このパッケージに含まれる機能を使用するためには、子プロセスから __main__ モジュールをインポートできる必要があります。このことについては プログラミングガイドライン で触れていますが、ここであらためて強調しておきます。なぜかというと、いくつかのサンプルコード、例えば multiprocessing.pool.Pool のサンプルはインタラクティブシェル上では動作しないからです。以下に例を示します:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(このサンプルを試すと、3つのトレースバックすべてがほぼランダムに交互に重なって表示されます。そうなったら、なんとかしてマスタープロセスを止めましょう。)
16.6.2. リファレンス¶
multiprocessing パッケージは threading モジュールの API とほとんど同じです。
16.6.2.1. Process クラスと例外¶
-
class
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})¶ Process オブジェクトは各プロセスの処理を表します。
Processクラスはthreading.Threadクラスのすべてのメソッドと同じインタフェースを提供します。コンストラクタは必ずキーワード引数で呼び出すべきです。引数 group には必ず
Noneを渡してください。この引数はthreading.Threadクラスとの互換性のためだけに残されています。引数 target には、呼び出し可能オブジェクト (Callable Object) を渡します。このオブジェクトはrun()メソッドから呼び出されます。この引数はデフォルトでNoneとなっており、何も呼び出されません。引数 name にはプロセス名を渡します。デフォルトでは、自動でユニークな名前が割り当てられます。命名規則は、 'Process-N1:N2:...:Nk' となります。ここで N1,N2,...,Nk は整数の数列で、 作成した プロセス数に対応します。引数 args は target で指定された呼び出し可能オブジェクトへの引数を渡します。同じく、引数 kwargs はキーワード引数を渡します。デフォルトでは、 target には引数が渡されないようになっています。サブクラスがコンストラクターをオーバーライドする場合は、そのプロセスに対する処理を行う前に基底クラスのコンストラクター (
Process.__init__()) を実行しなければなりません。-
run()¶ プロセスが実行する処理を表すメソッドです。
このメソッドはサブクラスでオーバーライドすることができます。標準の
run()メソッドは、コンストラクターの target 引数として渡された呼び出し可能オブジェクトを呼び出します。もしコンストラクターに args もしくは kwargs 引数が渡されていれば、呼び出すオブジェクトにこれらの引数を渡します。
-
start()¶ プロセスの処理を開始するためのメソッドです。
各 Process オブジェクトに対し、このメソッドが2回以上呼び出されてはいけません。各プロセスでオブジェクトの
run()メソッドを呼び出す準備を行います。
-
join([timeout])¶ join()されたプロセスが terminate を呼び出すまで、もしくはオプションで指定したタイムアウトが発生するまで呼び出し側のスレッドをブロックします。timeout が
Noneならタイムアウトは設定されません。1つのプロセスは何回も join されることができます。
プロセスは自分自身を join することはできません。それはデッドロックを引き起こすことがあるからです。プロセスが start される前に join しようとするとエラーが発生します。
-
name¶ プロセス名です。
この名前は文字列で、プロセスの識別にのみ使用されます。特別な命名規則はありません。複数のプロセスが同じ名前を持つ場合もあります。また、この名前はコンストラクタにより初期化されます。
-
daemon¶ デーモンプロセスであるかのフラグであり、ブール値です。この属性は
start()が呼び出される前に設定されている必要があります。初期値は作成するプロセスから継承します。
あるプロセスが終了するとき、そのプロセスはその子プロセスであるデーモンプロセスすべてを終了させようとします。
デーモンプロセスは子プロセスを作成できないことに注意してください。もし作成できてしまうと、そのデーモンプロセスの親プロセスが終了したときにデーモンプロセスの子プロセスが孤児になってしまう場合があるからです。さらに言えば、デーモンプロセスはUnix デーモンやサービスでは なく 通常のプロセスであり、非デーモンプロセスが終了すると終了されます (そして join されません)。
threading.Threadクラスの API に加えてProcessクラスのオブジェクトには以下の属性およびメソッドがあります:-
pid¶ プロセスIDを返します。プロセスの生成前は
Noneが設定されています。
-
exitcode¶ 子プロセスの終了コードです。子プロセスがまだ終了していない場合は
Noneが返されます。負の値 -N は子プロセスがシグナル N で終了したことを表します。
-
authkey¶ プロセスの認証キーです (バイト文字列です)。
multiprocessingモジュールがメインプロセスにより初期化される場合には、os.urandom()関数を使用してランダムな値が設定されます。Processクラスのオブジェクトの作成時にその親プロセスから認証キーを継承します。もしくはauthkeyに別のバイト文字列を設定することもできます。詳細は 認証キー を参照してください。
-
terminate()¶ プロセスを終了します。Unix 環境では
SIGTERMシグナルを、 Windows 環境ではTerminateProcess()を使用して終了させます。終了ハンドラーや finally 節などは、実行されないことに注意してください。このメソッドにより終了するプロセスの子孫プロセスは、終了 しません 。そういった子孫プロセスは単純に孤児になります。
警告
このメソッドの使用時に、関連付けられたプロセスがパイプやキューを使用している場合には、使用中のパイプやキューが破損して他のプロセスから使用できなくなる可能性があります。同様に、プロセスがロックやセマフォなどを取得している場合には、このプロセスが終了してしまうと他のプロセスのデッドロックの原因になるでしょう。
プロセスオブジェクトが作成したプロセスのみが
start(),join(),is_alive(),terminate()とexitcodeのメソッドを呼び出すべきです。以下の例では
Processのメソッドの使い方を示しています:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print p, p.is_alive() <Process(Process-1, initial)> False >>> p.start() >>> print p, p.is_alive() <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print p, p.is_alive() <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.BufferTooShort¶ この例外は
Connection.recv_bytes_into()によって発生し、バッファーオブジェクトが小さすぎてメッセージが読み込めないことを示します。eがBufferTooShortのインスタンスとすると、e.args[0]はバイト文字列でそのメッセージを取得できます。
16.6.2.2. パイプ (Pipe) とキュー (Queue)¶
複数のプロセスを使う場合、一般的にはメッセージパッシングをプロセス間通信に使用し、ロックのような同期プリミティブを使用しないようにします。
メッセージのやりとりのために Pipe() (2つのプロセス間の通信用)、もしくはキュー (複数のメッセージ生成プロセス (producer)、消費プロセス (consumer) の実現用) を使うことができます。
Queue, multiprocessing.queues.SimpleQueue, JoinableQueue は複数プロセスから生成/消費を行う FIFO キューです。これらのキューは標準ライブラリの Queue.Queue を模倣しています。 Queue には Python 2.5 の Queue.Queue クラスで導入された task_done() と join() メソッドがないことが違う点です。
もし JoinableQueue を使用するなら、キューから削除される各タスクのために JoinableQueue.task_done() を呼び出さなければ なりません 。さもないと、いつか完了していないタスクを数えるためのセマフォがオーバーフローし、例外を発生させるでしょう。
管理オブジェクトを使用することで共有キューを作成できることも覚えておいてください。詳細は マネージャー を参照してください。
注釈
multiprocessing は通常の Queue.Empty と、タイムアウトのシグナルを送るために Queue.Full 例外を使用します。それらは Queue からインポートする必要があるので multiprocessing の名前空間では利用できません。
注釈
オブジェクトがキューに追加される際、そのオブジェクトは pickle 化されています。そのため、バックグラウンドのスレッドが後になって下位層のパイプに pickle 化されたデータをフラッシュすることがあります。これにより、少し驚くような結果になりますが、実際に問題になることはないはずです。これが問題になるような状況では、かわりに manager を使ってキューを作成することができるからです。
空のキューの中にオブジェクトを追加した後、キューの
empty()メソッドがFalseを返すまでの間にごくわずかな遅延が起きることがあり、get_nowait()がQueue.Emptyを発生させることなく制御が呼び出し元に返ってしまうことがあります。複数のプロセスがオブジェクトをキューに詰めている場合、キューの反対側ではオブジェクトが詰められたのとは違う順序で取得される可能性があります。ただし、同一のプロセスから詰め込まれたオブジェクトは、それらのオブジェクト間では、必ず期待どおりの順序になります。
警告
Queue を利用しようとしている最中にプロセスを Process.terminate() や os.kill() で終了させる場合、キューにあるデータは破損し易くなります。終了した後で他のプロセスがキューを利用しようとすると、例外を発生させる可能性があります。
警告
上述したように、もし子プロセスがキューへ要素を追加するなら (かつ JoinableQueue.cancel_join_thread を使用しないなら) そのプロセスはバッファーされたすべての要素がパイプへフラッシュされるまで終了しません。
これは、そのプロセスを join しようとする場合、キューに追加されたすべての要素が消費されたことが確実でないかぎり、デッドロックを発生させる可能性があることを意味します。似たような現象で、子プロセスが非デーモンプロセスの場合、親プロセスは終了時に非デーモンのすべての子プロセスを join しようとしてハングアップする可能性があります。
マネージャーを使用して作成されたキューではこの問題はありません。詳細は プログラミングガイドライン を参照してください。
プロセス間通信におけるキューの使用例を知りたいなら 例 を参照してください。
-
multiprocessing.Pipe([duplex])¶ パイプの両端を表す
Connectionオブジェクトのペア(conn1, conn2)を返します。duplex が
True(デフォルト) ならパイプは双方向性です。duplex がFalseならパイプは一方向性で、conn1はメッセージの受信専用、conn2はメッセージの送信専用になります。
-
class
multiprocessing.Queue([maxsize])¶ パイプや2~3個のロック/セマフォを使用して実装されたプロセス共有キューを返します。あるプロセスが最初に要素をキューへ追加するとき、バッファーからパイプの中へオブジェクトを転送する供給スレッドが開始されます。
標準ライブラリの
Queueモジュールからの通常のQueue.EmptyやQueue.Full例外はタイムアウトのシグナルを送るために発生します。Queueはtask_done()やjoin()を除くQueue.Queueの全てのメソッドを実装します。-
qsize()¶ おおよそのキューのサイズを返します。マルチスレッディング/マルチプロセスの特性上、この数値は信用できません。
これは
sem_getvalue()が実装されていない Mac OS X のような Unix プラットホーム上でNotImplementedErrorを発生させる可能性があることを覚えておいてください。
-
empty()¶ キューが空っぽなら
Trueを、そうでなければFalseを返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。
-
full()¶ キューがいっぱいなら
Trueを、そうでなければFalseを返します。マルチスレッディング/マルチプロセシングの特性上、これは信用できません。
-
put(obj[, block[, timeout]])¶ キューの中へ要素を追加します。オプションの引数 block が
True(デフォルト) 且つ timeout がNone(デフォルト) なら、空きスロットが利用可能になるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に空きスロットが利用できなかったらQueue.Full例外を発生させます。それ以外 ( block がFalse) で、空きスロットがすぐに利用可能な場合はキューに要素を追加します。そうでなければQueue.Full例外が発生します(その場合 timeout は無視されます)。
-
put_nowait(obj)¶ put(obj, False)と等価です。
-
get([block[, timeout]])¶ キューから要素を取り出して削除します。オプションの引数 block が
True(デフォルト) 且つ timeout がNone(デフォルト) なら、要素が取り出せるまで必要であればブロックします。 timeout が正の数なら、最大 timeout 秒ブロックして、その時間内に要素が取り出せなかったらQueue.Empty例外を発生させます。それ以外 ( block がFalse) で、要素がすぐに取り出せる場合は要素を返します。そうでなければQueue.Empty例外が発生します(その場合 timeout は無視されます)。
-
get_nowait()¶ get(False)と等価です。
QueueはQueue.Queueにはない追加メソッドがあります。これらのメソッドは通常、ほとんどのコードに必要ありません。-
close()¶ カレントプロセスからこのキューへそれ以上データが追加されないことを表します。バックグラウンドスレッドはパイプへバッファーされたすべてのデータをフラッシュするとすぐに終了します。これはキューがガベージコレクトされるときに自動的に呼び出されます。
-
join_thread()¶ バックグラウンドスレッドを join します。このメソッドは
close()が呼び出された後でのみ使用されます。バッファーされたすべてのデータがパイプへフラッシュされるのを保証するため、バックグラウンドスレッドが終了するまでブロックします。デフォルトでは、あるプロセスがキューを作成していない場合、終了時にキューのバックグラウンドスレッドを join しようとします。そのプロセスは
join_thread()が何もしないようにcancel_join_thread()を呼び出すことができます。
-
cancel_join_thread()¶ join_thread()がブロッキングするのを防ぎます。特にこれはバックグラウンドスレッドがそのプロセスの終了時に自動的に join されるのを防ぎます。詳細はjoin_thread()を参照してください。このメソッドは
allow_exit_without_flush()という名前のほうがよかったかもしれません。キューに追加されたデータが失われてしまいがちなため、このメソッドを使う必要はほぼ確実にないでしょう。本当にこれが必要になるのは、キューに追加されたデータを下位層のパイプにフラッシュすることなくカレントプロセスを直ちに終了する必要があり、かつ失われるデータに関心がない場合です。
注釈
このクラスに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォ (shared semaphore) を使用しているものがあります。これが使用できない場合には、このクラスが無効になり、
Queueをインスタンス化する時にImportErrorが発生します。詳細は bpo-3770 を参照してください。同様のことが、以下に列挙されている特殊なキューでも成り立ちます。-
-
class
multiprocessing.queues.SimpleQueue¶ 単純化された
Queue型です。ロックされたPipeに非常に似ています。-
empty()¶ もしキューが空ならば
Trueを、そうでなければFalseを返します。
-
get()¶ キューからアイテムを取り除いて返します。
-
put(item)¶ item をキューに追加します。
-
-
class
multiprocessing.JoinableQueue([maxsize])¶ JoinableQueueはQueueのサブクラスであり、task_done()やjoin()メソッドが追加されているキューです。-
task_done()¶ 以前にキューへ追加されたタスクが完了したことを表します。キュー消費スレッドによって使用されます。タスクをフェッチするために使用されるそれぞれの
get()では、次にtask_done()を呼び出してタスクの処理が完了したことをキューへ伝えます。もし
join()がブロッキング状態なら、全ての要素が処理されたときに復帰します(task_done()呼び出しが全ての要素からキュー内へput()されたと受け取ったことを意味します)。キューにある要素より多く呼び出された場合
ValueErrorが発生します。
-
join()¶ キューにあるすべてのアイテムが取り出されて処理されるまでブロックします。
キューに要素が追加されると未終了タスク数が増えます。キューの要素が取り出されて全て処理が完了したことを表す
task_done()を消費スレッドが呼び出すと数が減ります。未終了タスク数がゼロになるとjoin()はブロッキングを解除します。
-
16.6.2.3. その他の関数¶
-
multiprocessing.active_children()¶ カレントプロセスのアクティブな子プロセスのすべてのリストを返します。
これを呼び出すと "join" してすでに終了しているプロセスには副作用があります。
-
multiprocessing.cpu_count()¶ システムの CPU 数を返します。
NotImplementedErrorが送出される場合があります。
-
multiprocessing.current_process()¶ カレントプロセスに対応する
Processオブジェクトを返します。threading.current_thread()とよく似た関数です。
-
multiprocessing.freeze_support()¶ multiprocessingを使用するプログラムが Windows の実行可能形式を生成しようとして固まったときのサポートを追加します。(py2exe , PyInstaller や cx_Freeze でテストされています。)メインモジュールの
if __name__ == '__main__'の直後にこの関数を呼び出す必要があります。以下に例を示します:from multiprocessing import Process, freeze_support def f(): print 'hello world!' if __name__ == '__main__': freeze_support() Process(target=f).start()
もし
freeze_support()の行がない場合、固まった実行可能形式を実行しようとするとRuntimeErrorを発生させます。freeze_support()の呼び出しは Windows 以外の OS では効果がありません。さらに、もしモジュールが Windows の通常の Python インタプリタによって実行されているならば(プログラムがフリーズされていなければ)freeze_support()は効果がありません。
-
multiprocessing.set_executable()¶ 子プロセスを開始するときに、使用する Python インタープリターのパスを設定します。(デフォルトでは
sys.executableが使用されます)。コードに組み込むときは、おそらく次のようにする必要がありますset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
子プロセスを生成する前に行なってください。 (Windows only)
16.6.2.4. Connection オブジェクト¶
Connection オブジェクトは pickle でシリアライズ可能なオブジェクトか文字列を送ったり、受け取ったりします。そういったオブジェクトはメッセージ指向の接続ソケットと考えられます。
Connection objects are usually created using
Pipe -- see also
リスナーとクライアント.
-
class
Connection¶ -
send(obj)¶ コネクションの相手側へ
recv()を使用して読み込むオブジェクトを送ります。オブジェクトは pickle でシリアライズ可能でなければなりません。 pickle が極端に大きすぎる (OS にも依りますが、およそ 32 MB+) と、
ValueError例外が送出されることがあります。
-
recv()¶ コネクションの相手側から
send()を使用して送られたオブジェクトを返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合EOFErrorが発生します。
-
fileno()¶ コネクションが使用するハンドラーか、ファイル記述子を返します。
-
close()¶ コネクションをクローズします。
コネクションがガベージコレクトされるときに自動的に呼び出されます。
-
poll([timeout])¶ 読み込み可能なデータがあるかどうかを返します。
timeout が指定されていなければすぐに返します。 timeout に数値を指定すると、最大指定した秒数をブロッキングします。 timeout に
Noneを指定するとタイムアウトせずにずっとブロッキングします。
-
send_bytes(buffer[, offset[, size]])¶ バッファインタフェースをサポートするオブジェクトから完全なメッセージとしてバイトデータを送ります。
offset が指定されると buffer のその位置からデータが読み込まれます。 size が指定されるとバッファーからその量のデータが読み込まれます。非常に大きなバッファー (OS に依存しますが、およそ 32MB+) を指定すると、
ValueError例外が発生するかもしれません。
-
recv_bytes([maxlength])¶ コネクションの相手側から送られたバイトデータの完全なメッセージを文字列として返します。何か受け取るまでブロックします。受け取るデータが何も残っておらず、相手側がコネクションを閉じていた場合、
EOFErrorが送出されます。maxlength を指定して、且つ maxlength よりメッセージが長い場合、
IOErrorを発生させて、それ以上はコネクションから読み込めなくなります。
-
recv_bytes_into(buffer[, offset])¶ コネクションの相手側から送られたバイトデータを buffer に読み込み、メッセージのバイト数を返します。 何か受け取るまでブロックします。何も受け取らずにコネクションの相手側でクローズされた場合
EOFErrorが発生します。buffer は書き込み可能なバッファインタフェースを備えたオブジェクトでなければなりません。 offset が与えられたら、その位置からバッファへメッセージが書き込まれます。オフセットは buffer バイトよりも小さい正の数でなければなりません。
バッファーがあまりに小さいと
BufferTooShort例外が発生します。eが例外インスタンスとすると完全なメッセージはe.args[0]で確認できます。
-
例えば:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv() メソッドは受信したデータを自動的に unpickle 化します。それはメッセージを送ったプロセスが信頼できる場合を除いてセキュリティリスクになります。
そのため Pipe() を使用してコネクションオブジェクトを生成する場合を除いて、何らかの認証処理を実行した後で recv() や send() メソッドのみを使用すべきです。詳細は 認証キー を参照してください。
警告
もしプロセスがパイプの読み込みまたは書き込み中に kill されると、メッセージの境界がどこなのか分からなくなってしまうので、そのパイプ内のデータは破損してしまいがちです。
16.6.2.5. 同期プリミティブ¶
一般的にマルチプロセスプログラムは、マルチスレッドプログラムほどは同期プリミティブを必要としません。詳細は threading モジュールのドキュメントを参照してください。
マネージャーオブジェクトを使用して同期プリミティブを作成できることも覚えておいてください。詳細は マネージャー を参照してください。
-
class
multiprocessing.BoundedSemaphore([value])¶ 有限セマフォオブジェクト:
threading.BoundedSemaphoreのクローンです。インターフェイスがスレッドのものと一つだけ違います:
acquireメソッドの最初の引数は block 、オプション引数として timeout を取ります。これはLock.acquire()でのものと同じです。注釈
Mac OS X では
sem_getvalue()が実装されていないのでSemaphoreと区別がつきません。
-
class
multiprocessing.Condition([lock])¶ 状態変数:
threading.Conditionのクローンです。lock を指定するなら
multiprocessingのLockかRLockオブジェクトにすべきです。
-
class
multiprocessing.Event¶ threading.Eventのクローンです。このメソッドは、終了時の内部セマフォの状態を返すので、タイムアウトが与えられ、実際にオペレーションがタイムアウトしたのでなければ、必ずTrueを返します。バージョン 2.7 で変更: 以前はこのメソッドは常に
Noneを返していました。
-
class
multiprocessing.Lock¶ 再帰しないロックオブジェクトで、
threading.Lock相当のものです。プロセスやスレッドがロックをいったん獲得 (acquire) すると、それに続くほかのプロセスやスレッドが獲得しようとする際、それが解放 (release) されるまではブロックされます。解放はどのプロセス、スレッドからも行えます。スレッドに対して適用されるthreading.Lockのコンセプトと振る舞いは、特筆すべきものがない限り、プロセスとスレッドに適用されるmultiprocessing.Lockに引き継がれています。Lockは実際にはファクトリ関数で、デフォルトコンテキストで初期化されたmultiprocessing.synchronize.Lockのインスタンスを返すことに注意してください。Lockは context manager プロトコルをサポートしています。つまりwith文で使うことができます。-
acquire(block=True, timeout=None)¶ ブロックあり、またはブロックなしでロックを獲得します。
引数 block を
True(デフォルト) に設定して呼び出した場合、ロックがアンロック状態になるまでブロックします。ブロックから抜けるとそれをロック状態にしてからTrueを返します。threading.Lock.acquire()の最初の引数とは名前が違っているので注意してください。引数 block の値を
Falseにして呼び出すとブロックしません。 現在ロック状態であれば、直ちにFalseを返します。それ以外の場合には、ロックをロック状態にしてTrueを返します。timeout として正の浮動小数点数を与えて呼び出すと、ロックが獲得出来ないあいだ最大でこれで指定した秒数だけブロックします。 timeout 値の負数はゼロと同じです。 timeout 値の
None(デフォルト) は無限にブロックすることを意味します。 block がFalseの場合には timeout には実際的な意味はないので無視されます。ロックを獲得するとTrue、タイムアウトした場合はFalseで戻ります。 timeout 引数は類似品のthreading.Lock.acquire()にはないことに注意してください。
-
release()¶ ロックを解放します。これはロックを獲得したプロセスやスレッドだけでなく、任意のプロセスやスレッドから呼ぶことができます。
threading.Lock.release()と同じように振舞いますが、ロックされていない場合に呼び出すとValueErrorとなる点だけが違います。
-
-
class
multiprocessing.RLock¶ 再帰ロックオブジェクトで、
threading.RLock相当のものです。再帰ロックオブジェクトはそれを獲得 (acquire) したプロセスやスレッドが解放 (release) しなければなりません。プロセスやスレッドがロックをいったん獲得すると、同じプロセスやスレッドはブロックされずに再度獲得出来ます。そのプロセスやスレッドは獲得した回数ぶん解放しなければなりません。RLockは実際にはファクトリ関数で、デフォルトコンテキストで初期化されたmultiprocessing.synchronize.Lockのインスタンスを返すことに注意してください。RLockは context manager プロトコルをサポートしています。つまりwith文で使うことができます。-
acquire(block=True, timeout=None)¶ ブロックあり、またはブロックなしでロックを獲得します。
block 引数を
Trueにして呼び出した場合、ロックが既にカレントプロセスもしくはカレントスレッドが既に所有していない限りは、アンロック状態 (どのプロセス、スレッドも所有していない状態) になるまでブロックします。ブロックから抜けるとカレントプロセスもしくはカレントスレッドが (既に持っていなければ) 所有権を得て、再帰レベルをインクリメントし、Trueで戻ります。threading.RLock.acquire()の実装とはこの最初の引数の振る舞いが、その名前自身を始めとしていくつか違うので注意してください。block 引数を
Falseにして呼び出した場合、ブロックしません。ロックが他のプロセスもしくはスレッドにより獲得済み (つまり所有されている) であれば、カレントプロセスまたはカレントスレッドは所有権を得ず、再帰レベルも変更せずに、Falseで戻ります。ロックがアンロック状態の場合、カレントプロセスもしくはカレントスレッドは所有権を得て再帰レベルがインクリメントされ、Trueで戻ります。(---訳注: block の True/False 関係なくここでの説明では「所有権を持っている場合の2度目以降の aquire」の説明が欠けています。2度目以降の acquire では再帰レベルがインクリメントされて即座に返ります。全体読めばわかるとは思いますが一応。---)timeout 引数の使い方と振る舞いは
Lock.acquire()と同じです。 timeout 引数は類似品のthreading.RLock.acquire()にはないことに注意してください。
-
release()¶ 再帰レベルをデクリメントしてロックを解放します。デクリメント後に再帰レベルがゼロになった場合、ロックの状態をアンロック (いかなるプロセス、いかなるスレッドにも所有されていない状態) にリセットし、ロックの状態がアンロックになるのを待ってブロックしているプロセスもしくはスレッドがある場合にはその中のただ一つだけが処理を進行できるようにします。デクリメント後も再帰レベルがゼロでない場合、ロックの状態はロックのままで、呼び出し側のプロセスもしくはスレッドに所有されたままになります。
このメソッドは呼び出しプロセスあるいはスレッドがロックを所有している場合に限り呼び出してください。所有者でないプロセスもしくはスレッドによって呼ばれるか、あるいはアンロック (未所有) 状態で呼ばれた場合、
AssertionErrorが送出されます。同じ状況でのthreading.RLock.release()実装とは例外の型が異なるので注意してください。
-
-
class
multiprocessing.Semaphore([value])¶ セマフォオブジェクト:
threading.Semaphoreのクローンです。インターフェイスがスレッドのものと一つだけ違います:
acquireメソッドの最初の引数は block 、オプション引数として timeout を取ります。これはLock.acquire()でのものと同じです。
注釈
BoundedSemaphore, Lock, RLock と Semaphore の acquire() メソッドは threading ではサポートされていないタイムアウトパラメータを取ります。その引数はキーワード引数で受け取れる acquire(block=True, timeout=None) です。 block が True 且つ timeout が None ではないなら、タイムアウトが秒単位で設定されます。 block が False なら timeout は無視されます。
Mac OS X では sem_timedwait がサポートされていないので、acquire() にタイムアウトを与えて呼ぶと、ループ内でスリープすることでこの関数がエミュレートされます。
注釈
メインスレッドが BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() 又は Condition.wait() を呼び出してブロッキング状態のときに Ctrl-C で生成される SIGINT シグナルを受け取ると、その呼び出しはすぐに中断されて KeyboardInterrupt が発生します。
これは同等のブロッキング呼び出しが実行中のときに SIGINT が無視される threading の振る舞いとは違っています。
注釈
このパッケージに含まれる機能には、ホストとなるオペレーティングシステム上で動作している共有セマフォを使用しているものがあります。これが使用できない場合には、multiprocessing.synchronize モジュールが無効になり、このモジュールのインポート時に ImportError が発生します。詳細は bpo-3770 を参照してください。
16.6.2.7. マネージャー¶
Manager は別のプロセス間で共有されるデータの作成方法を提供します。マネージャオブジェクトは 共有オブジェクト を管理するサーバプロセスを制御します。他のプロセスはプロキシ経由で共有オブジェクトへアクセスすることができます。
プロセス間でオブジェクトを共有するために使用される
SyncManagerオブジェクトを返します。返されたマネージャーオブジェクトは生成される子プロセスに対応付けられ、共有オブジェクトを作成するメソッドや、共有オブジェクトに対応するプロキシを返すメソッドを持ちます。
マネージャープロセスは親プロセスが終了するか、ガベージコレクトされると停止します。マネージャークラスは multiprocessing.managers モジュールで定義されています:
-
class
multiprocessing.managers.BaseManager([address[, authkey]])¶ BaseManager オブジェクトを作成します。
作成後、
start()かget_server().serve_forever()を呼び出して、マネージャーオブジェクトが確実に開始されたマネージャープロセスを参照するようにしてください。address はマネージャープロセスが新たなコネクションを待ち受けるアドレスです。address が
Noneの場合、任意のアドレスが設定されます。authkey はサーバプロセスへ接続しようとするコネクションの正当性を検証するために使用される認証キーです。 authkey が
Noneの場合current_process().authkeyが使用されます。 authkey を使用する場合は文字列でなければなりません。-
start([initializer[, initargs]])¶ マネージャーを開始するためにサブプロセスを開始します。initializer が
Noneでなければ、サブプロセスは開始時にinitializer(*initargs)を呼び出します。
-
get_server()¶ マネージャーの制御下にある実際のサーバーを表す
Serverオブジェクトを返します。Serverオブジェクトはserve_forever()メソッドをサポートします:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey='abc') >>> server = manager.get_server() >>> server.serve_forever()
Serverはさらにaddress属性も持っています。
-
connect()¶ ローカルからリモートのマネージャーオブジェクトへ接続します:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc') >>> m.connect()
-
register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ マネージャークラスで呼び出し可能オブジェクト(callable)や型を登録するために使用されるクラスメソッドです。
typeid は特に共有オブジェクトの型を識別するために使用される "型識別子" です。これは文字列でなければなりません。
callable はこの型識別子のオブジェクトを作成するために使用される呼び出し可能オブジェクトです。マネージャインスタンスが
from_address()クラスメソッドを使用して作成されるか、 create_method 引数がFalseの場合はNoneでも構いません。proxytype はこの typeid で共有オブジェクトのプロキシを作成するために使用される
BaseProxyのサブクラスです。Noneの場合、プロキシクラスは自動的に作成されます。exposed は
BaseProxy._callmethod()を使用したアクセスが許されるべき typeid をプロキシするメソッド名のシーケンスを指定するために使用されます (exposed がNoneの場合proxytype._exposed_が存在すればそれが代わりに使用されます)。exposed リストが指定されない場合、共有オブジェクトのすべての "パブリックメソッド" がアクセス可能になります。 (ここでいう "パブリックメソッド" とは__call__()メソッドを持つものと名前が'_'で始まらないあらゆる属性を意味します。)method_to_typeid はプロキシが返す exposed メソッドの返り値の型を指定するために使用されるマッピングで、メソッド名を typeid 文字列にマップします。 (method_to_typeid が
Noneの場合proxytype._method_to_typeid_が存在すれば、それが代わりに使用されます。) メソッド名がこのマッピングのキーではないか、マッピングがNoneの場合、そのメソッドによって返されるオブジェクトが値として (by value) コピーされます。create_method は、共有オブジェクトを作成し、それに対するプロキシを返すようサーバープロセスに伝える、名前 typeid のメソッドを作成するかを決定します。デフォルトでは
Trueです。
BaseManagerインスタンスも読み取り専用属性を1つ持っています:-
address¶ マネージャーが使用するアドレスです。
-
-
class
multiprocessing.managers.SyncManager¶ プロセス間の同期のために使用される
BaseManagerのサブクラスです。multiprocessing.Manager()はこの型のオブジェクトを返します。また共有のリストやディクショナリの作成もサポートします。
-
BoundedSemaphore([value])¶ 共有
threading.BoundedSemaphoreオブジェクトを作成して、そのプロキシを返します。
-
Condition([lock])¶ 共有
threading.Conditionオブジェクトを作成して、そのプロキシを返します。lock が提供される場合
threading.Lockかthreading.RLockオブジェクトのためのプロキシになります。
-
Event()¶ 共有
threading.Eventオブジェクトを作成して、そのプロキシを返します。
-
Lock()¶ 共有
threading.Lockオブジェクトを作成して、そのプロキシを返します。
-
Queue([maxsize])¶ 共有
Queue.Queueオブジェクトを作成して、そのプロキシを返します。
-
RLock()¶ 共有
threading.RLockオブジェクトを作成して、そのプロキシを返します。
-
Semaphore([value])¶ 共有
threading.Semaphoreオブジェクトを作成して、そのプロキシを返します。
-
Array(typecode, sequence)¶ 配列を作成して、そのプロキシを返します。
-
Value(typecode, value)¶ 書き込み可能な
value属性を作成して、そのプロキシを返します。
-
dict()¶ -
dict(mapping) -
dict(sequence) 共有
dictオブジェクトを作成して、そのプロキシを返します。
-
list()¶ -
list(sequence) 共有
listオブジェクトを作成して、そのプロキシを返します。
注釈
プロキシには、自身の持つ辞書とリストのミュータブルな値や項目がいつ変更されたのかを知る方法がないため、これらの値や項目の変更はマネージャーを通して伝播しません。このような要素を変更するには、コンテナーのプロキシに変更されたオブジェクトを再代入してください:
# create a list proxy and append a mutable object (a dictionary) lproxy = manager.list() lproxy.append({}) # now mutate the dictionary d = lproxy[0] d['a'] = 1 d['b'] = 2 # at this point, the changes to d are not yet synced, but by # reassigning the dictionary, the proxy is notified of the change lproxy[0] = d
-
-
class
multiprocessing.managers.Namespace¶ SyncManagerに登録することのできる型です。Namespace オブジェクトにはパブリックなメソッドはありませんが、書き込み可能な属性を持ちます。そのオブジェクト表現はその属性の値を表示します。
しかし、Namespace オブジェクトのためにプロキシを使用するとき
'_'が先頭に付く属性はプロキシの属性になり、参照対象の属性にはなりません:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print Global Namespace(x=10, y='hello')
16.6.2.7.1. カスタマイズされたマネージャー¶
独自のマネージャーを作成するには、BaseManager のサブクラスを作成して、 マネージャークラスで呼び出し可能なオブジェクトか新たな型を登録するために register() クラスメソッドを使用します。例えば:
from multiprocessing.managers import BaseManager
class MathsClass(object):
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
manager = MyManager()
manager.start()
maths = manager.Maths()
print maths.add(4, 3) # prints 7
print maths.mul(7, 8) # prints 56
16.6.2.7.2. リモートマネージャーを使用する¶
あるマシン上でマネージャーサーバーを実行して、他のマシンからそのサーバーを使用するクライアントを持つことができます(ファイアウォールを通過できることが前提)。
次のコマンドを実行することでリモートクライアントからアクセスを受け付ける1つの共有キューのためにサーバーを作成します:
>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
あるクライアントからサーバーへのアクセスは次のようになります:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
別のクライアントもそれを使用することができます:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
ローカルプロセスもそのキューへアクセスすることができます。クライアント上で上述のコードを使用してアクセスします:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
16.6.2.8. Proxy オブジェクト¶
プロキシは別のプロセスで(おそらく)有効な共有オブジェクトを 参照する オブジェクトです。共有オブジェクトはプロキシの 参照対象 になるということができます。複数のプロキシオブジェクトが同じ参照対象を持つ可能性もあります。
プロキシオブジェクトはその参照対象が持つ対応メソッドを実行するメソッドを持ちます。(そうは言っても、参照対象のすべてのメソッドが必ずしもプロキシ経由で利用可能ではありません) プロキシは通常その参照対象ができることと同じ方法で使用されます:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
プロキシに str() を適用すると参照対象のオブジェクト表現を返すのに対して、 repr() を適用するとプロキシのオブジェクト表現を返すことに注意してください。
プロキシオブジェクトの重要な機能はプロセス間で受け渡し可能な pickle 化ができることです。しかし、プロキシが対応するマネージャープロセスに対して送信される場合、そのプロキシを unpickle するとその参照対象を生成することを覚えておいてください。例えば、これはある共有オブジェクトに別の共有オブジェクトが含められることを意味します:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']
注釈
multiprocessing のプロキシ型は値による比較に対して何もサポートしません。そのため、例えば以下のようになります:
>>> manager.list([1,2,3]) == [1,2,3]
False
比較を行いたいときは参照対象のコピーを使用してください。
-
class
multiprocessing.managers.BaseProxy¶ プロキシオブジェクトは
BaseProxyのサブクラスのインスタンスです。-
_callmethod(methodname[, args[, kwds]])¶ プロキシの参照対象のメソッドの実行結果を返します。
proxyがプロキシで、プロキシ内の参照対象がobjならこの式proxy._callmethod(methodname, args, kwds)
はこの式を評価します
getattr(obj, methodname)(*args, **kwds)
(マネージャープロセス内の)。
返される値はその呼び出し結果のコピーか、新たな共有オブジェクトに対するプロキシになります。詳細は
BaseManager.register()の method_to_typeid 引数のドキュメントを参照してください。その呼び出しによって例外が発生した場合、
_callmethod()によってその例外は再送出されます。他の例外がマネージャープロセスで発生したなら、RemoteError例外に変換されたものが_callmethod()によって送出されます。特に methodname が 公開 されていない場合は例外が発生することに注意してください。
_callmethod()の使用例になります:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]` [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]` Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue()¶ 参照対象のコピーを返します。
参照対象が unpickle 化できるなら例外を発生します。
-
__repr__()¶ プロキシオブジェクトのオブジェクト表現を返します。
-
__str__()¶ 参照対象のオブジェクト表現を返します。
-
16.6.2.8.1. クリーンアップ¶
プロキシオブジェクトは弱参照(weakref)コールバックを使用します。プロキシオブジェクトがガベージコレクトされるときにその参照対象が所有するマネージャーからその登録を取り消せるようにするためです。
共有オブジェクトはプロキシが参照しなくなったときにマネージャープロセスから削除されます。
16.6.2.9. プロセスプール¶
Pool クラスでタスクを実行するプロセスのプールを作成することができます。
-
class
multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])¶ プロセスプールオブジェクトは、ジョブを送り込めるワーカープロセスのプールを制御します。タイムアウトやコールバックのある非同期の実行をサポートし、並列 map 実装を持ちます。
processes は使用するワーカープロセスの数です。 processes が
Noneの場合cpu_count()が返す数を使用します。 initializer がNoneでない場合、各ワーカープロセスが開始時にinitializer(*initargs)を呼び出します。プールオブジェクトのメソッドは、そのプールを作成したプロセスのみが呼び出すべきです。
バージョン 2.7 で追加: maxtasksperchild は、ワーカープロセスが exit して新たなワーカープロセスと置き替えられるまでの間に、ワーカープロセスが完了することのできるタスクの数です。この設定により未利用のリソースが解放されるようなります。デフォルトの maxtasksperchild は
Noneで、これはワーカープロセスがプールと同じ期間だけ生き続けるということを意味します。注釈
Pool中のワーカープロセスは、典型的にはプールのワークキューの存続期間とちょうど同じだけ生き続けます。ワーカーに確保されたリソースを解放するために (Apache, mod_wsgi, などのような) 他のシステムによく見られるパターンは、プール内のワーカーが設定された量だけの仕事を完了したら exit とクリーンアップを行い、古いプロセスを置き換えるために新しいプロセスを生成するというものです。Poolの maxtasksperchild 引数は、この能力をエンドユーザーに提供します。-
apply(func[, args[, kwds]])¶ 組み込み関数
apply()の同等品で、結果が取得出来るようになるまでブロックします。ですから作業を並列に実行するのにより相応しいのはapply_async()です。加えて func はプールのワーカーのうち一つの中で実行されるに過ぎません。
-
apply_async(func[, args[, kwds[, callback]]])¶ apply()メソッドの派生版で結果オブジェクトを返します。callback を指定する場合は 1 つの引数を受け取る呼び出し可能オブジェクトでなければなりません。その結果を返せるようになったときに callback が結果オブジェクトに対して (その呼び出しが失敗しない限り)適用されます。コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。
-
map(func, iterable[, chunksize])¶ map()組み込み関数の並列版です (iterable な引数を1つだけサポートするという違いはありますが)。結果が出るまでブロックします。このメソッドはイテラブルをいくつものチャンクに分割し、プロセスプールにそれぞれ独立したタスクとして送ります。(概算の) チャンクサイズは chunksize を正の整数に設定することで指定できます。
-
map_async(func, iterable[, chunksize[, callback]])¶ map()メソッドの派生版で結果オブジェクトを返します。callback を指定する場合は 1 つの引数を受け取る呼び出し可能オブジェクトでなければなりません。その結果を返せるようになったときに callback が結果オブジェクトに対して (その呼び出しが失敗しない限り)適用されます。コールバックは直ちに完了すべきです。なぜなら、そうしなければ、結果を扱うスレッドがブロックするからです。
-
imap(func, iterable[, chunksize])¶ itertools.imap()と同じです。chunksize 引数は
map()メソッドで使用されるものと同じです。 引数 iterable がとても長いなら chunksize に大きな値を指定して使用する方がデフォルト値の1を使用するよりもジョブの完了が かなり 速くなります。また chunksize が
1の場合imap()メソッドが返すイテレーターのnext()メソッドはオプションで timeout パラメーターを持ちます。next(timeout)は、その結果が timeout 秒以内に返されないときにmultiprocessing.TimeoutErrorを発生させます。
-
imap_unordered(func, iterable[, chunksize])¶ イテレーターが返す結果の順番が任意の順番で良いと見なされることを除けば
imap()と同じです。 (ワーカープロセスが1つしかない場合のみ "正しい" 順番になることが保証されます。)
-
close()¶ これ以上プールでタスクが実行されないようにします。すべてのタスクが完了した後でワーカープロセスが終了します。
-
terminate()¶ 実行中の処理を完了させずにワーカープロセスをすぐに停止します。プールオブジェクトがガベージコレクトされるときに
terminate()が呼び出されます。
-
join()¶ ワーカープロセスが終了するのを待ちます。
join()を使用する前にclose()かterminate()を呼び出さなければなりません。
-
-
class
multiprocessing.pool.AsyncResult¶ Pool.apply_async()やPool.map_async()で返される結果のクラスです。-
get([timeout])¶ 結果を受け取ったときに返します。 timeout が
Noneではなくて、その結果が timeout 秒以内に受け取れない場合multiprocessing.TimeoutErrorが発生します。リモートの呼び出しが例外を発生させる場合、その例外はget()が再発生させます。
-
wait([timeout])¶ その結果が有効になるか timeout 秒経つまで待ちます。
-
ready()¶ その呼び出しが完了しているかどうかを返します。
-
successful()¶ その呼び出しが例外を発生させることなく完了したかどうかを返します。その結果が返せる状態でない場合
AssertionErrorが発生します。
-
次の例はプールの使用例を紹介します:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
16.6.2.10. リスナーとクライアント¶
Usually message passing between processes is done using queues or by using
Connection objects returned by Pipe().
しかし multiprocessing.connection モジュールはさらに柔軟な仕組みがあります。基本的にはソケットもしくは Windows の名前付きパイプを扱う高レベルのメッセージ指向 API を提供して hmac モジュールを使用して ダイジェスト認証 もサポートします。
-
multiprocessing.connection.deliver_challenge(connection, authkey)¶ ランダム生成したメッセージをコネクションの相手側へ送信して応答を待ちます。
その応答がキーとして authkey を使用するメッセージのダイジェストと一致する場合、コネクションの相手側へ歓迎メッセージを送信します。そうでなければ
AuthenticationErrorを発生させます。
-
multiprocessing.connection.answer_challenge(connection, authkey)¶ メッセージを受信して、そのキーとして authkey を使用するメッセージのダイジェストを計算し、ダイジェストを送り返します。
歓迎メッセージを受け取れない場合
AuthenticationErrorが発生します。
-
multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])¶ Attempt to set up a connection to the listener which is using address address, returning a
Connection.コネクション種別は family 引数で決定しますが、一般的には address のフォーマットから推測できるので、これは指定されません。 (アドレスフォーマット を参照してください)
authenticate が
Trueか authkey が文字列の場合、ダイジェスト認証が使用されます。認証に使用されるキーは authkey 、又は authkey がNoneの場合はcurrent_process().authkeyのどちらかです。認証が失敗した場合AuthenticationErrorが発生します。 認証キー を参照してください。
-
class
multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])¶ コネクションを '待ち受ける' 束縛されたソケットか Windows の名前付きパイプのラッパーです。
address はリスナーオブジェクトの束縛されたソケットか名前付きパイプが使用するアドレスです。
注釈
'0.0.0.0' のアドレスを使用する場合、Windows 上の終点へ接続することができません。終点へ接続したい場合は '127.0.0.1' を使用すべきです。
family は使用するソケット(名前付きパイプ)の種別です。これは
'AF_INET'(TCP ソケット),'AF_UNIX'(Unix ドメインソケット) または'AF_PIPE'(Windows 名前付きパイプ) という文字列のどれか1つになります。これらのうち'AF_INET'のみが利用可能であることが保証されています。 family がNoneの場合 address のフォーマットから推測されたものが使用されます。 address もNoneの場合はデフォルトが選択されます。詳細は アドレスフォーマット を参照してください。 family が'AF_UNIX'で address がNoneの場合tempfile.mkstemp()を使用して作成されたプライベートな一時ディレクトリにソケットが作成されます。リスナーオブジェクトがソケットを使用する場合、ソケットに束縛されるときに backlog (デフォルトでは1つ) がソケットの
listen()メソッドに対して渡されます。authenticate が
True(デフォルトではFalse) か authkey がNoneではない場合、ダイジェスト認証が使用されます。authkey が文字列の場合、認証キーとして使用されます。そうでない場合は
Noneでなければいけません。authkey が
None且つ authenticate がTrueの場合current_process().authkeyが認証キーとして使用されます。 authkey がNone且つ authentication がFalseの場合、認証は行われません。もし認証が失敗した場合AuthenticationErrorが発生します。詳細 認証キー を参照してください。-
accept()¶ Accept a connection on the bound socket or named pipe of the listener object and return a
Connectionobject. If authentication is attempted and fails, thenAuthenticationErroris raised.
-
close()¶ リスナーオブジェクトの名前付きパイプか束縛されたソケットをクローズします。これはリスナーがガベージコレクトされるときに自動的に呼ばれます。そうは言っても、明示的に close() を呼び出す方が望ましいです。
リスナーオブジェクトは次の読み取り専用属性を持っています:
-
address¶ リスナーオブジェクトが使用中のアドレスです。
-
last_accepted¶ 最後にコネクションを受け付けたアドレスです。有効なアドレスがない場合は
Noneになります。
-
The module defines the following exceptions:
-
exception
multiprocessing.connection.ProcessError¶ すべての
multiprocessing例外の基底クラスです。
-
exception
multiprocessing.connection.BufferTooShort¶ この例外は
Connection.recv_bytes_into()によって発生し、バッファーオブジェクトが小さすぎてメッセージが読み込めないことを示します。
-
exception
multiprocessing.connection.AuthenticationError¶ 認証エラーがあった場合に送出されます。
-
exception
multiprocessing.connection.TimeoutError¶ タイムアウトをサポートするメソッドでタイムアウトが過ぎたときに送出されます。
例
次のサーバーコードは認証キーとして 'secret password' を使用するリスナーを作成します。このサーバーはコネクションを待ってクライアントへデータを送信します:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
conn.send([2.25, None, 'junk', float])
conn.send_bytes('hello')
conn.send_bytes(array('i', [42, 1729]))
conn.close()
listener.close()
次のコードはサーバーへ接続して、サーバーからデータを受信します:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
conn = Client(address, authkey='secret password')
print conn.recv() # => [2.25, None, 'junk', float]
print conn.recv_bytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])
conn.close()
16.6.2.10.1. アドレスフォーマット¶
'AF_INET'アドレスは(hostname, port)のタプルになります。 hostname は文字列で port は整数です。'AF_UNIX'アドレスはファイルシステム上のファイル名の文字列です。'AF_PIPE'アドレスは、次の形式を持つ文字列ですr'\.\pipe{PipeName}'。 ServerName という名前のリモートコンピューター上の名前付きパイプに接続するためにClient()を使用するには、代わりにr'\ServerName\pipe{PipeName}'形式のアドレスを使用する必要があります。
デフォルトでは、2つのバックスラッシュで始まる文字列は 'AF_UNIX' よりも 'AF_PIPE' として推測されることに注意してください。
16.6.2.11. 認証キー¶
When one uses Connection.recv(), the
data received is automatically
unpickled. Unfortunately unpickling data from an untrusted source is a security
risk. Therefore Listener and Client() use the hmac module
to provide digest authentication.
認証キーはパスワードとしてみなされる文字列です。コネクションが確立すると、双方の終点で正しい接続先であることを証明するために知っているお互いの認証キーを要求します。 (双方の終点が同じキーを使用して通信しようとしても、コネクション上でそのキーを送信することは できません 。)
認証が要求されているにもかかわらず認証キーが指定されていない場合 current_process().authkey の返す値が使用されます。 (詳細は Process を参照してください。) この値はカレントプロセスを作成する Process オブジェクトによって自動的に継承されます。 これは(デフォルトでは)複数プロセスのプログラムの全プロセスが相互にコネクションを 確立するときに使用される1つの認証キーを共有することを意味します。
適当な認証キーを os.urandom() を使用して生成することもできます。
16.6.2.12. ログ記録¶
ロギングのためにいくつかの機能が利用可能です。しかし logging パッケージは、 (ハンドラー種別に依存して)違うプロセスからのメッセージがごちゃ混ぜになるので、プロセスの共有ロックを使用しないことに注意してください。
-
multiprocessing.get_logger()¶ multiprocessingが使用するロガーを返します。必要に応じて新たなロガーを作成します。最初に作成するとき、ロガーはレベルに
logging.NOTSETが設定されていてデフォルトハンドラーがありません。このロガーへ送られるメッセージはデフォルトではルートロガーへ伝播されません。Windows 上では子プロセスが親プロセスのロガーレベルを継承しないことに注意してください。さらにその他のロガーのカスタマイズ内容もすべて継承されません。
-
multiprocessing.log_to_stderr()¶ この関数は
get_logger()に対する呼び出しを実行しますが、 get_logger によって作成されるロガーを返すことに加えて、'[%(levelname)s/%(processName)s] %(message)s'のフォーマットを使用してsys.stderrへ出力を送るハンドラーを追加します。
以下にロギングを有効にした例を紹介します:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
これらの2つのロギング関数があることに加えて、 multiprocessing モジュールも2つの追加ロギングレベル属性を提供します。それは SUBWARNING と SUBDEBUG です。次の表は通常のレベル階層にうまく適合していることを表します。
レベル |
Numeric value |
|---|---|
|
25 |
|
5 |
完全なロギングレベルの表については logging モジュールを参照してください。
こういった追加のロギングレベルは主に multiprocessing モジュールの信頼できるデバッグメッセージのために使用されます。以下に上述の例に SUBDEBUG を有効にしたものを紹介します:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0
16.6.2.13. multiprocessing.dummy モジュール¶
multiprocessing.dummy は multiprocessing の API を複製しますが threading モジュールのラッパーでしかありません。
16.6.3. プログラミングガイドライン¶
multiprocessing を使用するときに守るべき一定のガイドラインとイディオムを挙げます。
16.6.3.1. 全てのプラットホーム¶
共有状態を避ける
できるだけプロセス間で巨大なデータを移動することは避けるようにすべきです。
プロセス間の通信には、
threadingモジュールの低レベルな同期プリミティブを使うのではなく、キューやパイプを使うのが良いでしょう。
pickle 化の可能性
プロキシのメソッドへの引数は、 pickle 化できるものにしてください。
プロキシのスレッドセーフ性
1 つのプロキシオブジェクトは、ロックで保護しないかぎり、2 つ以上のスレッドから使用してはいけません。
(異なるプロセスで 同じ プロキシを使用することは問題ではありません。)
ゾンビプロセスを join する
Unix 上ではプロセスが終了したときに join しないと、そのプロセスはゾンビになります。新たなプロセスが開始する (または
active_children()が呼ばれる) ときに、join されていないすべての完了プロセスが join されるので、あまり多くにはならないでしょう。また、終了したプロセスのProcess.is_aliveはそのプロセスを join します。そうは言っても、自分で開始したすべてのプロセスを明示的に join することはおそらく良いプラクティスです。
pickle/unpickle より継承する方が良い
Windows 上では
multiprocessingの多くの型は子プロセスが使用できるようにするため pickle 化できなければなりません。しかし、パイプやキューを使用して他のプロセスへ共有オブジェクトを送ることは一般的に避けるべきです。その代わり、どこかで作成された共有リソースへのアクセスが必要なプロセスは、祖先のプロセスからそれを継承するようにプログラムを変更すべきです。
プロセスの強制終了を避ける
あるプロセスを停止するために
Process.terminateメソッドを使用すると、そのプロセスが現在使用されている (ロック、セマフォ、パイプやキューのような) 共有リソースを破壊したり他のプロセスから利用できない状態を引き起こし易いです。そのため、共有リソースを使用しないプロセスでのみ
Process.terminateを使用することを考慮することがおそらく最善の方法です。
キューを使用するプロセスを join する
キューに要素を追加するプロセスは、全てのバッファされた要素が "feeder" スレッドによって下位層のパイプに対してフィードされるまで終了を待つということを覚えておいてください。 (子プロセスはこの動作を避けるためにキューの
cancel_join_thread()メソッドを呼ぶことができます。)これはキューを使用するときに、キューに追加されたすべての要素が最終的にそのプロセスが join される前に削除されていることを確認する必要があることを意味します。そうしないと、そのキューに要素が追加したプロセスの終了を保証できません。デーモンではないプロセスは自動的に join されることも覚えておいてください。
次の例はデッドロックを引き起こします:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()修正するには最後の2行を入れ替えます(または単純に
p.join()の行を削除します)。
明示的に子プロセスへリソースを渡す
Unix 上では子プロセスはグローバルなリソースを使用する親プロセスが作成した共有リソースを使用することができます。しかし、引数としてそのオブジェクトを子プロセスのコンストラクタへ渡す方が良いです。
これにより、コードが (潜在的に) Windows 互換になるだけでなく、子プロセスが生き続ける限り、そのオブジェクトは親プロセスでガベージコレクトされないことも保証されます。これは親プロセスでそのオブジェクトがガベージコレクトされるときにリソースが解放される場合に重要になるでしょう。
そのため、例えば
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()は、次のように書き直すべきです
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
sys.stdin を file-like オブジェクトに置き換えることに注意する
multiprocessingは元々無条件に:os.close(sys.stdin.fileno())を
multiprocessing.Process._bootstrap()メソッドの中で呼び出していました --- これはプロセス内プロセス (processes-in-processes) で問題が起こしてしまいます。そこで、これは以下のように変更されました:sys.stdin.close() sys.stdin = open(os.devnull)これによってプロセス同士が衝突して bad file descripter エラーを起こすという根本的な問題は解決しましたが、アプリケーションの出力バッファーを
sys.stdin()から "file-like オブジェクト" に置き換えるという潜在的危険を持ち込んでしまいました。危険というのは、複数のプロセスが file-like オブジェクトのclose()を呼び出すと、オブジェクトに同じデータが何度もフラッシュされ、破損してしまう可能性がある、というものです。もし file-like オブジェクトを書いて独自のキャッシュを実装するなら、キャッシュするときに常に pid を記録しておき、pid が変わったらキュッシュを捨てることで、フォークセーフにできます。例:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
16.6.3.2. Windows¶
Windows には os.fork() がないのでいくつか追加制限があります:
さらなる pickle 化の可能性
Process.__init__()へ渡す全ての引数は、pickle化できるものにしてください。これはとりわけ、Windows 上で、束縛メソッドや非束縛メソッドを直接target引数として使ってはならない、ということを意味します。メソッドではなく、関数を使うしかありません。また
Processをサブクラス化する場合、そのインスタンスがProcess.startメソッドが呼ばれたときに pickle 化できるようにしてください。
グローバル変数
子プロセスで実行されるコードがグローバル変数にアクセスしようとする場合、子プロセスが見るその値は
Process.startが呼ばれたときの親プロセスの値と同じではない可能性があります。しかし、単にモジュールレベルの定数であるグローバル変数なら問題にはなりません。
メインモジュールの安全なインポート
新たな Python インタプリタによるメインモジュールのインポートが、意図しない副作用 (新たなプロセスを開始する等) を起こさずできるようにしてください。
例えば Windows で次のモジュールを実行しようとすると
RuntimeErrorで失敗します:from multiprocessing import Process def foo(): print 'hello' p = Process(target=foo) p.start()代わりに、次のように
if __name__ == '__main__':を使用してプログラムの "エントリポイント" を保護すべきです:from multiprocessing import Process, freeze_support def foo(): print 'hello' if __name__ == '__main__': freeze_support() p = Process(target=foo) p.start()(
freeze_support()行はプログラムが固まらずに通常どおり実行されるなら取り除けます。)これは新たに生成された Python インタープリターがそのモジュールを安全にインポートして、モジュールの
foo()関数を実行します。プールまたはマネージャーがメインモジュールで作成される場合に似たような制限が適用されます。
16.6.4. 例¶
カスタマイズされたマネージャーやプロキシの作成方法と使用方法を紹介します:
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo(object):
def f(self):
print 'you called Foo.f()'
def g(self):
print 'you called Foo.g()'
def _h(self):
print 'you called Foo._h()'
# A simple generator function
def baz():
for i in xrange(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ('next', '__next__')
def __iter__(self):
return self
def next(self):
return self._callmethod('next')
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print '-' * 20
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print '-' * 20
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print '-' * 20
it = manager.baz()
for i in it:
print '<%d>' % i,
print
print '-' * 20
op = manager.operator()
print 'op.add(23, 45) =', op.add(23, 45)
print 'op.pow(2, 94) =', op.pow(2, 94)
print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
print 'op._exposed_ =', op._exposed_
##
if __name__ == '__main__':
freeze_support()
test()
Pool を使用する例です:
#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
#
# Create pool
#
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool
print
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
print
print 'Ordered results using pool.imap():'
for x in imap_it:
print '\t', x
print
print 'Unordered results using pool.imap_unordered():'
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
for x in pool.map(calculatestar, TASKS):
print '\t', x
print
#
# Simple benchmarks
#
N = 100000
print 'def pow3(x): return x**3'
t = time.time()
A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
L = [None] * 1000000
print 'def noop(x): pass'
print 'L = [None] * 1000000'
t = time.time()
A = map(noop, L)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L
#
# Test error handling
#
print 'Testing error handling:'
try:
print pool.apply(f, (5,))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.apply()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print pool.map(f, range(10))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.map()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print list(pool.imap(f, range(10)))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, range(10))
for i in range(10):
try:
x = it.next()
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
print
#
# Testing timeouts
#
print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
#
# Testing callback
#
print 'Testing callback:'
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print 'Testing close():'
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as multiprocessing
else:
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
raise SystemExit(2)
test()
ロック、コンディションやキューのような同期の例を紹介します:
#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, random
from Queue import Empty
import multiprocessing # may get overwritten
#### TEST_VALUE
def value_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
mutex.acquire()
print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
running.value -= 1
mutex.release()
def test_value():
TASKS = 10
running = multiprocessing.Value('i', TASKS)
mutex = multiprocessing.Lock()
for i in range(TASKS):
p = multiprocessing.Process(target=value_func, args=(running, mutex))
p.start()
while running.value > 0:
time.sleep(0.08)
mutex.acquire()
print running.value,
sys.stdout.flush()
mutex.release()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in range(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=queue_func, args=(q,))
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
cond.acquire()
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
print '\t' + str(cond)
cond.notify()
cond.release()
def test_condition():
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=condition_func, args=(cond,))
print cond
cond.acquire()
print cond
cond.acquire()
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
cond.release()
print cond
cond.release()
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
sema.acquire()
mutex.acquire()
running.value += 1
print running.value, 'tasks are running'
mutex.release()
random.seed()
time.sleep(random.random()*2)
mutex.acquire()
running.value -= 1
print '%s has finished' % multiprocessing.current_process()
mutex.release()
sema.release()
def test_semaphore():
sema = multiprocessing.Semaphore(3)
mutex = multiprocessing.RLock()
running = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=semaphore_func,
args=(sema, mutex, running))
for i in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout():
p = multiprocessing.Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.is_alive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
def test_event():
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=event_func, args=(event,))
for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues():
values = [
('i', 10),
('h', -2),
('d', 1.25)
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [multiprocessing.Value(id, v) for id, v in values]
shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
p = multiprocessing.Process(
target=sharedvalues_func,
args=(values, arrays, shared_values, shared_arrays)
)
p.start()
p.join()
assert p.exitcode == 0
####
def test(namespace=multiprocessing):
global multiprocessing
multiprocessing = namespace
for func in [ test_value, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func()
ignore = multiprocessing.active_children() # cleanup any old processes
if hasattr(multiprocessing, '_debug_info'):
info = multiprocessing._debug_info()
if info:
print info
raise ValueError('there should be no positive refcounts left')
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
namespace = multiprocessing
elif sys.argv[1] == 'manager':
print ' Using processes and a manager '.center(79, '-')
namespace = multiprocessing.Manager()
namespace.Process = multiprocessing.Process
namespace.current_process = multiprocessing.current_process
namespace.active_children = multiprocessing.active_children
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as namespace
else:
print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
raise SystemExit(2)
test(namespace)
ワーカープロセスのコレクションに対してタスクをフィードしてその結果をまとめるキューの使い方の例を紹介します:
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
ワーカープロセスのプールが1つのソケットを共有してそれぞれの SimpleHTTPServer.HttpServer インスタンスを実行する方法の例を紹介します。
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import os
import sys
from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
if sys.platform == 'win32':
import multiprocessing.reduction # make sockets pickable/inheritable
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class RequestHandler(SimpleHTTPRequestHandler):
# we override log_message() to show which process is handling the request
def log_message(self, format, *args):
note(format, *args)
def serve_forever(server):
note('starting server')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
def runpool(address, number_of_processes):
# create a single server object -- children will each inherit a copy
server = HTTPServer(address, RequestHandler)
# create child processes to act as workers
for i in range(number_of_processes-1):
Process(target=serve_forever, args=(server,)).start()
# main process also acts as a worker
serve_forever(server)
def test():
DIR = os.path.join(os.path.dirname(__file__), '..')
ADDRESS = ('localhost', 8000)
NUMBER_OF_PROCESSES = 4
print 'Serving at http://%s:%d using %d worker processes' % \
(ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
os.chdir(DIR)
runpool(ADDRESS, NUMBER_OF_PROCESSES)
if __name__ == '__main__':
freeze_support()
test()
multiprocessing と threading を比較した簡単なベンチマークです:
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, multiprocessing, threading, Queue, gc
if sys.platform == 'win32':
_timer = time.clock
else:
_timer = time.time
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
a = '0' * 256
c.acquire()
c.notify()
c.release()
for i in xrange(iterations):
q.put(a)
q.put('STOP')
def test_queuespeed(Process, q, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = Process(target=queuespeed_func, args=(q, c, iterations))
c.acquire()
p.start()
c.wait()
c.release()
result = None
t = _timer()
while result != 'STOP':
result = q.get()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through the queue in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
a = '0' * 256
cond.acquire()
cond.notify()
cond.release()
for i in xrange(iterations):
c.send(a)
c.send('STOP')
def test_pipespeed():
c, d = multiprocessing.Pipe()
cond = multiprocessing.Condition()
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = multiprocessing.Process(target=pipe_func,
args=(d, cond, iterations))
cond.acquire()
p.start()
cond.wait()
cond.release()
result = None
t = _timer()
while result != 'STOP':
result = c.recv()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through connection in',elapsed,'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_SEQSPEED
def test_seqspeed(seq):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
a = seq[5]
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_LOCK
def test_lockspeed(l):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
l.acquire()
l.release()
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_CONDITION
def conditionspeed_func(c, N):
c.acquire()
c.notify()
for i in xrange(N):
c.wait()
c.notify()
c.release()
def test_conditionspeed(Process, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
c.acquire()
p = Process(target=conditionspeed_func, args=(c, iterations))
p.start()
c.wait()
t = _timer()
for i in xrange(iterations):
c.notify()
c.wait()
elapsed = _timer()-t
c.release()
p.join()
print iterations * 2, 'waits in', elapsed, 'seconds'
print 'average number/sec:', iterations * 2 / elapsed
####
def test():
manager = multiprocessing.Manager()
gc.disable()
print '\n\t######## testing Queue.Queue\n'
test_queuespeed(threading.Thread, Queue.Queue(),
threading.Condition())
print '\n\t######## testing multiprocessing.Queue\n'
test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
multiprocessing.Condition())
print '\n\t######## testing Queue managed by server process\n'
test_queuespeed(multiprocessing.Process, manager.Queue(),
manager.Condition())
print '\n\t######## testing multiprocessing.Pipe\n'
test_pipespeed()
print
print '\n\t######## testing list\n'
test_seqspeed(range(10))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
print '\n\t######## testing Array("i", ..., lock=False)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
print '\n\t######## testing Array("i", ..., lock=True)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
print
print '\n\t######## testing threading.Lock\n'
test_lockspeed(threading.Lock())
print '\n\t######## testing threading.RLock\n'
test_lockspeed(threading.RLock())
print '\n\t######## testing multiprocessing.Lock\n'
test_lockspeed(multiprocessing.Lock())
print '\n\t######## testing multiprocessing.RLock\n'
test_lockspeed(multiprocessing.RLock())
print '\n\t######## testing lock managed by server process\n'
test_lockspeed(manager.Lock())
print '\n\t######## testing rlock managed by server process\n'
test_lockspeed(manager.RLock())
print
print '\n\t######## testing threading.Condition\n'
test_conditionspeed(threading.Thread, threading.Condition())
print '\n\t######## testing multiprocessing.Condition\n'
test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
print '\n\t######## testing condition managed by a server process\n'
test_conditionspeed(multiprocessing.Process, manager.Condition())
gc.enable()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
