multiprocessing — Process-based parallelism

Вихідний код: Lib/multiprocessing/


Вступ

multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

Модуль multiprocessing також представляє API, які не мають аналогів у модулі threading. Яскравим прикладом цього є об’єкт Pool, який пропонує зручний засіб розпаралелювання виконання функції для кількох вхідних значень, розподіляючи вхідні дані між процесами (паралелізм даних). Наступний приклад демонструє звичайну практику визначення таких функцій у модулі, щоб дочірні процеси могли успішно імпортувати цей модуль. Цей базовий приклад паралелізму даних з використанням Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

друкуватиме стандартний вихід

[1, 4, 9]

Клас Process

У multiprocessing процеси породжуються шляхом створення об’єкта Process і виклику його методу start(). Process відповідає API threading.Thread. Тривіальним прикладом багатопроцесорної програми є:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Щоб показати ідентифікатори окремих процесів, ось розширений приклад:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Щоб отримати пояснення, чому потрібна частина if __name__ == '__main__'', див. Інструкції з програмування.

Контексти та методи запуску

Залежно від платформи multiprocessing підтримує три способи запуску процесу. Це методи запуску

спаун

The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process object’s run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

Available on Unix and Windows. The default on Windows and macOS.

вилка

Батьківський процес використовує os.fork() для розгалуження інтерпретатора Python. Дочірній процес, коли він починається, фактично ідентичний батьківському процесу. Усі ресурси батьківського процесу успадковуються дочірнім процесом. Зверніть увагу, що безпечне розгалуження багатопотокового процесу є проблематичним.

Available on Unix only. The default on Unix.

форксервер

When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

Available on Unix platforms which support passing file descriptors over Unix pipes.

Змінено в версії 3.8: У macOS метод запуску spawn тепер є типовим. Метод запуску fork слід вважати небезпечним, оскільки він може призвести до збою підпроцесу. Див. bpo-33725.

Змінено в версії 3.4: spawn added on all unix platforms, and forkserver added for some unix platforms. Child processes no longer inherit all of the parents inheritable handles on Windows.

On Unix using the spawn or forkserver start methods will also start a resource tracker process which tracks the unlinked named system resources (such as named semaphores or SharedMemory objects) created by processes of the program. When all processes have exited the resource tracker unlinks any remaining tracked object. Usually there should be none, but if a process was killed by a signal there may be some «leaked» resources. (Neither leaked semaphores nor shared memory segments will be automatically unlinked until the next reboot. This is problematic for both objects because the system allows only a limited number of named semaphores, and shared memory segments occupy some space in the main memory.)

Щоб вибрати метод запуску, ви використовуєте set_start_method() в пункті if __name__ == '__main__' головного модуля. Наприклад:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() не слід використовувати більше одного разу в програмі.

Крім того, ви можете використовувати get_context(), щоб отримати об’єкт контексту. Контекстні об’єкти мають той самий API, що й багатопроцесорний модуль, і дозволяють використовувати кілька методів запуску в одній програмі.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Зауважте, що об’єкти, пов’язані з одним контекстом, можуть бути несумісними з процесами для іншого контексту. Зокрема, блокування, створені за допомогою контексту fork, не можна передати процесам, запущеним за допомогою методів запуску spawn або forkserver.

Бібліотека, яка хоче використовувати певний метод запуску, ймовірно, повинна використовувати get_context(), щоб уникнути втручання у вибір користувача бібліотеки.

Попередження

The 'spawn' and 'forkserver' start methods cannot currently be used with «frozen» executables (i.e., binaries produced by packages like PyInstaller and cx_Freeze) on Unix. The 'fork' start method does work.

Обмін об’єктами між процесами

multiprocessing підтримує два типи каналів зв’язку між процесами:

Черги

Клас Queue є майже клоном queue.Queue. Наприклад:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.

Труби

Функція Pipe() повертає пару об’єктів з’єднання, з’єднаних трубою, яка за умовчанням є дуплексною (двосторонньою). Наприклад:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Два об’єкти з’єднання, які повертає Pipe(), представляють два кінці труби. Кожен об’єкт підключення має методи send() і recv() (серед інших). Зауважте, що дані в каналі можуть бути пошкоджені, якщо два процеси (або потоки) намагаються зчитувати або писати в той самий кінець каналу одночасно. Звичайно, немає ризику пошкодження через процеси, що використовують різні кінці труби одночасно.

Синхронізація між процесами

multiprocessing містить еквіваленти всіх примітивів синхронізації з threading. Наприклад, можна використовувати блокування, щоб гарантувати, що лише один процес друкує на стандартний вивід одночасно:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Без використання блокування вихідні дані різних процесів можуть переплутатися.

Спільне використання стану між процесами

Як згадувалося вище, під час паралельного програмування зазвичай краще уникати використання спільного стану, наскільки це можливо. Це особливо актуально при використанні кількох процесів.

Однак, якщо вам справді потрібно використовувати спільні дані, тоді multiprocessing надає кілька способів зробити це.

Спільна пам’ять

Дані можна зберігати в спільній карті пам’яті за допомогою Value або Array. Наприклад, такий код:

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

надрукую

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументи 'd' і 'i', що використовуються під час створення num і arr, є кодами типу, які використовуються модулем array: ' d'' вказує на число з плаваючою точкою подвійної точності, а 'i'' вказує на ціле число зі знаком. Ці спільні об’єкти будуть безпечними для процесу та потоків.

Для більшої гнучкості використання спільної пам’яті можна використовувати модуль multiprocessing.sharedctypes, який підтримує створення довільних об’єктів ctypes, виділених із спільної пам’яті.

Серверний процес

Об’єкт менеджера, який повертає Manager(), контролює серверний процес, який містить об’єкти Python і дозволяє іншим процесам маніпулювати ними за допомогою проксі-серверів.

Менеджер, який повертає Manager(), підтримуватиме типи list, dict, Namespace, Lock, RLock і Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. Наприклад,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

надрукую

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Менеджери серверних процесів є більш гнучкими, ніж використання спільних об’єктів пам’яті, оскільки вони можуть підтримувати довільні типи об’єктів. Крім того, один менеджер може спільно використовуватися процесами на різних комп’ютерах у мережі. Однак вони повільніші, ніж використання спільної пам’яті.

Використання пулу працівників

Клас Pool представляє пул робочих процесів. Він має методи, які дозволяють розвантажувати завдання на робочі процеси кількома різними способами.

Наприклад:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Зверніть увагу, що методи пулу повинні використовуватися тільки тим процесом, який його створив.

Примітка

Функціональність цього пакета вимагає, щоб модуль __main__ міг імпортуватися дочірніми елементами. Це описано в Інструкції з програмування, проте тут варто звернути увагу на це. Це означає, що деякі приклади, такі як приклади multiprocessing.pool.Pool не працюватимуть в інтерактивному інтерпретаторі. Наприклад:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Якщо ви спробуєте це, він фактично виведе три повні трасування, чергувані напіввипадковим чином, і тоді вам, можливо, доведеться якось зупинити батьківський процес.)

довідка

Пакет multiprocessing здебільшого повторює API модуля threading.

Process і винятки

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Об’єкти процесу представляють діяльність, яка виконується в окремому процесі. Клас Process має еквіваленти всіх методів threading.Thread.

The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name (see name for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the process daemon flag to True or False. If None (the default), this flag will be inherited from the creating process.

By default, no arguments are passed to target.

Якщо підклас перевизначає конструктор, він повинен переконатися, що він викликає конструктор базового класу (Process.__init__()), перш ніж щось робити з процесом.

Змінено в версії 3.3: Added the daemon argument.

run()

Метод, що представляє діяльність процесу.

Ви можете перевизначити цей метод у підкласі. Стандартний метод run() викликає викликаний об’єкт, переданий конструктору об’єкта як цільовий аргумент, якщо такий є, з послідовними аргументами та ключовими аргументами, взятими з аргументів args і kwargs відповідно.

start()

Запустіть процес.

Це має бути викликано щонайбільше один раз на об’єкт процесу. Він організовує виклик методу run() об’єкта в окремому процесі.

join([timeout])

Якщо додатковий аргумент timeout має значення None (за замовчуванням), метод блокується, доки процес, чий метод join() викликається, не завершиться. Якщо timeout є додатним числом, воно блокує щонайбільше timeout секунд. Зауважте, що метод повертає None, якщо його процес завершується або якщо метод закінчився. Перевірте exitcode процесу, щоб визначити, чи він завершився.

До процесу можна приєднуватися багато разів.

Процес не може приєднатися до себе, оскільки це спричинить тупикову блокування. Спроба приєднатися до процесу до його запуску є помилкою.

name

Назва процесу. Ім’я - це рядок, який використовується лише для ідентифікації. Він не має семантики. Кілька процесів можуть мати однакові назви.

Початкове ім’я задається конструктором. Якщо конструктору не надано явної назви, ім’я у формі „Process-N1:N2:…:Nk“ побудований, де кожен Nk є N-м дочірнім елементом свого батька.

is_alive()

Повернути, чи процес активний.

Приблизно, об’єкт процесу живий з моменту повернення методу start() до завершення дочірнього процесу.

daemon

Прапор демона процесу, логічне значення. Це має бути встановлено перед викликом start().

Початкове значення успадковується від процесу створення.

Коли процес завершується, він намагається завершити всі свої демонічні дочірні процеси.

Зауважте, що демонічному процесу не дозволяється створювати дочірні процеси. Інакше демонічний процес залишив би своїх нащадків сиротами, якщо він буде припинений під час завершення процесу батьківського процесу. Крім того, це не демони чи служби Unix, це звичайні процеси, які будуть припинені (і не приєднані), якщо недемонічні процеси вийшли.

Окрім API threading.Thread, об’єкти Process також підтримують такі атрибути та методи:

pid

Поверніть ідентифікатор процесу. До того, як процес буде створено, це буде None.

exitcode

Код виходу дитини. Це буде None, якщо процес ще не завершено.

Якщо дочірній метод run() повернувся нормально, код виходу буде 0. Якщо він закінчився через sys.exit() із цілим аргументом N, код виходу буде N.

Якщо дочірній процес завершився через виняток, який не було перехоплено в run(), код виходу буде 1. Якщо його було завершено сигналом N, код виходу матиме від’ємне значення -N.

authkey

Ключ автентифікації процесу (рядок байтів).

Коли multiprocessing ініціалізовано, головному процесу призначається випадковий рядок за допомогою os.urandom().

Коли об’єкт Process створюється, він успадковує ключ автентифікації свого батьківського процесу, хоча це можна змінити, встановивши authkey інший байтовий рядок.

Перегляньте Ключі автентифікації.

sentinel

Числовий дескриптор системного об’єкта, який стане «готовим» після завершення процесу.

You can use this value if you want to wait on several events at once using multiprocessing.connection.wait(). Otherwise calling join() is simpler.

On Windows, this is an OS handle usable with the WaitForSingleObject and WaitForMultipleObjects family of API calls. On Unix, this is a file descriptor usable with primitives from the select module.

Нове в версії 3.3.

terminate()

Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. Note that exit handlers and finally clauses, etc., will not be executed.

Зверніть увагу, що процеси-нащадки процесу не будуть припинені – вони просто стануть сиротами.

Попередження

Якщо цей метод використовується, коли пов’язаний процес використовує канал або чергу, канал або чергу можуть бути пошкоджені та можуть стати непридатними для використання іншим процесом. Подібним чином, якщо процес отримав блокування або семафор тощо, його завершення може призвести до блокування інших процесів.

kill()

Same as terminate() but using the SIGKILL signal on Unix.

Нове в версії 3.7.

close()

Закрийте об’єкт Process, звільнивши всі пов’язані з ним ресурси. ValueError виникає, якщо основний процес все ще виконується. Після успішного повернення close() більшість інших методів і атрибутів об’єкта Process викличуть ValueError.

Нове в версії 3.7.

Зауважте, що методи start(), join(), is_alive(), terminate() і exitcode має викликати лише процес, який створив об’єкт процесу .

Приклад використання деяких методів Process:

 >>> import multiprocessing, time, signal
 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
 >>> print(p, p.is_alive())
 <Process ... initial> False
 >>> p.start()
 >>> print(p, p.is_alive())
 <Process ... started> True
 >>> p.terminate()
 >>> time.sleep(0.1)
 >>> print(p, p.is_alive())
 <Process ... stopped exitcode=-SIGTERM> False
 >>> p.exitcode == -signal.SIGTERM
 True
exception multiprocessing.ProcessError

Базовий клас усіх винятків multiprocessing.

exception multiprocessing.BufferTooShort

Exception raised by Connection.recv_bytes_into() when the supplied buffer object is too small for the message read.

Якщо e є екземпляром BufferTooShort, то e.args[0] видасть повідомлення як рядок байтів.

exception multiprocessing.AuthenticationError

Викликається, коли виникає помилка автентифікації.

exception multiprocessing.TimeoutError

Викликається методами з тайм-аутом, коли час очікування закінчується.

Труби та черги

При використанні кількох процесів зазвичай використовується передача повідомлень для зв’язку між процесами та уникається використання будь-яких примітивів синхронізації, таких як блокування.

Для передачі повідомлень можна використовувати Pipe() (для з’єднання між двома процесами) або чергу (що дозволяє використовувати кілька виробників і споживачів).

Типи Queue, SimpleQueue і JoinableQueue є чергами багатьох виробників і споживачів FIFO, змодельованими на основі queue.Queue клас у стандартній бібліотеці. Вони відрізняються тим, що Queue не має методів task_done() і join(), представлених у queue.Queue Python 2.5 клас.

Якщо ви використовуєте JoinableQueue, тоді ви має викликати JoinableQueue.task_done() для кожного завдання, вилученого з черги, інакше семафор, який використовується для підрахунку кількості незавершених завдань, може зрештою переповнитися, викликаючи виняток.

Зауважте, що можна також створити спільну чергу за допомогою об’єкта менеджера – див. Менеджери.

Примітка

multiprocessing використовує звичайні винятки queue.Empty і queue.Full, щоб сигналізувати про час очікування. Вони недоступні в просторі імен multiprocessing, тому їх потрібно імпортувати з queue.

Примітка

Коли об’єкт ставиться в чергу, об’єкт очищається, а фоновий потік пізніше скидає обрані дані в базовий канал. Це має деякі наслідки, які є трохи дивними, але не повинні викликати жодних практичних труднощів - якщо вони дійсно вас турбують, ви можете натомість використати чергу, створену за допомогою manager.

  1. Після розміщення об’єкта в порожній черзі може виникнути нескінченно мала затримка, перш ніж метод empty() черги поверне False, а get_nowait() зможе повернутися без виклику queue.Empty.

  2. Якщо кілька процесів ставлять об’єкти в чергу, об’єкти можуть бути отримані на іншому кінці не за порядком. Однак об’єкти, поставлені в чергу одним і тим же процесом, завжди будуть в очікуваному порядку один відносно одного.

Попередження

Якщо процес зупинено за допомогою Process.terminate() або os.kill() під час спроби використання Queue, то дані в черзі, ймовірно, будуть пошкоджені. Це може призвести до того, що будь-який інший процес отримає виняток, коли він спробує використати чергу пізніше.

Попередження

Як згадувалося вище, якщо дочірній процес поставив елементи в чергу (і він не використовував JoinableQueue.cancel_join_thread), тоді цей процес не завершиться, доки всі буферизовані елементи не будуть скинуті в канал.

Це означає, що якщо ви спробуєте приєднатися до цього процесу, ви можете отримати тупикову блокування, якщо ви не впевнені, що всі елементи, які були поставлені в чергу, використано. Так само, якщо дочірній процес є недемонічним, тоді батьківський процес може зависнути при виході, коли він намагається приєднатися до всіх своїх недемонічних дочірніх процесів.

Зауважте, що черга, створена за допомогою менеджера, не має цієї проблеми. Дивіться Інструкції з програмування.

Для прикладу використання черг для міжпроцесного зв’язку див. Приклади.

multiprocessing.Pipe([duplex])

Повертає пару об’єктів (conn1, conn2) Connection, що представляють кінці труби.

Якщо duplex має значення True (за замовчуванням), тоді канал є двонаправленим. Якщо duplex має значення False, тоді канал є односпрямованим: conn1 можна використовувати лише для отримання повідомлень, а conn2 — лише для надсилання повідомлень.

class multiprocessing.Queue([maxsize])

Повертає спільну чергу процесу, реалізовану за допомогою каналу та кількох блокувань/семафорів. Коли процес вперше ставить елемент у чергу, запускається потік, який передає об’єкти з буфера в канал.

Звичайні винятки queue.Empty і queue.Full із модуля queue стандартної бібліотеки створюються, щоб повідомити про час очікування.

Queue реалізує всі методи queue.Queue, крім task_done() і join().

qsize()

Повертає приблизний розмір черги. Через семантику багатопоточності/багатопроцесорності це число ненадійне.

Note that this may raise NotImplementedError on Unix platforms like macOS where sem_getvalue() is not implemented.

empty()

Повертає True, якщо черга порожня, False інакше. Через семантику багатопоточності/багатопроцесорності це ненадійно.

full()

Повертає True, якщо черга заповнена, False інакше. Через семантику багатопоточності/багатопроцесорності це ненадійно.

put(obj[, block[, timeout]])

Помістіть obj у чергу. Якщо необов’язковий аргумент block має значення True (за замовчуванням), а timeout має значення None (за замовчуванням), за потреби блокуйте, доки не з’явиться вільний слот. Якщо timeout є додатним числом, він блокує щонайбільше timeout секунд і викликає виняток queue.Full, якщо протягом цього часу не було вільного місця. В іншому випадку (block має значення False), помістіть елемент у чергу, якщо вільний слот є негайно доступним, інакше викликайте виняток queue.Full (timeout у цьому випадку ігнорується).

Змінено в версії 3.8: Якщо чергу закрито, замість AssertionError виникає ValueError.

put_nowait(obj)

Еквівалент put(obj, False).

get([block[, timeout]])

Видалити та повернути елемент із черги. Якщо необов’язкові аргументи block мають значення True (за замовчуванням), а timeoutNone (за замовчуванням), за потреби блокуйте, доки елемент не стане доступним. Якщо timeout є додатним числом, він блокує щонайбільше timeout секунд і викликає виняток queue.Empty, якщо жоден елемент не був доступний протягом цього часу. В іншому випадку (блок має значення False), повертає елемент, якщо він одразу доступний, інакше викликає виняток queue.Empty (у цьому випадку timeout ігнорується).

Змінено в версії 3.8: Якщо чергу закрито, замість OSError виникає ValueError.

get_nowait()

Еквівалент get(False).

multiprocessing.Queue має кілька додаткових методів, яких немає в queue.Queue. Ці методи зазвичай непотрібні для більшості коду:

close()

Укажіть, що поточний процес більше не додаватиме дані до цієї черги. Фоновий потік завершиться, коли всі буферизовані дані буде скинуто в канал. Це викликається автоматично, коли в черзі збирається сміття.

join_thread()

Приєднайтеся до фонової нитки. Це можна використовувати лише після виклику close(). Він блокується, доки не завершиться фоновий потік, гарантуючи, що всі дані в буфері скинуті в канал.

За замовчуванням, якщо процес не є творцем черги, після виходу він спробує приєднатися до фонового потоку черги. Процес може викликати cancel_join_thread(), щоб змусити join_thread() нічого не робити.

cancel_join_thread()

Запобігти блокуванню join_thread(). Зокрема, це запобігає автоматичному приєднанню фонового потоку під час завершення процесу – див. join_thread().

A better name for this method might be allow_exit_without_flush(). It is likely to cause enqueued data to lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don’t care about lost data.

Примітка

Функціональність цього класу вимагає функціонуючої спільної реалізації семафора в головній операційній системі. Без нього функціональні можливості цього класу будуть вимкнені, а спроби створити екземпляр Queue призведуть до ImportError. Додаткову інформацію див. bpo-3770. Те саме стосується будь-якого зі спеціалізованих типів черги, перелічених нижче.

class multiprocessing.SimpleQueue

Це спрощений тип Queue, дуже схожий на заблокований Pipe.

close()

Закрийте чергу: звільніть внутрішні ресурси.

Після закриття чергу більше не можна використовувати. Наприклад, методи get(), put() і empty() більше не можна викликати.

Нове в версії 3.9.

empty()

Повертає True, якщо черга порожня, False інакше.

get()

Видалити та повернути елемент із черги.

put(item)

Поставте товар в чергу.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, підклас Queue, це черга, яка додатково має методи task_done() і join().

task_done()

Вказує на те, що завдання, яке раніше було в черзі, виконано. Використовується споживачами черги. Для кожного get(), який використовується для отримання завдання, наступний виклик task_done() повідомляє черзі, що обробку завдання завершено.

Якщо join() зараз блокується, воно відновиться, коли всі елементи будуть оброблені (це означає, що виклик task_done() отримано для кожного елемента, який був put() в чергу).

Викликає ValueError, якщо викликається стільки разів, скільки було елементів у черзі.

join()

Блокуйте, доки не буде отримано та оброблено всі елементи в черзі.

Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли споживач викликає task_done(), щоб вказати, що елемент отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля, join() розблоковується.

Різне

multiprocessing.active_children()

Повернути список усіх живих дітей поточного процесу.

Виклик цього має побічний ефект «приєднання» до будь-яких процесів, які вже завершилися.

multiprocessing.cpu_count()

Повертає кількість процесорів у системі.

This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0))

Коли кількість ЦП не може бути визначена, виникає NotImplementedError.

Дивись також

os.cpu_count()

multiprocessing.current_process()

Повертає об’єкт Process, що відповідає поточному процесу.

Аналог threading.current_thread().

multiprocessing.parent_process()

Повертає об’єкт Process, що відповідає батьківському процесу current_process(). Для основного процесу parent_process буде None.

Нове в версії 3.8.

multiprocessing.freeze_support()

Додайте підтримку, коли програма, яка використовує multiprocessing, була заморожена для створення виконуваного файлу Windows. (Було перевірено за допомогою py2exe, PyInstaller і cx_Freeze.)

Цю функцію потрібно викликати відразу після рядка if __name__ == '__main__' головного модуля. Наприклад:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Якщо рядок freeze_support() пропущено, спроба запустити заморожений виконуваний файл викличе RuntimeError.

Виклик freeze_support() не має ефекту під час виклику в будь-якій операційній системі, крім Windows. Крім того, якщо модуль нормально запускається інтерпретатором Python у Windows (програма не була заморожена), то freeze_support() не має ефекту.

multiprocessing.get_all_start_methods()

Returns a list of the supported start methods, the first of which is the default. The possible start methods are 'fork', 'spawn' and 'forkserver'. On Windows only 'spawn' is available. On Unix 'fork' and 'spawn' are always supported, with 'fork' being the default.

Нове в версії 3.4.

multiprocessing.get_context(method=None)

Повертає об’єкт контексту, який має ті самі атрибути, що й модуль multiprocessing.

If method is None then the default context is returned. Otherwise method should be 'fork', 'spawn', 'forkserver'. ValueError is raised if the specified start method is not available.

Нове в версії 3.4.

multiprocessing.get_start_method(allow_none=False)

Повертає назву методу запуску, який використовується для запуску процесів.

Якщо метод запуску не було виправлено і allow_none має значення false, тоді метод запуску фіксується за замовчуванням і повертається ім’я. Якщо метод запуску не було виправлено і allow_none має значення true, тоді повертається None.

The return value can be 'fork', 'spawn', 'forkserver' or None. 'fork' is the default on Unix, while 'spawn' is the default on Windows and macOS.

Змінено в версії 3.8: У macOS метод запуску spawn тепер є типовим. Метод запуску fork слід вважати небезпечним, оскільки він може призвести до збою підпроцесу. Див. bpo-33725.

Нове в версії 3.4.

multiprocessing.set_executable(executable)

Встановіть шлях інтерпретатора Python для використання під час запуску дочірнього процесу. (За замовчуванням використовується sys.executable). Вбудовувачі, ймовірно, повинні будуть зробити щось на зразок:

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

перш ніж вони зможуть створити дочірні процеси.

Змінено в версії 3.4: Now supported on Unix when the 'spawn' start method is used.

multiprocessing.set_start_method(method)

Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'.

Зверніть увагу, що це має бути викликано щонайбільше один раз, і його слід захистити всередині пропозиції if __name__ == '__main__' головного модуля.

Нове в версії 3.4.

Об’єкти підключення

Об’єкти підключення дозволяють надсилати та отримувати об’єкти або рядки, які можна вибрати. Їх можна розглядати як підключені сокети, орієнтовані на повідомлення.

Об’єкти підключення зазвичай створюються за допомогою Pipe – див. також Слухачі та клієнти.

class multiprocessing.connection.Connection
send(obj)

Надішліть об’єкт на інший кінець з’єднання, який слід прочитати за допомогою recv().

Об’єкт має бути маринованим. Дуже великі pickles (приблизно 32 MiB+, хоча це залежить від ОС) можуть викликати виняток ValueError.

recv()

Повернути об’єкт, надісланий з іншого кінця з’єднання за допомогою send(). Блокує, поки не буде що отримати. Викликає EOFError, якщо не залишилося нічого для отримання, а інший кінець був закритий.

fileno()

Повертає дескриптор файлу або дескриптор, який використовується підключенням.

close()

Закрийте з’єднання.

Це викликається автоматично, коли підключення збирається сміттям.

poll([timeout])

Повернути, чи є дані для читання.

Якщо timeout не вказано, він негайно повернеться. Якщо timeout є числом, це вказує максимальний час у секундах для блокування. Якщо timeout має значення None, тоді використовується нескінченний тайм-аут.

Зауважте, що кілька об’єктів з’єднання можна опитувати одночасно за допомогою multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])

Надіслати байтові дані з bytes-like object як повне повідомлення.

Якщо вказано зсув, дані зчитуються з цієї позиції в буфері. Якщо задано size, то стільки байтів буде прочитано з буфера. Дуже великі буфери (приблизно 32 MiB+, хоча це залежить від ОС) можуть викликати виключення ValueError

recv_bytes([maxlength])

Повертає повне повідомлення байтових даних, надісланих з іншого кінця з’єднання, у вигляді рядка. Блокує, поки не буде що отримати. Викликає EOFError, якщо не залишилося нічого для отримання, а інший кінець закрито.

Якщо вказано maxlength і повідомлення довше, ніж maxlength, тоді виникає OSError і з’єднання більше не читається.

Змінено в версії 3.3: Раніше ця функція викликала IOError, який тепер є псевдонімом OSError.

recv_bytes_into(buffer[, offset])

Прочитайте в buffer повне повідомлення байтових даних, надісланих з іншого кінця з’єднання, і поверніть кількість байтів у повідомленні. Блокує, поки не буде що отримати. Викликає EOFError, якщо не залишилося нічого для отримання, а інший кінець був закритий.

buffer має бути доступним для запису bytes-like object. Якщо задано offset, повідомлення буде записано в буфер із цієї позиції. Зсув має бути невід’ємним цілим числом, меншим за довжину буфера (у байтах).

Якщо буфер закороткий, виникає виняток BufferTooShort, і повне повідомлення доступне як e.args[0], де e є винятком.

Змінено в версії 3.3: Самі об’єкти підключення тепер можна передавати між процесами за допомогою Connection.send() і Connection.recv().

Нове в версії 3.3: Connection objects now support the context management protocol – see Типи менеджера контексту. __enter__() returns the connection object, and __exit__() calls close().

Наприклад:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Попередження

Метод Connection.recv() автоматично видаляє отримані дані, що може становити загрозу безпеці, якщо ви не можете довіряти процесу, який надіслав повідомлення.

Таким чином, якщо об’єкт підключення не було створено за допомогою Pipe(), ви повинні використовувати лише методи recv() і send() після виконання певної автентифікації. Перегляньте Ключі автентифікації.

Попередження

Якщо процес зупиняється під час спроби читання або запису в канал, то дані в каналі, ймовірно, будуть пошкоджені, тому що може стати неможливо точно визначити, де пролягають межі повідомлення.

Примітиви синхронізації

Зазвичай примітиви синхронізації не такі необхідні в багатопроцесовій програмі, як у багатопотоковій програмі. Перегляньте документацію для модуля threading.

Зауважте, що можна також створити примітиви синхронізації за допомогою об’єкта менеджера – див. Менеджери.

class multiprocessing.Barrier(parties[, action[, timeout]])

Бар’єрний об’єкт: клон threading.Barrier.

Нове в версії 3.3.

class multiprocessing.BoundedSemaphore([value])

Обмежений семафорний об’єкт: близький аналог threading.BoundedSemaphore.

Існує єдина відмінність від його близького аналога: перший аргумент методу acquire має назву block, що узгоджується з Lock.acquire().

Примітка

У macOS це неможливо відрізнити від Semaphore, оскільки sem_getvalue() не реалізовано на цій платформі.

class multiprocessing.Condition([lock])

Змінна умови: псевдонім для threading.Condition.

Якщо вказано lock, це має бути об’єкт Lock або RLock із multiprocessing.

Змінено в версії 3.3: Додано метод wait_for().

class multiprocessing.Event

Клон threading.Event.

class multiprocessing.Lock

Нерекурсивний об’єкт блокування: близький аналог threading.Lock. Після того, як процес або потік отримав блокування, наступні спроби отримати його від будь-якого процесу або потоку будуть блокуватися, доки його не буде звільнено; будь-який процес або потік може його звільнити. Концепції та поведінка threading.Lock, які застосовуються до потоків, відтворені тут у multiprocessing.Lock, оскільки вони застосовуються до процесів або потоків, за винятком зазначених випадків.

Зауважте, що Lock насправді є фабричною функцією, яка повертає примірник multiprocessing.synchronize.Lock, ініціалізований контекстом за замовчуванням.

Lock підтримує протокол context manager і тому може використовуватися в with операторах.

acquire(block=True, timeout=None)

Отримайте блокування, блокування або неблокування.

Якщо для аргументу block встановлено значення True (за замовчуванням), виклик методу блокуватиметься, доки блокування не перейде в розблокований стан, а потім встановлюватиме для нього значення True і повертатиме True. Зауважте, що назва цього першого аргументу відрізняється від такої в threading.Lock.acquire().

Якщо для аргументу block встановлено значення False, виклик методу не блокується. Якщо блокування зараз у заблокованому стані, поверніть False; інакше встановіть блокування в заблокований стан і поверніть True.

При виклику з додатним значенням з плаваючою комою для timeout, блокувати щонайбільше на кількість секунд, визначену timeout, доки не вдасться отримати блокування. Виклики з від’ємним значенням для timeout еквівалентні timeout рівному нулю. Виклики зі значенням timeout None (за замовчуванням) встановлюють період очікування як нескінченний. Зауважте, що обробка негативних значень або значень None для timeout відрізняється від реалізованої поведінки в threading.Lock.acquire(). Аргумент timeout не має практичного значення, якщо для аргументу block встановлено значення False і, таким чином, він ігнорується. Повертає True, якщо блокування отримано, або False, якщо період очікування минув.

release()

Відпустіть блокування. Це може бути викликано з будь-якого процесу або потоку, а не тільки процесу або потоку, який спочатку отримав блокування.

Поведінка така ж, як у threading.Lock.release(), за винятком того, що під час виклику для розблокованого блокування виникає ValueError.

class multiprocessing.RLock

Об’єкт рекурсивного блокування: близький аналог threading.RLock. Рекурсивне блокування має бути звільнено процесом або потоком, який його отримав. Після того як процес або потік отримав рекурсивне блокування, той самий процес або потік може отримати його знову без блокування; цей процес або потік повинен випускати його один раз за кожен раз, коли його було отримано.

Зауважте, що RLock насправді є фабричною функцією, яка повертає екземпляр multiprocessing.synchronize.RLock, ініціалізований контекстом за замовчуванням.

RLock підтримує протокол context manager і тому може використовуватися в with операторах.

acquire(block=True, timeout=None)

Отримайте блокування, блокування або неблокування.

При виклику з аргументом block, встановленим у значення True, блокувати, доки блокування не буде в розблокованому стані (не належить жодному процесу або потоку), якщо блокування вже не належить поточному процесу або потоку. Тоді поточний процес або потік отримує право власності на блокування (якщо він ще не має права власності), а рівень рекурсії всередині блокування збільшується на одиницю, що призводить до повернення значення True. Зауважте, що є кілька відмінностей у поведінці цього першого аргументу порівняно з реалізацією threading.RLock.acquire(), починаючи з назви самого аргументу.

При виклику з аргументом block, встановленим на False, не блокувати. Якщо блокування вже було отримано (і, отже, ним володіє) інший процес або потік, поточний процес або потік не приймає права власності, а рівень рекурсії в межах блокування не змінюється, що призводить до повернення значення False . Якщо блокування знаходиться в розблокованому стані, поточний процес або потік приймає право власності, і рівень рекурсії збільшується, що призводить до повернення значення True.

Використання та поведінка аргументу timeout такі ж, як і в Lock.acquire(). Зауважте, що деякі з цих дій timeout відрізняються від реалізованих у threading.RLock.acquire().

release()

Зніміть блокування, зменшивши рівень рекурсії. Якщо після зменшення рівень рекурсії дорівнює нулю, скиньте блокування до розблокованого (не належить жодному процесу чи потоку), а якщо будь-які інші процеси чи потоки заблоковані в очікуванні розблокування блокування, дозвольте рівно одному з них продовжити. Якщо після декременту рівень рекурсії все ще ненульовий, блокування залишається заблокованим і належить процесу або потоку, що викликає.

Викликайте цей метод лише тоді, коли процес або потік, що викликає, володіє блокуванням. Помилка AssertionError виникає, якщо цей метод викликається процесом або потоком, відмінним від власника, або якщо блокування знаходиться в розблокованому стані (не належить). Зауважте, що тип винятку, викликаного в цій ситуації, відрізняється від реалізованої поведінки в threading.RLock.release().

class multiprocessing.Semaphore([value])

Об’єкт семафор: близький аналог threading.Semaphore.

Існує єдина відмінність від його близького аналога: перший аргумент методу acquire має назву block, що узгоджується з Lock.acquire().

Примітка

У macOS sem_timedwait не підтримується, тому виклик acquire() із тайм-аутом буде емулювати поведінку цієї функції за допомогою циклу сну.

Примітка

If the SIGINT signal generated by Ctrl-C arrives while the main thread is blocked by a call to BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() or Condition.wait() then the call will be immediately interrupted and KeyboardInterrupt will be raised.

This differs from the behaviour of threading where SIGINT will be ignored while the equivalent blocking calls are in progress.

Примітка

Для деяких функцій цього пакета необхідна функціональна реалізація спільного семафора в головній операційній системі. Без нього модуль multiprocessing.synchronize буде вимкнено, а спроби його імпортувати призведуть до ImportError. Додаткову інформацію див. bpo-3770.

Спільні об’єкти ctypes

Можна створювати спільні об’єкти, використовуючи спільну пам’ять, яку можуть успадковувати дочірні процеси.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Повертає об’єкт ctypes, виділений зі спільної пам’яті. За замовчуванням значення, що повертається, фактично є синхронізованою оболонкою для об’єкта. До самого об’єкта можна отримати доступ через атрибут value Value.

typecode_or_type визначає тип повернутого об’єкта: це або тип ctypes, або односимвольний код типу, який використовується модулем array. *args передається конструктору для типу.

Якщо lock має значення True (за замовчуванням), тоді створюється новий об’єкт рекурсивного блокування для синхронізації доступу до значення. Якщо lock є об’єктом Lock або RLock, то він використовуватиметься для синхронізації доступу до значення. Якщо lock має значення False, тоді доступ до повернутого об’єкта не буде автоматично захищений блокуванням, тому він не обов’язково буде «безпечним для процесу».

Такі операції, як +=, які передбачають читання та запис, не є атомарними. Отже, якщо, наприклад, ви хочете атомарно збільшити спільне значення, недостатньо просто зробити

counter.value += 1

Якщо припустити, що пов’язане блокування є рекурсивним (що є за замовчуванням), ви можете замість цього зробити:

with counter.get_lock():
    counter.value += 1

Зверніть увагу, що lock є аргументом лише для ключового слова.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Повертає масив ctypes, виділений зі спільної пам’яті. За замовчуванням значення, що повертається, фактично є синхронізованою оболонкою для масиву.

typecode_or_type визначає тип елементів повернутого масиву: це або тип ctypes, або односимвольний код типу, який використовується модулем array. Якщо size_or_initializer є цілим числом, воно визначає довжину масиву, і масив буде спочатку обнулений. В іншому випадку size_or_initializer — це послідовність, яка використовується для ініціалізації масиву, довжина якої визначає довжину масиву.

Якщо lock має значення True (за замовчуванням), тоді створюється новий об’єкт блокування для синхронізації доступу до значення. Якщо lock є об’єктом Lock або RLock, то він використовуватиметься для синхронізації доступу до значення. Якщо lock має значення False, тоді доступ до повернутого об’єкта не буде автоматично захищений блокуванням, тому він не обов’язково буде «безпечним для процесу».

Зауважте, що lock є лише ключовим аргументом.

Зауважте, що масив ctypes.c_char має атрибути value і raw, які дозволяють використовувати його для зберігання та отримання рядків.

Модуль multiprocessing.sharedctypes

Модуль multiprocessing.sharedctypes надає функції для виділення об’єктів ctypes зі спільної пам’яті, які можуть бути успадковані дочірніми процесами.

Примітка

Хоча можна зберігати вказівник у спільній пам’яті, пам’ятайте, що це посилатиметься на розташування в адресному просторі певного процесу. Однак вказівник, швидше за все, буде недійсним у контексті другого процесу, і спроба розіменувати вказівник з другого процесу може спричинити збій.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Повертає масив ctypes, виділений зі спільної пам’яті.

typecode_or_type визначає тип елементів повернутого масиву: це або тип ctypes, або односимвольний код типу, який використовується модулем array. Якщо size_or_initializer є цілим числом, воно визначає довжину масиву, і масив буде спочатку обнулено. В іншому випадку size_or_initializer — це послідовність, яка використовується для ініціалізації масиву і довжина якої визначає довжину масиву.

Зауважте, що встановлення й отримання елемента потенційно не є атомарним — замість цього використовуйте Array(), щоб переконатися, що доступ автоматично синхронізується за допомогою блокування.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Повертає об’єкт ctypes, виділений зі спільної пам’яті.

typecode_or_type визначає тип повернутого об’єкта: це або тип ctypes, або односимвольний код типу, який використовується модулем array. *args передається конструктору для типу.

Зауважте, що встановлення й отримання значення потенційно не є атомарним — замість цього використовуйте Value(), щоб переконатися, що доступ автоматично синхронізується за допомогою блокування.

Зауважте, що масив ctypes.c_char має атрибути value і raw, які дозволяють використовувати його для зберігання та отримання рядків - дивіться документацію для ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

Те саме, що RawArray(), за винятком того, що залежно від значення lock замість необробленого масиву ctypes може повертатися обгортка синхронізації, безпечна для процесу.

Якщо lock має значення True (за замовчуванням), тоді створюється новий об’єкт блокування для синхронізації доступу до значення. Якщо lock є об’єктом Lock або RLock, то він використовуватиметься для синхронізації доступу до значення. Якщо lock має значення False, тоді доступ до повернутого об’єкта не буде автоматично захищений блокуванням, тому він не обов’язково буде «безпечним для процесу».

Зверніть увагу, що lock є аргументом лише для ключового слова.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

Те саме, що RawValue(), за винятком того, що залежно від значення lock замість необробленого об’єкта ctypes може повертатися безпечна для процесу оболонка синхронізації.

Якщо lock має значення True (за замовчуванням), тоді створюється новий об’єкт блокування для синхронізації доступу до значення. Якщо lock є об’єктом Lock або RLock, то він використовуватиметься для синхронізації доступу до значення. Якщо lock має значення False, тоді доступ до повернутого об’єкта не буде автоматично захищений блокуванням, тому він не обов’язково буде «безпечним для процесу».

Зверніть увагу, що lock є аргументом лише для ключового слова.

multiprocessing.sharedctypes.copy(obj)

Повертає об’єкт ctypes, виділений зі спільної пам’яті, який є копією об’єкта ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Повертає безпечний для процесу об’єкт оболонки для об’єкта ctypes, який використовує lock для синхронізації доступу. Якщо lock має значення None (за замовчуванням), то об’єкт multiprocessing.RLock створюється автоматично.

Синхронізована оболонка матиме два методи на додаток до методів об’єкта, який вона обертає: get_obj() повертає обгорнутий об’єкт, а get_lock() повертає об’єкт блокування, який використовується для синхронізації.

Зверніть увагу, що доступ до об’єкта ctypes через оболонку може бути набагато повільнішим, ніж доступ до необробленого об’єкта ctypes.

Змінено в версії 3.5: Синхронізовані об’єкти підтримують протокол context manager.

У наведеній нижче таблиці порівнюється синтаксис для створення спільних об’єктів ctypes зі спільної пам’яті зі звичайним синтаксисом ctypes. (У таблиці MyStruct є деякий підклас ctypes.Structure.)

ctypes

sharedctypes за допомогою типу

sharedctypes з використанням коду типу

c_double (2,4)

RawValue(c_double, 2,4)

RawValue(„d“, 2,4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(„h“, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(„i“, (9, 2, 8))

Нижче наведено приклад, коли кілька об’єктів ctypes змінено дочірнім процесом:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Надруковані результати:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Менеджери

Менеджери надають можливість створювати дані, якими можна ділитися між різними процесами, включно з загальним доступом через мережу між процесами, що виконуються на різних машинах. Об’єкт менеджера контролює серверний процес, який керує спільними об’єктами. Інші процеси можуть отримати доступ до спільних об’єктів за допомогою проксі-серверів.

multiprocessing.Manager()

Повертає запущений об’єкт SyncManager, який можна використовувати для спільного використання об’єктів між процесами. Повернений об’єкт менеджера відповідає породженому дочірньому процесу та має методи, які створюватимуть спільні об’єкти та повертатимуть відповідні проксі-сервери.

Процеси менеджера буде закрито, щойно їх буде зібрано сміття або їхній батьківський процес завершиться. Класи менеджерів визначені в модулі multiprocessing.managers:

class multiprocessing.managers.BaseManager([address[, authkey]])

Створіть об’єкт BaseManager.

Після створення потрібно викликати start() або get_server().serve_forever(), щоб переконатися, що об’єкт менеджера посилається на запущений процес менеджера.

адреса — це адреса, на якій процес менеджера прослуховує нові підключення. Якщо адреса має значення None, тоді вибирається довільна адреса.

authkey — це ключ автентифікації, який використовуватиметься для перевірки дійсності вхідних підключень до процесу сервера. Якщо authkey має значення None, тоді використовується current_process().authkey. В іншому випадку використовується authkey, і це має бути рядок байтів.

start([initializer[, initargs]])

Запустіть підпроцес, щоб запустити менеджер. Якщо initializer не None, тоді підпроцес викличе initializer(*initargs) під час запуску.

get_server()

Повертає об’єкт Server, який представляє фактичний сервер під керуванням менеджера. Об’єкт Server підтримує метод serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server додатково має атрибут address.

connect()

Підключіть об’єкт локального менеджера до віддаленого процесу менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Зупиніть процес, який використовує менеджер. Це доступно, лише якщо start() було використано для запуску процесу сервера.

Це можна викликати кілька разів.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Метод класу, який можна використовувати для реєстрації типу або виклику в класі менеджера.

typeid — це «ідентифікатор типу», який використовується для ідентифікації певного типу спільного об’єкта. Це має бути рядок.

callable — це виклик, який використовується для створення об’єктів для ідентифікатора цього типу. Якщо екземпляр менеджера буде підключено до сервера за допомогою методу connect() або якщо аргумент create_method має значення False, тоді це можна залишити як None.

proxytype є підкласом BaseProxy, який використовується для створення проксі для спільних об’єктів із цим typeid. Якщо None, то проксі-клас створюється автоматично.

exposed використовується для визначення послідовності назв методів, до яких проксі-серверам для цього typeid має бути дозволено доступ за допомогою BaseProxy._callmethod(). (Якщо exposed має значення None, тоді замість нього використовується proxytype._exposed_, якщо він існує.) У випадку, коли відкритий список не вказано, усі «загальнодоступні методи» спільного об’єкта будуть доступними. . (Тут «публічний метод» означає будь-який атрибут, який має метод __call__() і ім’я якого не починається з '_'.)

method_to_typeid — це зіставлення, яке використовується для визначення типу повернення тих відкритих методів, які мають повертати проксі. Він відображає назви методів у рядках typeid. (Якщо method_to_typeid має значення None, тоді замість нього використовується proxytype._method_to_typeid_, якщо він існує.) Якщо назва методу не є ключем цього відображення або якщо відображення має значення None, тоді об’єкт, повернутий методом, буде скопійовано за значенням.

create_method визначає, чи слід створювати метод з іменем typeid, яке можна використовувати, щоб наказати серверному процесу створити новий спільний об’єкт і повернути для нього проксі. За замовчуванням це True.

Екземпляри BaseManager також мають одну властивість лише для читання:

address

Адреса, яку використовує менеджер.

Змінено в версії 3.3: Об’єкти менеджера підтримують протокол керування контекстом – див. Типи менеджера контексту. __enter__() запускає серверний процес (якщо він ще не запущений), а потім повертає об’єкт менеджера. __exit__() викликає shutdown().

У попередніх версіях __enter__() не запускав серверний процес менеджера, якщо він ще не був запущений.

class multiprocessing.managers.SyncManager

Підклас BaseManager, який можна використовувати для синхронізації процесів. Об’єкти цього типу повертає multiprocessing.Manager().

Його методи створюють і повертають Проксі об’єкти для ряду типів даних, які зазвичай використовуються, щоб синхронізувати між процесами. Це, зокрема, включає спільні списки та словники.

Barrier(parties[, action[, timeout]])

Створіть спільний об’єкт threading.Barrier і поверніть для нього проксі.

Нове в версії 3.3.

BoundedSemaphore([value])

Створіть спільний об’єкт threading.BoundedSemaphore і поверніть для нього проксі.

Condition([lock])

Створіть спільний об’єкт threading.Condition і поверніть для нього проксі.

Якщо вказано lock, це має бути проксі для об’єкта threading.Lock або threading.RLock.

Змінено в версії 3.3: Додано метод wait_for().

Event()

Створіть спільний об’єкт threading.Event і поверніть для нього проксі.

Lock()

Створіть спільний об’єкт threading.Lock і поверніть для нього проксі.

Namespace()

Створіть спільний об’єкт Namespace і поверніть для нього проксі.

Queue([maxsize])

Створіть спільний об’єкт queue.Queue і поверніть для нього проксі.

RLock()

Створіть спільний об’єкт threading.RLock і поверніть для нього проксі.

Semaphore([value])

Створіть спільний об’єкт threading.Semaphore і поверніть для нього проксі.

Array(typecode, sequence)

Створіть масив і поверніть для нього проксі.

Value(typecode, value)

Створіть об’єкт із доступним для запису атрибутом «значення» та поверніть для нього проксі-сервер.

dict()
dict(mapping)
dict(sequence)

Створіть спільний об’єкт dict і поверніть для нього проксі.

list()
list(sequence)

Створіть спільний об’єкт list і поверніть для нього проксі.

Змінено в версії 3.6: Спільні об’єкти можуть бути вкладеними. Наприклад, спільний об’єкт-контейнер, такий як спільний список, може містити інші спільні об’єкти, якими керуватиме та синхронізуватиме SyncManager.

class multiprocessing.managers.Namespace

Тип, який можна зареєструвати в SyncManager.

Об’єкт простору імен не має відкритих методів, але має атрибути, доступні для запису. Його подання показує значення його атрибутів.

Однак, коли використовується проксі для об’єкта простору імен, атрибут, що починається з '_'' буде атрибутом проксі, а не атрибутом референта:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Індивідуальні менеджери

Щоб створити власний менеджер, потрібно створити підклас BaseManager і використовувати метод класу register() для реєстрації нових типів або викликів у класі менеджера. Наприклад:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Використання віддаленого менеджера

Можна запустити керуючий сервер на одній машині, а клієнти використовуватимуть його з інших машин (за умови, що задіяні брандмауери дозволяють це).

Виконання наступних команд створює сервер для однієї спільної черги, до якої мають доступ віддалені клієнти:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клієнт може отримати доступ до сервера наступним чином:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Інший клієнт також може використовувати його:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Локальні процеси також можуть отримати доступ до цієї черги, використовуючи код вище на клієнті для доступу до неї віддалено:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Проксі об’єкти

Проксі — це об’єкт, який посилається на спільний об’єкт, який живе (імовірно) в іншому процесі. Спільний об’єкт називається референтом проксі. Кілька проксі-об’єктів можуть мати один і той же референт.

Проксі-об’єкт має методи, які викликають відповідні методи його референта (хоча не кожен метод референта обов’язково буде доступним через проксі). Таким чином, проксі можна використовувати так само, як і його референт:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Зауважте, що застосування str() до проксі поверне подання референта, тоді як застосування repr() поверне подання проксі.

Важливою особливістю проксі-об’єктів є те, що їх можна вибирати, тому їх можна передавати між процесами. Таким чином, референт може містити Проксі об’єкти. Це дозволяє вкладати ці керовані списки, dicts та інші Проксі об’єкти:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Подібним чином, проксі dict і список можуть бути вкладені один в одного:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Якщо стандартні (не проксі) list або dict об’єкти містяться в референті, модифікації цих змінних значень не поширюватимуться через менеджер, оскільки проксі не може дізнатися, коли значення що містяться в них, змінено. Однак збереження значення в проксі-контейнері (що запускає __setitem__ в проксі-об’єкті) поширюється через менеджер, тому для ефективної зміни такого елемента можна повторно призначити змінене значення проксі-серверу контейнера: :

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Цей підхід, можливо, менш зручний, ніж використання вкладених Проксі об’єкти для більшості випадків використання, але також демонструє рівень контролю над синхронізацією.

Примітка

Проксі-типи в multiprocessing не підтримують порівняння за значенням. Так, наприклад, ми маємо:

>>> manager.list([1,2,3]) == [1,2,3]
False

Під час порівнянь слід просто використовувати копію референта.

class multiprocessing.managers.BaseProxy

Проксі-об’єкти є екземплярами підкласів BaseProxy.

_callmethod(methodname[, args[, kwds]])

Виклик і повернення результату методу референта проксі.

Якщо proxy є проксі, референтом якого є obj, тоді вираз

proxy._callmethod(methodname, args, kwds)

обчислить вираз

getattr(obj, methodname)(*args, **kwds)

в процесі менеджера.

Поверненим значенням буде копія результату виклику або проксі для нового спільного об’єкта – див. документацію щодо аргументу method_to_typeid BaseManager.register().

Якщо виклик викликає виняток, він повторно викликається _callmethod(). Якщо в процесі менеджера виникає інший виняток, він перетворюється на виняток RemoteError і викликається _callmethod().

Зокрема, зауважте, що виняток буде створено, якщо methodname не було виявлено.

Приклад використання _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Повернути копію референту.

Якщо референт неможливо вибрати, це спричинить виняток.

__repr__()

Повертає представлення проксі-об’єкта.

__str__()

Повернути представлення референта.

Прибирати

Проксі-об’єкт використовує зворотний виклик weakref, щоб, коли він збирає сміття, він скасовує реєстрацію в менеджері, якому належить його референт.

Спільний об’єкт видаляється з процесу менеджера, коли більше немає проксі-серверів, які посилаються на нього.

Пули процесів

Можна створити пул процесів, які виконуватимуть передані йому завдання за допомогою класу Pool.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Об’єкт пулу процесів, який керує пулом робочих процесів, до яких можна надсилати завдання. Він підтримує асинхронні результати з тайм-аутами та зворотними викликами та має реалізацію паралельної карти.

processes is the number of worker processes to use. If processes is None then the number returned by os.cpu_count() is used.

Якщо initializer не None, тоді кожен робочий процес викличе initializer(*initargs) під час свого запуску.

maxtasksperchild — це кількість завдань, які робочий процес може виконати, перш ніж він вийде та буде замінений новим робочим процесом, щоб звільнити невикористані ресурси. Типовим значенням maxtasksperchild є None, що означає, що робочі процеси живуть стільки ж, скільки пул.

context можна використовувати для визначення контексту, який використовується для запуску робочих процесів. Зазвичай пул створюється за допомогою функції multiprocessing.Pool() або методу Pool() контекстного об’єкта. В обох випадках контекст встановлено належним чином.

Зауважте, що методи об’єкта пулу має викликати тільки процес, який створив пул.

Попередження

Об’єкти multiprocessing.pool мають внутрішні ресурси, якими потрібно належним чином керувати (як і будь-яким іншим ресурсом), використовуючи пул як контекстний менеджер або викликаючи close() і terminate() вручну. Якщо цього не зробити, процес може призупинити завершення.

Note that is not correct to rely on the garbage colletor to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).

Нове в версії 3.2: maxtasksperchild

Нове в версії 3.4: context

Примітка

Робочі процеси в межах Pool зазвичай живі протягом повної тривалості робочої черги пулу. Частий шаблон, який зустрічається в інших системах (таких як Apache, mod_wsgi тощо) для звільнення ресурсів, які зберігаються робочими засобами, полягає в тому, щоб дозволити робочому в межах пулу завершити лише певний обсяг роботи перед виходом, очищенням і породженням нового процесу на заміну старого. Аргумент maxtasksperchild для Pool надає цю можливість кінцевому користувачеві.

apply(func[, args[, kwds]])

Виклик func з аргументами args і ключовими аргументами kwds. Блокується, поки не буде готовий результат. Враховуючи ці блоки, apply_async() краще підходить для виконання роботи паралельно. Крім того, func виконується лише в одному з воркерів пулу.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Варіант методу apply(), який повертає об’єкт AsyncResult.

Якщо вказано callback, це має бути виклик, який приймає один аргумент. Коли результат стає готовим, до нього застосовується callback, якщо тільки виклик не вдався, у цьому випадку замість нього застосовується error_callback.

Якщо вказано error_callback, це має бути виклик, який приймає один аргумент. Якщо цільова функція дає збій, то error_callback викликається з екземпляром винятку.

Зворотні виклики мають завершитися негайно, інакше потік, який обробляє результати, буде заблоковано.

map(func, iterable[, chunksize])

Паралельний еквівалент вбудованої функції map() (хоча вона підтримує лише один аргумент iterable, для кількох ітерацій див. starmap()). Блокується, поки не буде готовий результат.

Цей метод розбиває iterable на декілька фрагментів, які він надсилає до пулу процесів як окремі завдання. (Приблизний) розмір цих фрагментів можна вказати, встановивши для chunksize додатне ціле число.

Зауважте, що це може спричинити велике використання пам’яті для дуже довгих ітерацій. Розгляньте можливість використання imap() або imap_unordered() з явним параметром chunksize для кращої ефективності.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Варіант методу map(), який повертає об’єкт AsyncResult.

Якщо вказано callback, це має бути виклик, який приймає один аргумент. Коли результат стає готовим, до нього застосовується callback, якщо тільки виклик не вдався, у цьому випадку замість нього застосовується error_callback.

Якщо вказано error_callback, це має бути виклик, який приймає один аргумент. Якщо цільова функція дає збій, то error_callback викликається з екземпляром винятку.

Зворотні виклики мають завершитися негайно, інакше потік, який обробляє результати, буде заблоковано.

imap(func, iterable[, chunksize])

Ленича версія map().

Аргумент chunksize такий самий, як той, який використовується методом map(). Для дуже довгих ітерацій використання великого значення для chunksize може зробити роботу завершеною набагато швидше, ніж використання значення за замовчуванням 1.

Крім того, якщо chunksize дорівнює 1, тоді метод next() ітератора, який повертає метод imap(), має додатковий параметр timeout: next(timeout) викличе multiprocessing.TimeoutError, якщо результат не може бути повернутий протягом часу очікування секунд.

imap_unordered(func, iterable[, chunksize])

Те саме, що imap(), за винятком того, що порядок результатів від повернутого ітератора слід вважати довільним. (Тільки коли є лише один робочий процес, порядок гарантовано буде «правильним».)

starmap(func, iterable[, chunksize])

Подібно до map(), за винятком того, що елементи iterable мають бути ітерованими, які розпаковуються як аргументи.

Тому ітерація [(1,2), (3, 4)] призводить до [func(1,2), func(3,4)].

Нове в версії 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Комбінація starmap() і map_async(), яка виконує ітерацію по iterable ітерацій і викликає func з розпакованими ітераціями. Повертає об’єкт результату.

Нове в версії 3.3.

close()

Запобігає надсиланню додаткових завдань до пулу. Після виконання всіх завдань робочі процеси завершаться.

terminate()

Негайно зупиняє робочі процеси, не завершуючи незавершену роботу. Коли об’єкт пулу збирається як сміття, негайно буде викликано terminate().

join()

Зачекайте, поки робочі процеси завершаться. Потрібно викликати close() або terminate() перед використанням join().

Нове в версії 3.3: Об’єкти пулу тепер підтримують протокол керування контекстом – див. Типи менеджера контексту. __enter__() повертає об’єкт пулу, а __exit__() викликає terminate().

class multiprocessing.pool.AsyncResult

Клас результату, який повертають Pool.apply_async() і Pool.map_async().

get([timeout])

Поверніть результат, коли він надійде. Якщо timeout не None і результат не надходить протягом timeout секунд, тоді виникає multiprocessing.TimeoutError. Якщо віддалений виклик викликав виняток, цей виняток буде повторно викликано get().

wait([timeout])

Зачекайте, поки буде доступний результат або поки не мине тайм-аут секунди.

ready()

Повідомити, чи завершено виклик.

successful()

Повертає, чи завершено виклик без виклику винятку. Викличе ValueError, якщо результат не готовий.

Змінено в версії 3.7: Якщо результат не готовий, замість AssertionError виникає ValueError.

Наступний приклад демонструє використання пулу:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Слухачі та клієнти

Зазвичай передача повідомлень між процесами здійснюється за допомогою черг або за допомогою об’єктів Connection, які повертає Pipe().

Однак модуль multiprocessing.connection забезпечує додаткову гнучкість. По суті, це надає API високого рівня, орієнтований на повідомлення, для роботи з сокетами або іменованими каналами Windows. Він також підтримує дайджест-автентифікацію за допомогою модуля hmac і для опитування кількох з’єднань одночасно.

multiprocessing.connection.deliver_challenge(connection, authkey)

Надішліть випадково згенероване повідомлення на інший кінець з’єднання та дочекайтеся відповіді.

Якщо відповідь відповідає дайджесту повідомлення з використанням authkey як ключа, тоді на інший кінець з’єднання надсилається вітальне повідомлення. Інакше виникає AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Отримайте повідомлення, обчисліть дайджест повідомлення, використовуючи authkey як ключ, а потім надішліть дайджест назад.

Якщо вітальне повідомлення не отримано, виникає AuthenticationError.

multiprocessing.connection.Client(address[, family[, authkey]])

Спроба встановити з’єднання зі слухачем, який використовує адресу address, повертаючи Connection.

Тип з’єднання визначається аргументом family, але зазвичай його можна опустити, оскільки його зазвичай можна визначити з формату address. (Див. Формати адрес)

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Ключі автентифікації.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Обгортка для пов’язаного сокета або каналу з іменем Windows, який «слухає» з’єднання.

address — це адреса, яка буде використовуватися зв’язаним сокетом або іменованим каналом об’єкта слухача.

Примітка

Якщо використовується адреса «0.0.0.0», ця адреса не буде кінцевою точкою підключення в Windows. Якщо вам потрібна підключена кінцева точка, вам слід використовувати «127.0.0.1».

сімейство — це тип розетки (або названої труби), яку слід використовувати. Це може бути один із рядків 'AF_INET' (для сокета TCP), 'AF_UNIX' (для сокета домену Unix) або 'AF_PIPE'' (для іменованого каналу Windows) . З них лише перший гарантовано доступний. Якщо family має значення None, тоді сім’я виводиться з формату address. Якщо адреса також None, тоді вибрано значення за замовчуванням. Це за замовчуванням сімейство, яке вважається найшвидшим із доступних. Дивіться Формати адрес. Зауважте, що якщо сімейство має значення 'AF_UNIX, а адреса None, то сокет буде створено в приватному тимчасовому каталозі, створеному за допомогою tempfile.mkstemp().

Якщо об’єкт слухача використовує сокет, тоді backlog (1 за замовчуванням) передається в метод listen() сокета після того, як його буде зв’язано.

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Ключі автентифікації.

accept()

Прийняти підключення до зв’язаного сокета або іменованого каналу об’єкта слухача та повернути об’єкт Connection. Якщо спроба автентифікації не вдається, виникає AuthenticationError.

close()

Закрийте прив’язаний сокет або іменований канал об’єкта слухача. Це викликається автоматично, коли слухач збирає сміття. Однак бажано називати це явно.

Об’єкти слухача мають такі властивості лише для читання:

address

Адреса, яка використовується об’єктом Listener.

last_accepted

Адреса, з якої надійшло останнє прийняте підключення. Якщо це недоступно, це None.

Нове в версії 3.3: Об’єкти слухача тепер підтримують протокол керування контекстом – див. Типи менеджера контексту. __enter__() повертає об’єкт слухача, а __exit__() викликає close().

multiprocessing.connection.wait(object_list, timeout=None)

Зачекайте, поки об’єкт у object_list буде готовий. Повертає список тих об’єктів у object_list, які готові. Якщо timeout є числом з плаваючою точкою, виклик блокується щонайбільше на стільки секунд. Якщо timeout має значення None, тоді він блокуватиметься на необмежений період. Від’ємний тайм-аут еквівалентний нульовому тайм-ауту.

For both Unix and Windows, an object can appear in object_list if it is

Об’єкт з’єднання або сокета готовий, коли є доступні дані для читання з нього, або інший кінець закрито.

Unix: wait(object_list, timeout) almost equivalent select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)

Нове в версії 3.3.

Приклади

Наступний код сервера створює прослуховувач, який використовує 'секретний пароль як ключ автентифікації. Потім він очікує з’єднання та надсилає деякі дані клієнту:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Наступний код підключається до сервера та отримує деякі дані з сервера:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Наступний код використовує wait() для очікування повідомлень від кількох процесів одночасно:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Формати адрес

  • Адреса «AF_INET» — це кортеж у формі «(ім’я хоста, порт)», де ім’я хоста — рядок, а порт — ціле число.

  • Адреса «AF_UNIX» — це рядок, що представляє назву файлу у файловій системі.

  • An 'AF_PIPE' address is a string of the form r'\.\pipe{PipeName}'. To use Client() to connect to a named pipe on a remote computer called ServerName one should use an address of the form r'\ServerName\pipe{PipeName}' instead.

Зауважте, що будь-який рядок, який починається двома зворотними похилими рисками, за замовчуванням вважається адресою 'AF_PIPE', а не адресою 'AF_UNIX'.

Ключі автентифікації

Коли використовується Connection.recv, отримані дані автоматично видаляються. На жаль, видалення даних із ненадійного джерела становить загрозу безпеці. Тому Listener і Client() використовують модуль hmac для забезпечення автентифікації дайджесту.

Ключ автентифікації — це рядок байтів, який можна розглядати як пароль: коли з’єднання встановлено, обидва кінці вимагатимуть підтвердження того, що інший знає ключ автентифікації. (Демонстрація того, що обидві сторони використовують той самий ключ, не передбачає надсилання ключа через з’єднання.)

Якщо автентифікація запитується, але ключ автентифікації не вказано, тоді використовується значення, що повертається current_process().authkey (див. Process). Це значення буде автоматично успадковано будь-яким об’єктом Process, який створює поточний процес. Це означає, що (за замовчуванням) усі процеси багатопроцесної програми спільно використовуватимуть один ключ автентифікації, який можна використовувати під час встановлення з’єднань між собою.

Відповідні ключі автентифікації також можна згенерувати за допомогою os.urandom().

Лісозаготівля

Доступна певна підтримка журналювання. Однак зауважте, що пакунок logging не використовує спільні блокування процесів, тому (залежно від типу обробника) повідомлення від різних процесів можуть переплутатися.

multiprocessing.get_logger()

Повертає реєстратор, який використовується multiprocessing. За потреби буде створено новий.

When first created the logger has level logging.NOTSET and no default handler. Messages sent to this logger will not by default propagate to the root logger.

Зауважте, що у Windows дочірні процеси успадковуватимуть лише рівень реєстратора батьківського процесу – будь-які інші налаштування реєстратора не успадковуватимуться.

multiprocessing.log_to_stderr(level=None)

Ця функція виконує виклик get_logger(), але окрім повернення реєстратора, створеного get_logger, вона додає обробник, який надсилає вихідні дані до sys.stderr у форматі '[%(levelname)s/%(processName)s] %(message)s ''. Ви можете змінити levelname реєстратора, передавши аргумент level.

Нижче наведено приклад сеансу з увімкненим журналюванням:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Щоб отримати повну таблицю рівнів журналювання, перегляньте модуль logging.

Модуль multiprocessing.dummy

multiprocessing.dummy повторює API multiprocessing, але є не більше ніж обгорткою модуля threading.

Зокрема, функція Pool, надана multiprocessing.dummy, повертає екземпляр ThreadPool, який є підкласом Pool, який підтримує всі виклики методів, але використовує пул робочих потоків, а не робочих процесів.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Об’єкт пулу потоків, який керує пулом робочих потоків, до яких можна надсилати завдання. Екземпляри ThreadPool повністю сумісні з інтерфейсом екземплярів Pool, і їхніми ресурсами також потрібно правильно керувати, використовуючи пул як контекстний менеджер або викликаючи close() і terminate() вручну.

processes is the number of worker threads to use. If processes is None then the number returned by os.cpu_count() is used.

Якщо initializer не None, тоді кожен робочий процес викличе initializer(*initargs) під час свого запуску.

На відміну від Pool, maxtasksperchild і context не можна надати.

Примітка

ThreadPool має той самий інтерфейс, що й Pool, який розроблено навколо пулу процесів і передує появі модуля concurrent.futures. Таким чином, він успадковує деякі операції, які не мають сенсу для пулу, що підтримується потоками, і має власний тип для представлення статусу асинхронних завдань, AsyncResult, який не розуміється жодною іншою бібліотекою.

Зазвичай користувачі мають віддавати перевагу використанню concurrent.futures.ThreadPoolExecutor, який має простіший інтерфейс, розроблений навколо потоків із самого початку та повертає concurrent.futures.Future екземпляри, сумісні з багатьма інші бібліотеки, включаючи asyncio.

Інструкції з програмування

Існують певні вказівки та ідіоми, яких слід дотримуватися під час використання multiprocessing.

Всі методи запуску

Наступне стосується всіх методів запуску.

Уникайте спільного стану

Наскільки це можливо, слід намагатися уникати переміщення великих обсягів даних між процесами.

Ймовірно, найкраще використовувати черги або канали для зв’язку між процесами, а не використовувати примітиви синхронізації нижчого рівня.

Пробірність

Переконайтеся, що аргументи методів проксі-серверів можна вибрати.

Безпека потоків проксі

Не використовуйте проксі-об’єкт із кількох потоків, якщо ви не захистите його за допомогою блокування.

(Ніколи не виникає проблем із різними процесами, які використовують той самий проксі.)

Приєднання до зомбованих процесів

On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’s Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

Краще успадкувати, ніж маринувати/розмаринувати

Під час використання методів запуску spawn або forkserver багато типів із multiprocessing мають бути доступними для вибору, щоб дочірні процеси могли їх використовувати. Однак зазвичай слід уникати надсилання спільних об’єктів іншим процесам за допомогою каналів або черг. Натомість ви повинні організувати програму так, щоб процес, якому потрібен доступ до спільного ресурсу, створеного в іншому місці, міг успадкувати його від процесу-предка.

Уникайте завершення процесів

Використання методу Process.terminate для зупинки процесу може призвести до того, що будь-які спільні ресурси (такі як блокування, семафори, канали та черги), які зараз використовуються цим процесом, стануть несправними або недоступними для інших процесів.

Тому, ймовірно, найкраще використовувати Process.terminate лише для процесів, які ніколи не використовують спільні ресурси.

Приєднання до процесів, які використовують черги

Майте на увазі, що процес, який поставив елементи в чергу, чекатиме перед завершенням, доки всі буферизовані елементи не будуть передані потоком «фідера» до основного каналу. (Дочірній процес може викликати метод Queue.cancel_join_thread черги, щоб уникнути такої поведінки.)

Це означає, що щоразу, коли ви використовуєте чергу, вам потрібно переконатися, що всі елементи, які було поставлено в чергу, зрештою буде видалено перед приєднанням до процесу. Інакше ви не можете бути впевнені, що процеси, які поставили елементи в чергу, завершаться. Пам’ятайте також, що недемонічні процеси будуть приєднані автоматично.

Прикладом, який призведе до взаємоблокування, є наступний:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Виправити тут можна було б поміняти місцями останні два рядки (або просто видалити рядок p.join()).

Явно передати ресурси дочірнім процесам

On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Окрім того, що код (потенційно) сумісний із Windows та іншими методами запуску, це також гарантує, що поки дочірній процес живий, об’єкт не збиратиме сміття в батьківському процесі. Це може бути важливо, якщо якийсь ресурс звільняється, коли об’єкт збирається як сміття в батьківському процесі.

Так наприклад

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

слід переписати як

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Остерігайтеся заміни sys.stdin на «файлоподібний об’єкт»

multiprocessing спочатку безумовно називався:

os.close(sys.stdin.fileno())

у методі multiprocessing.Process._bootstrap() — це призвело до проблем із процесами в процесах. Це було змінено на:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Which solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace sys.stdin() with a «file-like object» with output buffering. This danger is that if multiple processes call close() on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.

Якщо ви пишете файлоподібний об’єкт і використовуєте власне кешування, ви можете зробити його безпечним для розгалуження, зберігаючи pid кожного разу, коли ви додаєте його до кешу, і відкидаючи кеш, коли pid змінюється. Наприклад:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Для отримання додаткової інформації перегляньте bpo-5155, bpo-5313 та bpo-5331

Методи запуску spawn і forkserver

There are a few extra restriction which don’t apply to the fork start method.

Більше маринування

Переконайтеся, що всі аргументи Process.__init__() можна вибрати. Крім того, якщо ви створите підклас Process, то переконайтеся, що екземпляри можна вибрати під час виклику методу Process.start.

Глобальні змінні

Майте на увазі, що якщо код, запущений у дочірньому процесі, намагається отримати доступ до глобальної змінної, тоді значення, яке він бачить (якщо таке є), може не збігатися зі значенням у батьківському процесі під час Process.start викликано.

Однак глобальні змінні, які є лише константами рівня модуля, не викликають проблем.

Безпечне імпортування основного модуля

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).

Наприклад, використання методу запуску spawn або forkserver під час запуску наступного модуля призведе до помилки з RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Натомість слід захистити «точку входу» програми за допомогою if __name__ == '__main__': наступним чином:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Рядок freeze_support() можна опустити, якщо програма буде працювати нормально, а не зависати.)

Це дозволяє щойно створеному інтерпретатору Python безпечно імпортувати модуль, а потім запускати функцію foo() модуля.

Подібні обмеження застосовуються, якщо пул або менеджер створено в основному модулі.

Приклади

Демонстрація створення та використання налаштованих менеджерів і проксі-серверів:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Використання Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Приклад, який показує, як використовувати черги для передачі завдань у колекцію робочих процесів і збору результатів:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()