concurrent.futures -- 並列タスク実行¶
バージョン 3.2 で追加.
ソースコード: Lib/concurrent/futures/thread.py および Lib/concurrent/futures/process.py
concurrent.futures モジュールは、非同期に実行できる呼び出し可能オブジェクトの高水準のインターフェースを提供します。
非同期実行は ThreadPoolExecutor を用いてスレッドで実行することも、  ProcessPoolExecutor を用いて別々のプロセスで実行することもできます.  どちらも Executor 抽象クラスで定義された同じインターフェースを実装します。
Executor オブジェクト¶
- 
class concurrent.futures.Executor¶
- 非同期呼び出しを実行するためのメソッドを提供する抽象クラスです。このクラスを直接使ってはならず、具象サブクラスを介して使います。 - 
submit(fn, /, *args, **kwargs)¶
- Schedules the callable, fn, to be executed as - fn(*args, **kwargs)and returns a- Futureobject representing the execution of the callable.- with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result()) 
 - 
map(func, *iterables, timeout=None, chunksize=1)¶
- Similar to - map(func, *iterables)except:- the iterables are collected immediately rather than lazily; 
- func is executed asynchronously and several calls to func may be made concurrently. 
 - もし - __next__()が呼ばれその結果が元々の呼び出しから timeout 秒経った後も利用できない場合、返されるイテレータは- concurrent.futures.TimeoutErrorを送出します。timeout は整数または浮動小数点数です。もし timeout が指定されないか の場合、待ち時間に制限はありません。- もし func の呼び出しが例外を送出した場合、その例外はイテレータから値を受け取る時に送出されます。 - When using - ProcessPoolExecutor, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With- ThreadPoolExecutor, chunksize has no effect.- バージョン 3.5 で変更: chunksize 引数が追加されました。 
 - 
shutdown(wait=True, *, cancel_futures=False)¶
- executor に対して、現在保留中のフューチャーが実行された後で、使用中のすべての資源を解放するように伝えます。シャットダウンにより後に - Executor.submit()と- Executor.map()を呼び出すと- RuntimeErrorが送出されます。- wait が - Trueの場合、すべての未完了のフューチャの実行が完了して Executor に関連付けられたリソースが解放されるまで、このメソッドは返りません。 wait が- Falseの場合、このメソッドはすぐに返り、すべての未完了のフューチャの実行が完了したときに、 Executor に関連付けられたリソースが解放されます。 wait の値に関係なく、すべての未完了のフューチャの実行が完了するまで Python プログラム全体は終了しません。- If cancel_futures is - True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won't be cancelled, regardless of the value of cancel_futures.- If both cancel_futures and wait are - True, all futures that the executor has started running will be completed prior to this method returning. The remaining futures are cancelled.- with文を使用することで、このメソッドを明示的に呼ばないようにできます。- with文は- Executorをシャットダウンします (wait を- Trueにセットして- Executor.shutdown()が呼ばれたかのように待ちます)。- import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt') - バージョン 3.9 で変更: cancel_futures が追加されました。 
 
- 
ThreadPoolExecutor¶
ThreadPoolExecutor はスレッドのプールを使用して非同期に呼び出しを行う、 Executor のサブクラスです。
Future に関連づけられた呼び出し可能オブジェクトが、別の Future の結果を待つ時にデッドロックすることがあります。例:
import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5
def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
以下でも同様です:
def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- 
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
- 最大で max_workers 個のスレッドを非同期実行に使う - Executorのサブクラスです。- initializer is an optional callable that is called at the start of each worker thread; initargs is a tuple of arguments passed to the initializer. Should initializer raise an exception, all currently pending jobs will raise a - BrokenThreadPool, as well as any attempt to submit more jobs to the pool.- バージョン 3.5 で変更: max_workers が - Noneか指定されない場合のデフォルト値はマシンのプロセッサの数に 5 を掛けたものになります。これは、- ThreadPoolExecutorは CPU の処理ではなく I/O をオーバーラップするのによく使用されるため、- ProcessPoolExecutorのワーカーの数よりもこのワーカーの数を増やすべきであるという想定に基づいています。- バージョン 3.6 で追加: thread_name_prefix 引数が追加され、デバッグしやすくなるようにプールから作られたワーカスレッド - threading.Threadの名前を管理できるようになりました。- バージョン 3.7 で変更: initializer と initargs 引数が追加されました。 - バージョン 3.8 で変更: Default value of max_workers is changed to - min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.- ThreadPoolExecutor now reuses idle worker threads before starting max_workers worker threads too. 
ThreadPoolExecutor の例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
ProcessPoolExecutor  はプロセスプールを使って非同期呼び出しを実施する Executor のサブクラスです。ProcessPoolExecutor は multiprocessing モジュールを利用します。このため Global Interpreter Lock を回避することができますが、pickle 化できるオブジェクトしか実行したり返したりすることができません。
__main__ モジュールはワーカサブプロセスでインポート可能でなければなりません。
すなわち、 ProcessPoolExecutor は対話的インタープリタでは動きません。
ProcessPoolExecutor に渡された呼び出し可能オブジェクトから Executor や Future メソッドを呼ぶとデッドロックに陥ります。
- 
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())¶
- An - Executorsubclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is- Noneor not given, it will default to the number of processors on the machine. If max_workers is less than or equal to- 0, then a- ValueErrorwill be raised. On Windows, max_workers must be less than or equal to- 61. If it is not then- ValueErrorwill be raised. If max_workers is- None, then the default chosen will be at most- 61, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is- Noneor not given, the default multiprocessing context is used.- initializer is an optional callable that is called at the start of each worker process; initargs is a tuple of arguments passed to the initializer. Should initializer raise an exception, all currently pending jobs will raise a - BrokenProcessPool, as well as any attempt to submit more jobs to the pool.- バージョン 3.3 で変更: ワーカプロセスの1つが突然終了した場合、 - BrokenProcessPoolエラーが送出されるようになりました。 以前は挙動は未定義でしたが、 executor や futures がフリーズしたりデッドロックを起こすことがしばしばでした。- バージョン 3.7 で変更: The mp_context argument was added to allow users to control the start_method for worker processes created by the pool. - initializer と initargs 引数が追加されました。 
ProcessPoolExecutor の例¶
import concurrent.futures
import math
PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]
def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True
def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
    main()
Future オブジェクト¶
Future クラスは呼び出し可能オブジェクトの非同期実行をカプセル化します。 Future のインスタンスは Executor.submit() によって生成されます。
- 
class concurrent.futures.Future¶
- 呼び出し可能オブジェクトの非同期実行をカプセル化します。 - Futureインスタンスは- Executor.submit()で生成され、テストを除いて直接生成すべきではありません。- 
cancel()¶
- Attempt to cancel the call. If the call is currently being executed or finished running and cannot be cancelled then the method will return - False, otherwise the call will be cancelled and the method will return- True.
 - 
cancelled()¶
- 呼び出しが正常にキャンセルされた場合 - Trueを返します。
 - 
running()¶
- 現在呼び出しが実行中でキャンセルできない場合 - Trueを返します。
 - 
done()¶
- 呼び出しが正常にキャンセルされたか終了した場合 - Trueを返します。
 - 
result(timeout=None)¶
- 呼び出しによって返された値を返します。呼び出しがまだ完了していない場合、このメソッドは timeout 秒の間待機します。呼び出しが timeout 秒間の間に完了しない場合、 - concurrent.futures.TimeoutErrorが送出されます。 timeout にはintかfloatを指定できます。timeout が指定されていないか、- Noneである場合、待機時間に制限はありません。- future が完了する前にキャンセルされた場合 - CancelledErrorが送出されます。- If the call raised an exception, this method will raise the same exception. 
 - 
exception(timeout=None)¶
- 呼び出しによって送出された例外を返します。呼び出しがまだ完了していない場合、このメソッドは timeout 秒だけ待機します。呼び出しが timeout 秒の間に完了しない場合、 - concurrent.futures.TimeoutErrorが送出されます。 timeout にはintかfloatを指定できます。 timeout が指定されていないか、- Noneである場合、待機時間に制限はありません。- future が完了する前にキャンセルされた場合 - CancelledErrorが送出されます。- 呼び出しが例外を送出することなく完了した場合、 - Noneを返します。
 - 
add_done_callback(fn)¶
- 呼び出し可能な fn オブジェクトを future にアタッチします。futureがキャンセルされたか、実行を終了した際に、future をそのただ一つの引数として fn が呼び出されます。 - 追加された呼び出し可能オブジェクトは、追加された順番で呼びだされ、追加を行ったプロセスに属するスレッド中で呼び出されます。もし呼び出し可能オブジェクトが - Exceptionのサブクラスを送出した場合、それはログに記録され無視されます。呼び出し可能オブジェクトが- BaseExceptionのサブクラスを送出した場合の動作は未定義です。- もしfutureがすでに完了しているか、キャンセル済みであれば、fn は即座に実行されます。 
 - 以下の - Futureメソッドは、ユニットテストでの使用と- Executorを実装することを意図しています。- 
set_running_or_notify_cancel()¶
- このメソッドは、 - Futureに関連付けられたワークやユニットテストによるワークの実行前に、- Executorの実装によってのみ呼び出してください。- このメソッドが - Falseを返す場合、- Futureはキャンセルされています。つまり、- Future.cancel()が呼び出されて True が返っています。- Futureの完了を (- as_completed()または- wait()により) 待機するすべてのスレッドが起動します。- このメソッドが - Trueを返す場合、- Futureはキャンセルされて、実行状態に移行されています。つまり、- Future.running()を呼び出すと True が返ります。- このメソッドは、一度だけ呼び出すことができ、 - Future.set_result()または- Future.set_exception()がキャンセルされた後には呼び出すことができません。
 - 
set_result(result)¶
- Futureに関連付けられたワークの結果を result に設定します。- このメソッドは、 - Executorの実装またはユニットテストによってのみ使用してください。- バージョン 3.8 で変更: This method raises - concurrent.futures.InvalidStateErrorif the- Futureis already done.
 
- 
モジュール関数¶
- 
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
- Wait for the - Futureinstances (possibly created by different- Executorinstances) given by fs to complete. Duplicate futures given to fs are removed and will be returned only once. Returns a named 2-tuple of sets. The first set, named- done, contains the futures that completed (finished or cancelled futures) before the wait completed. The second set, named- not_done, contains the futures that did not complete (pending or running futures).- timeout で結果を返すまで待機する最大秒数を指定できます。timeout は整数か浮動小数点数をとります。timeout が指定されないか - Noneの場合、無期限に待機します。- return_when でこの関数がいつ結果を返すか指定します。指定できる値は以下の 定数のどれか一つです: - 定数 - 説明 - FIRST_COMPLETED- いずれかのフューチャが終了したかキャンセルされたときに返します。 - FIRST_EXCEPTION- いずれかのフューチャが例外の送出で終了した場合に返します。例外を送出したフューチャがない場合は、 - ALL_COMPLETEDと等価になります。- ALL_COMPLETED- すべてのフューチャが終了したかキャンセルされたときに返します。 
- 
concurrent.futures.as_completed(fs, timeout=None)¶
- Returns an iterator over the - Futureinstances (possibly created by different- Executorinstances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed before- as_completed()is called will be yielded first. The returned iterator raises a- concurrent.futures.TimeoutErrorif- __next__()is called and the result isn't available after timeout seconds from the original call to- as_completed(). timeout can be an int or float. If timeout is not specified or- None, there is no limit to the wait time.
参考
- PEP 3148 -- futures - execute computations asynchronously
- この機能を Python 標準ライブラリに含めることを述べた提案です。 
例外クラス¶
- 
exception concurrent.futures.CancelledError¶
- future がキャンセルされたときに送出されます。 
- 
exception concurrent.futures.TimeoutError¶
- future の操作が与えられたタイムアウトを超過したときに送出されます。 
- 
exception concurrent.futures.BrokenExecutor¶
- Derived from - RuntimeError, this exception class is raised when an executor is broken for some reason, and cannot be used to submit or execute new tasks.- バージョン 3.7 で追加. 
- 
exception concurrent.futures.InvalidStateError¶
- Raised when an operation is performed on a future that is not allowed in the current state. - バージョン 3.8 で追加. 
- 
exception concurrent.futures.thread.BrokenThreadPool¶
- Derived from - BrokenExecutor, this exception class is raised when one of the workers of a- ThreadPoolExecutorhas failed initializing.- バージョン 3.7 で追加. 
- 
exception concurrent.futures.process.BrokenProcessPool¶
- BrokenExecutorから派生しています(以前は- RuntimeErrorでした)。 この例外クラスは- ProcessPoolExecutorのワーカの1つが正常に終了されなかったとき (例えば外部から kill されたとき) に送出されます。- バージョン 3.3 で追加.