Первое упоминание о совместном¶
Kod źródłowy: Lib/multiprocessing/
Dostępność: not Android, not iOS, not WASI.
Этот модуль не поддерживается на мобильных платформах или платформах WebAssembly.
Wprowadzenie¶
multiprocessing
— это пакет, который поддерживает создание процессов с использованием API, аналогичного модулю threading
. Пакет multiprocessing
предлагает как локальный, так и удаленный параллелизм, эффективно обходя Global Interpreter Lock за счет использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing
позволяет программисту полностью использовать несколько процессоров на данной машине. Он работает как в POSIX, так и в Windows.
Модуль multiprocessing
також представляє API, які не мають аналогів у модулі threading
. Яскравим прикладом цього є об’єкт Pool
, який пропонує зручний засіб розпаралелювання виконання функції для кількох вхідних значень, розподіляючи вхідні дані між процесами (паралелізм даних). Наступний приклад демонструє звичайну практику визначення таких функцій у модулі, щоб дочірні процеси могли успішно імпортувати цей модуль. Цей базовий приклад паралелізму даних з використанням Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
друкуватиме стандартний вихід
[1, 4, 9]
Zobacz także
concurrent.futures.ProcessPoolExecutor
предлагает интерфейс более высокого уровня для передачи задач фоновому процессу без блокировки выполнения вызывающего процесса. По сравнению с использованием интерфейса Pool
напрямую, API concurrent.futures
позволяет с большей легкостью отделить отправку работы в базовый пул процессов от ожидания результатов.
Клас Process
¶
У multiprocessing
процеси породжуються шляхом створення об’єкта Process
і виклику його методу start()
. Process
відповідає API threading.Thread
. Тривіальним прикладом багатопроцесорної програми є:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Щоб показати ідентифікатори окремих процесів, ось розширений приклад:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Щоб отримати пояснення, чому потрібна частина if __name__ == '__main__''
, див. Інструкції з програмування.
The arguments to Process
usually need to be unpickleable from within
the child process. If you tried typing the above example directly into a REPL it
could lead to an AttributeError
in the child process trying to locate the
f function in the __main__
module.
Контексти та методи запуску¶
Залежно від платформи multiprocessing
підтримує три способи запуску процесу. Це методи запуску
- spawn
Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для запуска метода
run()
объекта процесса. В частности, не будут наследоваться ненужные файловые дескрипторы и дескрипторы родительского процесса. Запуск процесса с использованием этого метода происходит довольно медленно по сравнению с использованием fork или forkserver.Доступно на платформах POSIX и Windows. По умолчанию в Windows и macOS.
- fork
Батьківський процес використовує
os.fork()
для розгалуження інтерпретатора Python. Дочірній процес, коли він починається, фактично ідентичний батьківському процесу. Усі ресурси батьківського процесу успадковуються дочірнім процесом. Зверніть увагу, що безпечне розгалуження багатопотокового процесу є проблематичним.Available on POSIX systems. Currently the default on POSIX except macOS.
Informacja
The default start method will change away from fork in Python 3.14. Code that requires fork should explicitly specify that via
get_context()
orset_start_method()
.Zmienione w wersji 3.12: Если Python способен обнаружить, что ваш процесс имеет несколько потоков, функция
os.fork()
, которую этот метод запуска вызывает внутри себя, вызоветDeprecationWarning
. Используйте другой метод запуска. Дополнительную информацию смотрите в документацииos.fork()
.
- forkserver
Когда программа запускается и выбирает метод запуска forkserver, запускается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает создание нового процесса. Серверный процесс fork является однопоточным, если только системные библиотеки или предварительно загруженный импорт не порождает потоки в качестве побочного эффекта, поэтому в целом безопасно использовать
os.fork()
. Никакие ненужные ресурсы не наследуются.Available on POSIX platforms which support passing file descriptors over Unix pipes such as Linux.
Zmienione w wersji 3.4: spawn добавлен на все платформы POSIX, а forkserver добавлен на некоторые платформы POSIX. Дочерние процессы больше не наследуют все наследуемые дескрипторы родительских процессов в Windows.
Zmienione w wersji 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса, поскольку системные библиотеки macOS могут запускать потоки. См. bpo-33725.
В POSIX использование методов запуска spawn или forkserver также запустит процесс отслеживания ресурсов, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты SharedMemory
), созданные процессы программы. Когда все процессы завершатся, средство отслеживания ресурсов отключает связь с любым оставшимся отслеживаемым объектом. Обычно их не должно быть, но если процесс был завершен по сигналу, могут быть некоторые «утечки» ресурсов. (Ни утекшие семафоры, ни сегменты общей памяти не будут автоматически отключены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты общей памяти занимают некоторое пространство в основной памяти.)
Щоб вибрати метод запуску, ви використовуєте set_start_method()
в пункті if __name__ == '__main__'
головного модуля. Наприклад:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не слід використовувати більше одного разу в програмі.
Крім того, ви можете використовувати get_context()
, щоб отримати об’єкт контексту. Контекстні об’єкти мають той самий API, що й багатопроцесорний модуль, і дозволяють використовувати кілька методів запуску в одній програмі.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Зауважте, що об’єкти, пов’язані з одним контекстом, можуть бути несумісними з процесами для іншого контексту. Зокрема, блокування, створені за допомогою контексту fork, не можна передати процесам, запущеним за допомогою методів запуску spawn або forkserver.
Libraries using multiprocessing
or
ProcessPoolExecutor
should be designed to allow
their users to provide their own multiprocessing context. Using a specific
context of your own within a library can lead to incompatibilities with the
rest of the library user’s application. Always document if your library
requires a specific start method.
Ostrzeżenie
Методы запуска 'spawn'
и forkserver'
обычно не могут использоваться с „замороженными” исполняемыми файлами (т.е. двоичными файлами, созданными такими пакетами, как PyInstaller и cx_Freeze) в системах POSIX. . Метод start 'fork'
может работать, если код не использует потоки.
Обмін об’єктами між процесами¶
multiprocessing
підтримує два типи каналів зв’язку між процесами:
Queues
Клас
Queue
є майже клономqueue.Queue
. Наприклад:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Очереди безопасны для потоков и процессов. Любой объект, помещенный в очередь
multiprocessing
, будет сериализован.
Pipes
Функція
Pipe()
повертає пару об’єктів з’єднання, з’єднаних трубою, яка за умовчанням є дуплексною (двосторонньою). Наприклад:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Два об’єкти з’єднання, які повертає
Pipe()
, представляють два кінці труби. Кожен об’єкт підключення має методиsend()
іrecv()
(серед інших). Зауважте, що дані в каналі можуть бути пошкоджені, якщо два процеси (або потоки) намагаються зчитувати або писати в той самий кінець каналу одночасно. Звичайно, немає ризику пошкодження через процеси, що використовують різні кінці труби одночасно.Метод
send()
сериализует объект, аrecv()
воссоздает объект.
Синхронізація між процесами¶
multiprocessing
містить еквіваленти всіх примітивів синхронізації з threading
. Наприклад, можна використовувати блокування, щоб гарантувати, що лише один процес друкує на стандартний вивід одночасно:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без використання блокування вихідні дані різних процесів можуть переплутатися.
Використання пулу працівників¶
Клас Pool
представляє пул робочих процесів. Він має методи, які дозволяють розвантажувати завдання на робочі процеси кількома різними способами.
Dla przykładu:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Зверніть увагу, що методи пулу повинні використовуватися тільки тим процесом, який його створив.
Informacja
Функціональність цього пакета вимагає, щоб модуль __main__
міг імпортуватися дочірніми елементами. Це описано в Інструкції з програмування, проте тут варто звернути увагу на це. Це означає, що деякі приклади, такі як приклади multiprocessing.pool.Pool
не працюватимуть в інтерактивному інтерпретаторі. Наприклад:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(Якщо ви спробуєте це, він фактично виведе три повні трасування, чергувані напіввипадковим чином, і тоді вам, можливо, доведеться якось зупинити батьківський процес.)
Referensi¶
Пакет multiprocessing
здебільшого повторює API модуля threading
.
Process
і винятки¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Об’єкти процесу представляють діяльність, яка виконується в окремому процесі. Клас
Process
має еквіваленти всіх методівthreading.Thread
.Конструктор всегда следует вызывать с ключевыми аргументами. group всегда должно быть
None
; он существует исключительно для совместимости сthreading.Thread
. target — это вызываемый объект, который будет вызываться методомrun()
. По умолчанию установлено значение None, что означает, что ничего не вызывается. name — это имя процесса (подробнее см.name
). args — это кортеж аргументов для целевого вызова. kwargs — это словарь аргументов ключевых слов для целевого вызова. Если он указан, аргумент daemon, содержащий только ключевые слова, устанавливает флаг процессаdaemon
в значениеTrue
илиFalse
. Если установлено значение «Нет» (по умолчанию), этот флаг будет унаследован от процесса создания.По умолчанию аргументы target не передаются. Аргумент args, который по умолчанию имеет значение
()
, может использоваться для указания списка или кортежа аргументов, передаваемых в target.If a subclass overrides the constructor, it must make sure it invokes the base class constructor (
super().__init__()
) before doing anything else to the process.Informacja
In general, all arguments to
Process
must be picklable. This is frequently observed when trying to create aProcess
or use aconcurrent.futures.ProcessPoolExecutor
from a REPL with a locally defined target function.Passing a callable object defined in the current REPL session causes the child process to die via an uncaught
AttributeError
exception when starting as target must have been defined within an importable module in order to be loaded during unpickling.Example of this uncatchable error from the child:
>>> import multiprocessing as mp >>> def knigit(): ... print("Ni!") ... >>> process = mp.Process(target=knigit) >>> process.start() >>> Traceback (most recent call last): File ".../multiprocessing/spawn.py", line ..., in spawn_main File ".../multiprocessing/spawn.py", line ..., in _main AttributeError: module '__main__' has no attribute 'knigit' >>> process <SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>
See Методи запуску spawn і forkserver. While this restriction is not true if using the
"fork"
start method, as of Python3.14
that is no longer the default on any platform. See Контексти та методи запуску. See also gh-132898.Zmienione w wersji 3.3: Добавлен параметр daemon.
- run()¶
Метод, що представляє діяльність процесу.
Ви можете перевизначити цей метод у підкласі. Стандартний метод
run()
викликає викликаний об’єкт, переданий конструктору об’єкта як цільовий аргумент, якщо такий є, з послідовними аргументами та ключовими аргументами, взятими з аргументів args і kwargs відповідно.Использование списка или кортежа в качестве аргумента args, передаваемого в
Process
, дает тот же эффект.Przykład:
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- start()¶
Запустіть процес.
Це має бути викликано щонайбільше один раз на об’єкт процесу. Він організовує виклик методу
run()
об’єкта в окремому процесі.
- join([timeout])¶
Якщо додатковий аргумент timeout має значення
None
(за замовчуванням), метод блокується, доки процес, чий методjoin()
викликається, не завершиться. Якщо timeout є додатним числом, воно блокує щонайбільше timeout секунд. Зауважте, що метод повертаєNone
, якщо його процес завершується або якщо метод закінчився. Перевіртеexitcode
процесу, щоб визначити, чи він завершився.До процесу можна приєднуватися багато разів.
Процес не може приєднатися до себе, оскільки це спричинить тупикову блокування. Спроба приєднатися до процесу до його запуску є помилкою.
- name¶
Назва процесу. Ім’я - це рядок, який використовується лише для ідентифікації. Він не має семантики. Кілька процесів можуть мати однакові назви.
Початкове ім’я задається конструктором. Якщо конструктору не надано явної назви, ім’я у формі «Process-N1:N2:…:Nk» побудований, де кожен Nk є N-м дочірнім елементом свого батька.
- is_alive()¶
Повернути, чи процес активний.
Приблизно, об’єкт процесу живий з моменту повернення методу
start()
до завершення дочірнього процесу.
- daemon¶
Прапор демона процесу, логічне значення. Це має бути встановлено перед викликом
start()
.Початкове значення успадковується від процесу створення.
Коли процес завершується, він намагається завершити всі свої демонічні дочірні процеси.
Зауважте, що демонічному процесу не дозволяється створювати дочірні процеси. Інакше демонічний процес залишив би своїх нащадків сиротами, якщо він буде припинений під час завершення процесу батьківського процесу. Крім того, це не демони чи служби Unix, це звичайні процеси, які будуть припинені (і не приєднані), якщо недемонічні процеси вийшли.
Окрім API
threading.Thread
, об’єктиProcess
також підтримують такі атрибути та методи:- pid¶
Поверніть ідентифікатор процесу. До того, як процес буде створено, це буде
None
.
- exitcode¶
Код виходу дитини. Це буде
None
, якщо процес ще не завершено.Якщо дочірній метод
run()
повернувся нормально, код виходу буде 0. Якщо він закінчився черезsys.exit()
із цілим аргументом N, код виходу буде N.Якщо дочірній процес завершився через виняток, який не було перехоплено в
run()
, код виходу буде 1. Якщо його було завершено сигналом N, код виходу матиме від’ємне значення -N.
- authkey¶
Ключ автентифікації процесу (рядок байтів).
Коли
multiprocessing
ініціалізовано, головному процесу призначається випадковий рядок за допомогоюos.urandom()
.Коли об’єкт
Process
створюється, він успадковує ключ автентифікації свого батьківського процесу, хоча це можна змінити, встановившиauthkey
інший байтовий рядок.Перегляньте Ключі автентифікації.
- sentinel¶
Числовий дескриптор системного об’єкта, який стане „готовим” після завершення процесу.
Вы можете использовать это значение, если хотите одновременно ожидать нескольких событий, используя
multiprocessing.connection.wait()
. В противном случае проще вызватьjoin()
.В Windows это дескриптор ОС, который можно использовать с семейством вызовов API WaitForSingleObject и WaitForMultipleObjects. В POSIX это файловый дескриптор, используемый с примитивами из модуля
select
.Dodane w wersji 3.3.
- terminate()¶
Завершите процесс. В POSIX это делается с помощью сигнала
SIGTERM
; в Windows используетсяTerminateProcess()
. Обратите внимание, что обработчики выхода, предложенияfinally и т. д. не будут выполнены.Зверніть увагу, що процеси-нащадки процесу не будуть припинені – вони просто стануть сиротами.
Ostrzeżenie
Якщо цей метод використовується, коли пов’язаний процес використовує канал або чергу, канал або чергу можуть бути пошкоджені та можуть стати непридатними для використання іншим процесом. Подібним чином, якщо процес отримав блокування або семафор тощо, його завершення може призвести до блокування інших процесів.
- kill()¶
То же, что и
terminate()
, но с использованием сигналаSIGKILL
в POSIX.Dodane w wersji 3.7.
- close()¶
Закрийте об’єкт
Process
, звільнивши всі пов’язані з ним ресурси.ValueError
виникає, якщо основний процес все ще виконується. Після успішного поверненняclose()
більшість інших методів і атрибутів об’єктаProcess
викличутьValueError
.Dodane w wersji 3.7.
Зауважте, що методи
start()
,join()
,is_alive()
,terminate()
іexitcode
має викликати лише процес, який створив об’єкт процесу .Приклад використання деяких методів
Process
:>>> import multiprocessing, time, signal >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <...Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
Базовий клас усіх винятків
multiprocessing
.
- exception multiprocessing.BufferTooShort¶
Исключение, вызываемое
Connection.recv_bytes_into()
, когда предоставленный объект буфера слишком мал для чтения сообщения.Якщо
e
є екземпляромBufferTooShort
, тоe.args[0]
видасть повідомлення як рядок байтів.
- exception multiprocessing.AuthenticationError¶
Викликається, коли виникає помилка автентифікації.
- exception multiprocessing.TimeoutError¶
Викликається методами з тайм-аутом, коли час очікування закінчується.
Труби та черги¶
При використанні кількох процесів зазвичай використовується передача повідомлень для зв’язку між процесами та уникається використання будь-яких примітивів синхронізації, таких як блокування.
Для передачі повідомлень можна використовувати Pipe()
(для з’єднання між двома процесами) або чергу (що дозволяє використовувати кілька виробників і споживачів).
Типи Queue
, SimpleQueue
і JoinableQueue
є чергами багатьох виробників і споживачів FIFO, змодельованими на основі queue.Queue
клас у стандартній бібліотеці. Вони відрізняються тим, що Queue
не має методів task_done()
і join()
, представлених у queue.Queue
Python 2.5 клас.
Якщо ви використовуєте JoinableQueue
, тоді ви має викликати JoinableQueue.task_done()
для кожного завдання, вилученого з черги, інакше семафор, який використовується для підрахунку кількості незавершених завдань, може зрештою переповнитися, викликаючи виняток.
Одним из отличий от других реализаций очередей Python является то, что очереди multiprocessing
сериализуют все объекты, которые помещаются в них, с помощью pickle
. Объект, возвращаемый методом get, представляет собой воссозданный объект, который не использует общую память с исходным объектом.
Зауважте, що можна також створити спільну чергу за допомогою об’єкта менеджера – див. Менеджери.
Informacja
multiprocessing
використовує звичайні винятки queue.Empty
і queue.Full
, щоб сигналізувати про час очікування. Вони недоступні в просторі імен multiprocessing
, тому їх потрібно імпортувати з queue
.
Informacja
Коли об’єкт ставиться в чергу, об’єкт очищається, а фоновий потік пізніше скидає обрані дані в базовий канал. Це має деякі наслідки, які є трохи дивними, але не повинні викликати жодних практичних труднощів - якщо вони дійсно вас турбують, ви можете натомість використати чергу, створену за допомогою manager.
Після розміщення об’єкта в порожній черзі може виникнути нескінченно мала затримка, перш ніж метод
empty()
черги повернеFalse
, аget_nowait()
зможе повернутися без викликуqueue.Empty
.Якщо кілька процесів ставлять об’єкти в чергу, об’єкти можуть бути отримані на іншому кінці не за порядком. Однак об’єкти, поставлені в чергу одним і тим же процесом, завжди будуть в очікуваному порядку один відносно одного.
Ostrzeżenie
Якщо процес зупинено за допомогою Process.terminate()
або os.kill()
під час спроби використання Queue
, то дані в черзі, ймовірно, будуть пошкоджені. Це може призвести до того, що будь-який інший процес отримає виняток, коли він спробує використати чергу пізніше.
Ostrzeżenie
Як згадувалося вище, якщо дочірній процес поставив елементи в чергу (і він не використовував JoinableQueue.cancel_join_thread
), тоді цей процес не завершиться, доки всі буферизовані елементи не будуть скинуті в канал.
Це означає, що якщо ви спробуєте приєднатися до цього процесу, ви можете отримати тупикову блокування, якщо ви не впевнені, що всі елементи, які були поставлені в чергу, використано. Так само, якщо дочірній процес є недемонічним, тоді батьківський процес може зависнути при виході, коли він намагається приєднатися до всіх своїх недемонічних дочірніх процесів.
Зауважте, що черга, створена за допомогою менеджера, не має цієї проблеми. Дивіться Інструкції з програмування.
Для прикладу використання черг для міжпроцесного зв’язку див. Przykłady.
- multiprocessing.Pipe([duplex])¶
Повертає пару об’єктів
(conn1, conn2)
Connection
, що представляють кінці труби.Якщо duplex має значення
True
(за замовчуванням), тоді канал є двонаправленим. Якщо duplex має значенняFalse
, тоді канал є односпрямованим:conn1
можна використовувати лише для отримання повідомлень, аconn2
— лише для надсилання повідомлень.Метод
send()
сериализует объект с помощьюpickle
, аrecv()
воссоздает объект.
- class multiprocessing.Queue([maxsize])¶
Повертає спільну чергу процесу, реалізовану за допомогою каналу та кількох блокувань/семафорів. Коли процес вперше ставить елемент у чергу, запускається потік, який передає об’єкти з буфера в канал.
Звичайні винятки
queue.Empty
іqueue.Full
із модуляqueue
стандартної бібліотеки створюються, щоб повідомити про час очікування.Queue
реалізує всі методиqueue.Queue
, крімtask_done()
іjoin()
.- qsize()¶
Повертає приблизний розмір черги. Через семантику багатопоточності/багатопроцесорності це число ненадійне.
Обратите внимание, что это может вызвать ошибку
NotImplementedError
на таких платформах, как macOS, гдеsem_getvalue()
не реализован.
- empty()¶
Повертає
True
, якщо черга порожня,False
інакше. Через семантику багатопоточності/багатопроцесорності це ненадійно.Может вызвать ошибку
OSError
в закрытых очередях. (не гарантировано)
- full()¶
Повертає
True
, якщо черга заповнена,False
інакше. Через семантику багатопоточності/багатопроцесорності це ненадійно.
- put(obj[, block[, timeout]])¶
Помістіть obj у чергу. Якщо необов’язковий аргумент block має значення
True
(за замовчуванням), а timeout має значенняNone
(за замовчуванням), за потреби блокуйте, доки не з’явиться вільний слот. Якщо timeout є додатним числом, він блокує щонайбільше timeout секунд і викликає винятокqueue.Full
, якщо протягом цього часу не було вільного місця. В іншому випадку (block має значенняFalse
), помістіть елемент у чергу, якщо вільний слот є негайно доступним, інакше викликайте винятокqueue.Full
(timeout у цьому випадку ігнорується).Zmienione w wersji 3.8: Якщо чергу закрито, замість
AssertionError
виникаєValueError
.
- put_nowait(obj)¶
Еквівалент
put(obj, False)
.
- get([block[, timeout]])¶
Видалити та повернути елемент із черги. Якщо необов’язкові аргументи block мають значення
True
(за замовчуванням), а timeout —None
(за замовчуванням), за потреби блокуйте, доки елемент не стане доступним. Якщо timeout є додатним числом, він блокує щонайбільше timeout секунд і викликає винятокqueue.Empty
, якщо жоден елемент не був доступний протягом цього часу. В іншому випадку (блок має значенняFalse
), повертає елемент, якщо він одразу доступний, інакше викликає винятокqueue.Empty
(у цьому випадку timeout ігнорується).Zmienione w wersji 3.8: Якщо чергу закрито, замість
OSError
виникаєValueError
.
- get_nowait()¶
Еквівалент
get(False)
.
multiprocessing.Queue
має кілька додаткових методів, яких немає вqueue.Queue
. Ці методи зазвичай непотрібні для більшості коду:- close()¶
Закрийте чергу: звільніть внутрішні ресурси.
A queue must not be used anymore after it is closed. For example,
get()
,put()
andempty()
methods must no longer be called.The background thread will quit once it has flushed all buffered data to the pipe. This is called automatically when the queue is garbage collected.
- join_thread()¶
Приєднайтеся до фонової нитки. Це можна використовувати лише після виклику
close()
. Він блокується, доки не завершиться фоновий потік, гарантуючи, що всі дані в буфері скинуті в канал.За замовчуванням, якщо процес не є творцем черги, після виходу він спробує приєднатися до фонового потоку черги. Процес може викликати
cancel_join_thread()
, щоб змуситиjoin_thread()
нічого не робити.
- cancel_join_thread()¶
Запобігти блокуванню
join_thread()
. Зокрема, це запобігає автоматичному приєднанню фонового потоку під час завершення процесу – див.join_thread()
.Кращою назвою для цього методу може бути
allow_exit_without_flush()
. Ймовірно, це спричинить втрату даних у черзі, і вам майже напевно не потрібно буде їх використовувати. Насправді він доступний лише тоді, коли вам потрібно, щоб поточний процес завершився негайно, не чекаючи, щоб скинути дані з черги в основний канал, і ви не дбаєте про втрачені дані.
Informacja
Функціональність цього класу вимагає функціонуючої спільної реалізації семафора в головній операційній системі. Без нього функціональні можливості цього класу будуть вимкнені, а спроби створити екземпляр
Queue
призведуть доImportError
. Додаткову інформацію див. bpo-3770. Те саме стосується будь-якого зі спеціалізованих типів черги, перелічених нижче.
- class multiprocessing.SimpleQueue¶
Це спрощений тип
Queue
, дуже схожий на заблокованийPipe
.- close()¶
Закрийте чергу: звільніть внутрішні ресурси.
Після закриття чергу більше не можна використовувати. Наприклад, методи
get()
,put()
іempty()
більше не можна викликати.Dodane w wersji 3.9.
- empty()¶
Повертає
True
, якщо черга порожня,False
інакше.Всегда вызывает ошибку
OSError
, если SimpleQueue закрыта.
- get()¶
Видалити та повернути елемент із черги.
- put(item)¶
Поставте товар в чергу.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue
, підкласQueue
, це черга, яка додатково має методиtask_done()
іjoin()
.- task_done()¶
Вказує на те, що завдання, яке раніше було в черзі, виконано. Використовується споживачами черги. Для кожного
get()
, який використовується для отримання завдання, наступний викликtask_done()
повідомляє черзі, що обробку завдання завершено.Якщо
join()
зараз блокується, воно відновиться, коли всі елементи будуть оброблені (це означає, що викликtask_done()
отримано для кожного елемента, який бувput()
в чергу).Викликає
ValueError
, якщо викликається стільки разів, скільки було елементів у черзі.
- join()¶
Блокуйте, доки не буде отримано та оброблено всі елементи в черзі.
Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли споживач викликає
task_done()
, щоб вказати, що елемент отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля,join()
розблоковується.
Diğer¶
- multiprocessing.active_children()¶
Повернути список усіх живих дітей поточного процесу.
Виклик цього має побічний ефект „приєднання” до будь-яких процесів, які вже завершилися.
- multiprocessing.cpu_count()¶
Повертає кількість процесорів у системі.
Это число не эквивалентно количеству процессоров, которые может использовать текущий процесс. Количество используемых процессоров можно получить с помощью
os.process_cpu_count()
(илиlen(os.sched_getaffinity(0))
).Коли кількість ЦП не може бути визначена, виникає
NotImplementedError
.Zobacz także
Zmienione w wersji 3.13: Возвращаемое значение также можно переопределить с помощью флага
-X cpu_count
илиPYTHON_CPU_COUNT
, поскольку это всего лишь оболочка API-интерфейсовos
подсчета процессоров.
- multiprocessing.current_process()¶
Повертає об’єкт
Process
, що відповідає поточному процесу.Аналог
threading.current_thread()
.
- multiprocessing.parent_process()¶
Повертає об’єкт
Process
, що відповідає батьківському процесуcurrent_process()
. Для основного процесуparent_process
будеNone
.Dodane w wersji 3.8.
- multiprocessing.freeze_support()¶
增加对于使用
multiprocessing
的程序已被冻结以产生可执行文件的支持。 (针对 py2exe, PyInstaller 和 cx_Freeze 时行了测试。)Цю функцію потрібно викликати відразу після рядка
if __name__ == '__main__'
головного модуля. Наприклад:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Якщо рядок
freeze_support()
пропущено, спроба запустити заморожений виконуваний файл викличеRuntimeError
.当启动方法不是 spawn 时调用
freeze_support()
不会有效果。 此外,如果该模块被 Python 解释器正常运行(程序未被冻结),则freeze_support()
也不会有效果。
- multiprocessing.get_all_start_methods()¶
Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможные методы запуска: fork, spawn и forkserver. Не все платформы поддерживают все методы. См. мультипроцессорные-стартовые методы.
Dodane w wersji 3.4.
- multiprocessing.get_context(method=None)¶
Повертає об’єкт контексту, який має ті самі атрибути, що й модуль
multiprocessing
.If method is
None
then the default context is returned. Note that if the global start method has not been set, this will set it to the default method. Otherwise method should be'fork'
,'spawn'
,'forkserver'
.ValueError
is raised if the specified start method is not available. See Контексти та методи запуску.Dodane w wersji 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
Повертає назву методу запуску, який використовується для запуску процесів.
If the global start method has not been set and allow_none is
False
, then the start method is set to the default and the name is returned. If the start method has not been set and allow_none isTrue
thenNone
is returned.Возвращаемое значение может быть
'fork'
,'spawn'
,'forkserver'
илиNone
. См. мультипроцессорные-стартовые методы.Dodane w wersji 3.4.
Zmienione w wersji 3.8: У macOS метод запуску spawn тепер є типовим. Метод запуску fork слід вважати небезпечним, оскільки він може призвести до збою підпроцесу. Див. bpo-33725.
- multiprocessing.set_executable(executable)¶
Встановіть шлях інтерпретатора Python для використання під час запуску дочірнього процесу. (За замовчуванням використовується
sys.executable
). Вбудовувачі, ймовірно, повинні будуть зробити щось на зразок:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
перш ніж вони зможуть створити дочірні процеси.
Zmienione w wersji 3.4: Теперь поддерживается в POSIX при использовании метода запуска spawn.
Zmienione w wersji 3.11: Accepts a path-like object.
- multiprocessing.set_forkserver_preload(module_names)¶
Установите список имен модулей для основного процесса forkserver, который будет пытаться импортировать, чтобы их уже импортированное состояние наследовалось разветвленными процессами. Любой
ImportError
при этом игнорируется. Это можно использовать для повышения производительности, чтобы избежать повторной работы в каждом процессе.Чтобы это работало, его необходимо вызвать до запуска процесса forkserver (перед созданием
Pool
или запускомProcess
).Имеет смысл только при использовании метода запуска forkserver. См. мультипроцессорные-стартовые методы.
Dodane w wersji 3.4.
- multiprocessing.set_start_method(method, force=False)¶
Установите метод, который должен использоваться для запуска дочерних процессов. Аргументом method может быть
'fork'
,'spawn'
или'forkserver'
. ВызываетRuntimeError
, если метод запуска уже установлен и force не имеет значенияTrue
. Если method имеет значение «None» и force имеет значение «True», то для метода запуска устанавливается значение «None». Если method имеет значение None, а force имеет значение False, тогда контекст устанавливается в контекст по умолчанию.Зверніть увагу, що це має бути викликано щонайбільше один раз, і його слід захистити всередині пропозиції
if __name__ == '__main__'
головного модуля.См. Контексти та методи запуску.
Dodane w wersji 3.4.
Informacja
multiprocessing
не містить аналогів threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
або threading.local
.
Objek Koneksi¶
Об’єкти підключення дозволяють надсилати та отримувати об’єкти або рядки, які можна вибрати. Їх можна розглядати як підключені сокети, орієнтовані на повідомлення.
Об’єкти підключення зазвичай створюються за допомогою Pipe
– див. також Слухачі та клієнти.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Надішліть об’єкт на інший кінець з’єднання, який слід прочитати за допомогою
recv()
.Об’єкт має бути маринованим. Дуже великі pickles (приблизно 32 MiB+, хоча це залежить від ОС) можуть викликати виняток
ValueError
.
- recv()¶
Повернути об’єкт, надісланий з іншого кінця з’єднання за допомогою
send()
. Блокує, поки не буде що отримати. ВикликаєEOFError
, якщо не залишилося нічого для отримання, а інший кінець був закритий.
- fileno()¶
Повертає дескриптор файлу або дескриптор, який використовується підключенням.
- close()¶
Закрийте з’єднання.
Це викликається автоматично, коли підключення збирається сміттям.
- poll([timeout])¶
Повернути, чи є дані для читання.
Якщо timeout не вказано, він негайно повернеться. Якщо timeout є числом, це вказує максимальний час у секундах для блокування. Якщо timeout має значення
None
, тоді використовується нескінченний тайм-аут.Зауважте, що кілька об’єктів з’єднання можна опитувати одночасно за допомогою
multiprocessing.connection.wait()
.
- send_bytes(buffer[, offset[, size]])¶
Надіслати байтові дані з bytes-like object як повне повідомлення.
Якщо вказано зсув, дані зчитуються з цієї позиції в буфері. Якщо задано size, то стільки байтів буде прочитано з буфера. Дуже великі буфери (приблизно 32 MiB+, хоча це залежить від ОС) можуть викликати виключення
ValueError
- recv_bytes([maxlength])¶
Повертає повне повідомлення байтових даних, надісланих з іншого кінця з’єднання, у вигляді рядка. Блокує, поки не буде що отримати. Викликає
EOFError
, якщо не залишилося нічого для отримання, а інший кінець закрито.Якщо вказано maxlength і повідомлення довше, ніж maxlength, тоді виникає
OSError
і з’єднання більше не читається.
- recv_bytes_into(buffer[, offset])¶
Прочитайте в buffer повне повідомлення байтових даних, надісланих з іншого кінця з’єднання, і поверніть кількість байтів у повідомленні. Блокує, поки не буде що отримати. Викликає
EOFError
, якщо не залишилося нічого для отримання, а інший кінець був закритий.buffer має бути доступним для запису bytes-like object. Якщо задано offset, повідомлення буде записано в буфер із цієї позиції. Зсув має бути невід’ємним цілим числом, меншим за довжину буфера (у байтах).
Якщо буфер закороткий, виникає виняток
BufferTooShort
, і повне повідомлення доступне якe.args[0]
, деe
є винятком.
Zmienione w wersji 3.3: Самі об’єкти підключення тепер можна передавати між процесами за допомогою
Connection.send()
іConnection.recv()
.Объекты соединения теперь также поддерживают протокол управления контекстом — см. Bağlam Yöneticisi Türleri.
__enter__()
возвращает объект соединения, а__exit__()
вызываетclose()
.
Na przykład:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Ostrzeżenie
Метод Connection.recv()
автоматично видаляє отримані дані, що може становити загрозу безпеці, якщо ви не можете довіряти процесу, який надіслав повідомлення.
Таким чином, якщо об’єкт підключення не було створено за допомогою Pipe()
, ви повинні використовувати лише методи recv()
і send()
після виконання певної автентифікації. Перегляньте Ключі автентифікації.
Ostrzeżenie
Якщо процес зупиняється під час спроби читання або запису в канал, то дані в каналі, ймовірно, будуть пошкоджені, тому що може стати неможливо точно визначити, де пролягають межі повідомлення.
Примітиви синхронізації¶
Зазвичай примітиви синхронізації не такі необхідні в багатопроцесовій програмі, як у багатопотоковій програмі. Перегляньте документацію для модуля threading
.
Зауважте, що можна також створити примітиви синхронізації за допомогою об’єкта менеджера – див. Менеджери.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
Бар’єрний об’єкт: клон
threading.Barrier
.Dodane w wersji 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
Обмежений семафорний об’єкт: близький аналог
threading.BoundedSemaphore
.Існує єдина відмінність від його близького аналога: перший аргумент методу
acquire
має назву block, що узгоджується зLock.acquire()
.Informacja
У macOS це неможливо відрізнити від
Semaphore
, оскількиsem_getvalue()
не реалізовано на цій платформі.
- class multiprocessing.Condition([lock])¶
Змінна умови: псевдонім для
threading.Condition
.Якщо вказано lock, це має бути об’єкт
Lock
абоRLock
ізmultiprocessing
.Zmienione w wersji 3.3: Додано метод
wait_for()
.
- class multiprocessing.Event¶
Клон
threading.Event
.
- class multiprocessing.Lock¶
Нерекурсивний об’єкт блокування: близький аналог
threading.Lock
. Після того, як процес або потік отримав блокування, наступні спроби отримати його від будь-якого процесу або потоку будуть блокуватися, доки його не буде звільнено; будь-який процес або потік може його звільнити. Концепції та поведінкаthreading.Lock
, які застосовуються до потоків, відтворені тут уmultiprocessing.Lock
, оскільки вони застосовуються до процесів або потоків, за винятком зазначених випадків.Зауважте, що
Lock
насправді є фабричною функцією, яка повертає примірникmultiprocessing.synchronize.Lock
, ініціалізований контекстом за замовчуванням.Lock
підтримує протокол context manager і тому може використовуватися вwith
операторах.- acquire(block=True, timeout=None)¶
Отримайте блокування, блокування або неблокування.
Якщо для аргументу block встановлено значення
True
(за замовчуванням), виклик методу блокуватиметься, доки блокування не перейде в розблокований стан, а потім встановлюватиме для нього значенняTrue
і повертатимеTrue
. Зауважте, що назва цього першого аргументу відрізняється від такої вthreading.Lock.acquire()
.Якщо для аргументу block встановлено значення
False
, виклик методу не блокується. Якщо блокування зараз у заблокованому стані, повернітьFalse
; інакше встановіть блокування в заблокований стан і повернітьTrue
.При виклику з додатним значенням з плаваючою комою для timeout, блокувати щонайбільше на кількість секунд, визначену timeout, доки не вдасться отримати блокування. Виклики з від’ємним значенням для timeout еквівалентні timeout рівному нулю. Виклики зі значенням timeout
None
(за замовчуванням) встановлюють період очікування як нескінченний. Зауважте, що обробка негативних значень або значеньNone
для timeout відрізняється від реалізованої поведінки вthreading.Lock.acquire()
. Аргумент timeout не має практичного значення, якщо для аргументу block встановлено значенняFalse
і, таким чином, він ігнорується. ПовертаєTrue
, якщо блокування отримано, абоFalse
, якщо період очікування минув.
- release()¶
Відпустіть блокування. Це може бути викликано з будь-якого процесу або потоку, а не тільки процесу або потоку, який спочатку отримав блокування.
Поведінка така ж, як у
threading.Lock.release()
, за винятком того, що під час виклику для розблокованого блокування виникаєValueError
.
- class multiprocessing.RLock¶
Об’єкт рекурсивного блокування: близький аналог
threading.RLock
. Рекурсивне блокування має бути звільнено процесом або потоком, який його отримав. Після того як процес або потік отримав рекурсивне блокування, той самий процес або потік може отримати його знову без блокування; цей процес або потік повинен випускати його один раз за кожен раз, коли його було отримано.Зауважте, що
RLock
насправді є фабричною функцією, яка повертає екземплярmultiprocessing.synchronize.RLock
, ініціалізований контекстом за замовчуванням.RLock
підтримує протокол context manager і тому може використовуватися вwith
операторах.- acquire(block=True, timeout=None)¶
Отримайте блокування, блокування або неблокування.
При виклику з аргументом block, встановленим у значення
True
, блокувати, доки блокування не буде в розблокованому стані (не належить жодному процесу або потоку), якщо блокування вже не належить поточному процесу або потоку. Тоді поточний процес або потік отримує право власності на блокування (якщо він ще не має права власності), а рівень рекурсії всередині блокування збільшується на одиницю, що призводить до повернення значенняTrue
. Зауважте, що є кілька відмінностей у поведінці цього першого аргументу порівняно з реалізацієюthreading.RLock.acquire()
, починаючи з назви самого аргументу.При виклику з аргументом block, встановленим на
False
, не блокувати. Якщо блокування вже було отримано (і, отже, ним володіє) інший процес або потік, поточний процес або потік не приймає права власності, а рівень рекурсії в межах блокування не змінюється, що призводить до повернення значенняFalse
. Якщо блокування знаходиться в розблокованому стані, поточний процес або потік приймає право власності, і рівень рекурсії збільшується, що призводить до повернення значенняTrue
.Використання та поведінка аргументу timeout такі ж, як і в
Lock.acquire()
. Зауважте, що деякі з цих дій timeout відрізняються від реалізованих уthreading.RLock.acquire()
.
- release()¶
Зніміть блокування, зменшивши рівень рекурсії. Якщо після зменшення рівень рекурсії дорівнює нулю, скиньте блокування до розблокованого (не належить жодному процесу чи потоку), а якщо будь-які інші процеси чи потоки заблоковані в очікуванні розблокування блокування, дозвольте рівно одному з них продовжити. Якщо після декременту рівень рекурсії все ще ненульовий, блокування залишається заблокованим і належить процесу або потоку, що викликає.
Викликайте цей метод лише тоді, коли процес або потік, що викликає, володіє блокуванням. Помилка
AssertionError
виникає, якщо цей метод викликається процесом або потоком, відмінним від власника, або якщо блокування знаходиться в розблокованому стані (не належить). Зауважте, що тип винятку, викликаного в цій ситуації, відрізняється від реалізованої поведінки вthreading.RLock.release()
.
- class multiprocessing.Semaphore([value])¶
Об’єкт семафор: близький аналог
threading.Semaphore
.Існує єдина відмінність від його близького аналога: перший аргумент методу
acquire
має назву block, що узгоджується зLock.acquire()
.
Informacja
У macOS sem_timedwait
не підтримується, тому виклик acquire()
із тайм-аутом буде емулювати поведінку цієї функції за допомогою циклу сну.
Informacja
Для деяких функцій цього пакета необхідна функціональна реалізація спільного семафора в головній операційній системі. Без нього модуль multiprocessing.synchronize
буде вимкнено, а спроби його імпортувати призведуть до ImportError
. Додаткову інформацію див. bpo-3770.
Менеджери¶
Менеджери надають можливість створювати дані, якими можна ділитися між різними процесами, включно з загальним доступом через мережу між процесами, що виконуються на різних машинах. Об’єкт менеджера контролює серверний процес, який керує спільними об’єктами. Інші процеси можуть отримати доступ до спільних об’єктів за допомогою проксі-серверів.
- multiprocessing.Manager()¶
Повертає запущений об’єкт
SyncManager
, який можна використовувати для спільного використання об’єктів між процесами. Повернений об’єкт менеджера відповідає породженому дочірньому процесу та має методи, які створюватимуть спільні об’єкти та повертатимуть відповідні проксі-сервери.
Процеси менеджера буде закрито, щойно їх буде зібрано сміття або їхній батьківський процес завершиться. Класи менеджерів визначені в модулі multiprocessing.managers
:
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
Створіть об’єкт BaseManager.
Після створення потрібно викликати
start()
абоget_server().serve_forever()
, щоб переконатися, що об’єкт менеджера посилається на запущений процес менеджера.адреса — це адреса, на якій процес менеджера прослуховує нові підключення. Якщо адреса має значення
None
, тоді вибирається довільна адреса.authkey — це ключ автентифікації, який використовуватиметься для перевірки дійсності вхідних підключень до процесу сервера. Якщо authkey має значення
None
, тоді використовуєтьсяcurrent_process().authkey
. В іншому випадку використовується authkey, і це має бути рядок байтів.serializer должен быть
'pickle'
(используйте сериализациюpickle
) или'xmlrpclib'
(используйте сериализациюxmlrpc.client
).ctx — объект контекста или «Нет» (используйте текущий контекст). См. функцию
get_context()
.shutdown_timeout — это тайм-аут в секундах, используемый для ожидания завершения процесса, используемого менеджером, в методе
shutdown()
. Если время выключения истекло, процесс завершается. Если время завершения процесса также истекает, процесс завершается.Zmienione w wersji 3.11: Добавлен параметр shutdown_timeout.
- start([initializer[, initargs]])¶
Запустіть підпроцес, щоб запустити менеджер. Якщо initializer не
None
, тоді підпроцес викличеinitializer(*initargs)
під час запуску.
- get_server()¶
Повертає об’єкт
Server
, який представляє фактичний сервер під керуванням менеджера. Об’єктServer
підтримує методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
додатково має атрибутaddress
.
- connect()¶
Підключіть об’єкт локального менеджера до віддаленого процесу менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()¶
Зупиніть процес, який використовує менеджер. Це доступно, лише якщо
start()
було використано для запуску процесу сервера.Це можна викликати кілька разів.
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
Метод класу, який можна використовувати для реєстрації типу або виклику в класі менеджера.
typeid — це „ідентифікатор типу”, який використовується для ідентифікації певного типу спільного об’єкта. Це має бути рядок.
callable — це виклик, який використовується для створення об’єктів для ідентифікатора цього типу. Якщо екземпляр менеджера буде підключено до сервера за допомогою методу
connect()
або якщо аргумент create_method має значенняFalse
, тоді це можна залишити якNone
.proxytype є підкласом
BaseProxy
, який використовується для створення проксі для спільних об’єктів із цим typeid. ЯкщоNone
, то проксі-клас створюється автоматично.exposed використовується для визначення послідовності назв методів, до яких проксі-серверам для цього typeid має бути дозволено доступ за допомогою
BaseProxy._callmethod()
. (Якщо exposed має значенняNone
, тоді замість нього використовуєтьсяproxytype._exposed_
, якщо він існує.) У випадку, коли відкритий список не вказано, усі „загальнодоступні методи” спільного об’єкта будуть доступними. . (Тут „публічний метод” означає будь-який атрибут, який має метод__call__()
і ім’я якого не починається з'_'
.)method_to_typeid — це зіставлення, яке використовується для визначення типу повернення тих відкритих методів, які мають повертати проксі. Він відображає назви методів у рядках typeid. (Якщо method_to_typeid має значення
None
, тоді замість нього використовуєтьсяproxytype._method_to_typeid_
, якщо він існує.) Якщо назва методу не є ключем цього відображення або якщо відображення має значенняNone
, тоді об’єкт, повернутий методом, буде скопійовано за значенням.create_method визначає, чи слід створювати метод з іменем typeid, яке можна використовувати, щоб наказати серверному процесу створити новий спільний об’єкт і повернути для нього проксі. За замовчуванням це
True
.
Екземпляри
BaseManager
також мають одну властивість лише для читання:- address¶
Адреса, яку використовує менеджер.
Zmienione w wersji 3.3: Об’єкти менеджера підтримують протокол керування контекстом – див. Bağlam Yöneticisi Türleri.
__enter__()
запускає серверний процес (якщо він ще не запущений), а потім повертає об’єкт менеджера.__exit__()
викликаєshutdown()
.У попередніх версіях
__enter__()
не запускав серверний процес менеджера, якщо він ще не був запущений.
- class multiprocessing.managers.SyncManager¶
Підклас
BaseManager
, який можна використовувати для синхронізації процесів. Об’єкти цього типу повертаєmultiprocessing.Manager()
.Його методи створюють і повертають Проксі об’єкти для ряду типів даних, які зазвичай використовуються, щоб синхронізувати між процесами. Це, зокрема, включає спільні списки та словники.
- Barrier(parties[, action[, timeout]])¶
Створіть спільний об’єкт
threading.Barrier
і поверніть для нього проксі.Dodane w wersji 3.3.
- BoundedSemaphore([value])¶
Створіть спільний об’єкт
threading.BoundedSemaphore
і поверніть для нього проксі.
- Condition([lock])¶
Створіть спільний об’єкт
threading.Condition
і поверніть для нього проксі.Якщо вказано lock, це має бути проксі для об’єкта
threading.Lock
абоthreading.RLock
.Zmienione w wersji 3.3: Додано метод
wait_for()
.
- Event()¶
Створіть спільний об’єкт
threading.Event
і поверніть для нього проксі.
- Lock()¶
Створіть спільний об’єкт
threading.Lock
і поверніть для нього проксі.
- Queue([maxsize])¶
Створіть спільний об’єкт
queue.Queue
і поверніть для нього проксі.
- RLock()¶
Створіть спільний об’єкт
threading.RLock
і поверніть для нього проксі.
- Semaphore([value])¶
Створіть спільний об’єкт
threading.Semaphore
і поверніть для нього проксі.
- Array(typecode, sequence)¶
Створіть масив і поверніть для нього проксі.
- Value(typecode, value)¶
Створіть об’єкт із доступним для запису атрибутом „значення” та поверніть для нього проксі-сервер.
Zmienione w wersji 3.6: Спільні об’єкти можуть бути вкладеними. Наприклад, спільний об’єкт-контейнер, такий як спільний список, може містити інші спільні об’єкти, якими керуватиме та синхронізуватиме
SyncManager
.
- class multiprocessing.managers.Namespace¶
Тип, який можна зареєструвати в
SyncManager
.Об’єкт простору імен не має відкритих методів, але має атрибути, доступні для запису. Його подання показує значення його атрибутів.
Однак, коли використовується проксі для об’єкта простору імен, атрибут, що починається з
'_''
буде атрибутом проксі, а не атрибутом референта:>>> mp_context = multiprocessing.get_context('spawn') >>> manager = mp_context.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Індивідуальні менеджери¶
Щоб створити власний менеджер, потрібно створити підклас BaseManager
і використовувати метод класу register()
для реєстрації нових типів або викликів у класі менеджера. Наприклад:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Використання віддаленого менеджера¶
Можна запустити керуючий сервер на одній машині, а клієнти використовуватимуть його з інших машин (за умови, що задіяні брандмауери дозволяють це).
Виконання наступних команд створює сервер для однієї спільної черги, до якої мають доступ віддалені клієнти:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клієнт може отримати доступ до сервера наступним чином:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Інший клієнт також може використовувати його:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальні процеси також можуть отримати доступ до цієї черги, використовуючи код вище на клієнті для доступу до неї віддалено:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Проксі об’єкти¶
Проксі — це об’єкт, який посилається на спільний об’єкт, який живе (імовірно) в іншому процесі. Спільний об’єкт називається референтом проксі. Кілька проксі-об’єктів можуть мати один і той же референт.
Проксі-об’єкт має методи, які викликають відповідні методи його референта (хоча не кожен метод референта обов’язково буде доступним через проксі). Таким чином, проксі можна використовувати так само, як і його референт:
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Зауважте, що застосування str()
до проксі поверне подання референта, тоді як застосування repr()
поверне подання проксі.
Важливою особливістю проксі-об’єктів є те, що їх можна вибирати, тому їх можна передавати між процесами. Таким чином, референт може містити Проксі об’єкти. Це дозволяє вкладати ці керовані списки, dicts та інші Проксі об’єкти:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Подібним чином, проксі dict і список можуть бути вкладені один в одного:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Якщо стандартні (не проксі) list
або dict
об’єкти містяться в референті, модифікації цих змінних значень не поширюватимуться через менеджер, оскільки проксі не може дізнатися, коли значення що містяться в них, змінено. Однак збереження значення в проксі-контейнері (що запускає __setitem__
в проксі-об’єкті) поширюється через менеджер, тому для ефективної зміни такого елемента можна повторно призначити змінене значення проксі-серверу контейнера: :
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Цей підхід, можливо, менш зручний, ніж використання вкладених Проксі об’єкти для більшості випадків використання, але також демонструє рівень контролю над синхронізацією.
Informacja
Проксі-типи в multiprocessing
не підтримують порівняння за значенням. Так, наприклад, ми маємо:
>>> manager.list([1,2,3]) == [1,2,3]
False
Під час порівнянь слід просто використовувати копію референта.
- class multiprocessing.managers.BaseProxy¶
Проксі-об’єкти є екземплярами підкласів
BaseProxy
.- _callmethod(methodname[, args[, kwds]])¶
Виклик і повернення результату методу референта проксі.
Якщо
proxy
є проксі, референтом якого єobj
, тоді виразproxy._callmethod(methodname, args, kwds)
обчислить вираз
getattr(obj, methodname)(*args, **kwds)
в процесі менеджера.
Поверненим значенням буде копія результату виклику або проксі для нового спільного об’єкта – див. документацію щодо аргументу method_to_typeid
BaseManager.register()
.Якщо виклик викликає виняток, він повторно викликається
_callmethod()
. Якщо в процесі менеджера виникає інший виняток, він перетворюється на винятокRemoteError
і викликається_callmethod()
.Зокрема, зауважте, що виняток буде створено, якщо methodname не було виявлено.
Приклад використання
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
Повернути копію референту.
Якщо референт неможливо вибрати, це спричинить виняток.
- __repr__()¶
Повертає представлення проксі-об’єкта.
- __str__()¶
Повернути представлення референта.
Прибирати¶
Проксі-об’єкт використовує зворотний виклик weakref, щоб, коли він збирає сміття, він скасовує реєстрацію в менеджері, якому належить його референт.
Спільний об’єкт видаляється з процесу менеджера, коли більше немає проксі-серверів, які посилаються на нього.
Пули процесів¶
Можна створити пул процесів, які виконуватимуть передані йому завдання за допомогою класу Pool
.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
Об’єкт пулу процесів, який керує пулом робочих процесів, до яких можна надсилати завдання. Він підтримує асинхронні результати з тайм-аутами та зворотними викликами та має реалізацію паралельної карти.
processes — количество используемых рабочих процессов. Если processes имеет значение None, то используется число, возвращаемое
os.process_cpu_count()
.Якщо initializer не
None
, тоді кожен робочий процес викличеinitializer(*initargs)
під час свого запуску.maxtasksperchild — це кількість завдань, які робочий процес може виконати, перш ніж він вийде та буде замінений новим робочим процесом, щоб звільнити невикористані ресурси. Типовим значенням maxtasksperchild є
None
, що означає, що робочі процеси живуть стільки ж, скільки пул.context можна використовувати для визначення контексту, який використовується для запуску робочих процесів. Зазвичай пул створюється за допомогою функції
multiprocessing.Pool()
або методуPool()
контекстного об’єкта. В обох випадках контекст встановлено належним чином.Зауважте, що методи об’єкта пулу має викликати тільки процес, який створив пул.
Ostrzeżenie
Об’єкти
multiprocessing.pool
мають внутрішні ресурси, якими потрібно належним чином керувати (як і будь-яким іншим ресурсом), використовуючи пул як контекстний менеджер або викликаючиclose()
іterminate()
вручну. Якщо цього не зробити, процес може призупинити завершення.Зауважте, що некоректно покладатися на збирач сміття для знищення пулу, оскільки CPython не гарантує, що буде викликано фіналізатор пулу (див.
object.__del__()
для отримання додаткової інформації).Zmienione w wersji 3.2: Добавлен параметр maxtasksperchild.
Zmienione w wersji 3.4: Добавлен параметр context.
Zmienione w wersji 3.13: processes по умолчанию использует
os.process_cpu_count()
вместоos.cpu_count()
.Informacja
Робочі процеси в межах
Pool
зазвичай живі протягом повної тривалості робочої черги пулу. Частий шаблон, який зустрічається в інших системах (таких як Apache, mod_wsgi тощо) для звільнення ресурсів, які зберігаються робочими засобами, полягає в тому, щоб дозволити робочому в межах пулу завершити лише певний обсяг роботи перед виходом, очищенням і породженням нового процесу на заміну старого. Аргумент maxtasksperchild дляPool
надає цю можливість кінцевому користувачеві.- apply(func[, args[, kwds]])¶
Виклик func з аргументами args і ключовими аргументами kwds. Блокується, поки не буде готовий результат. Враховуючи ці блоки,
apply_async()
краще підходить для виконання роботи паралельно. Крім того, func виконується лише в одному з воркерів пулу.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
Варіант методу
apply()
, який повертає об’єктAsyncResult
.Якщо вказано callback, це має бути виклик, який приймає один аргумент. Коли результат стає готовим, до нього застосовується callback, якщо тільки виклик не вдався, у цьому випадку замість нього застосовується error_callback.
Якщо вказано error_callback, це має бути виклик, який приймає один аргумент. Якщо цільова функція дає збій, то error_callback викликається з екземпляром винятку.
Зворотні виклики мають завершитися негайно, інакше потік, який обробляє результати, буде заблоковано.
- map(func, iterable[, chunksize])¶
Паралельний еквівалент вбудованої функції
map()
(хоча вона підтримує лише один аргумент iterable, для кількох ітерацій див.starmap()
). Блокується, поки не буде готовий результат.Цей метод розбиває iterable на декілька фрагментів, які він надсилає до пулу процесів як окремі завдання. (Приблизний) розмір цих фрагментів можна вказати, встановивши для chunksize додатне ціле число.
Зауважте, що це може спричинити велике використання пам’яті для дуже довгих ітерацій. Розгляньте можливість використання
imap()
абоimap_unordered()
з явним параметром chunksize для кращої ефективності.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Варіант методу
map()
, який повертає об’єктAsyncResult
.Якщо вказано callback, це має бути виклик, який приймає один аргумент. Коли результат стає готовим, до нього застосовується callback, якщо тільки виклик не вдався, у цьому випадку замість нього застосовується error_callback.
Якщо вказано error_callback, це має бути виклик, який приймає один аргумент. Якщо цільова функція дає збій, то error_callback викликається з екземпляром винятку.
Зворотні виклики мають завершитися негайно, інакше потік, який обробляє результати, буде заблоковано.
- imap(func, iterable[, chunksize])¶
Ленича версія
map()
.Аргумент chunksize такий самий, як той, який використовується методом
map()
. Для дуже довгих ітерацій використання великого значення для chunksize може зробити роботу завершеною набагато швидше, ніж використання значення за замовчуванням1
.Крім того, якщо chunksize дорівнює
1
, тоді методnext()
ітератора, який повертає методimap()
, має додатковий параметр timeout:next(timeout)
викличеmultiprocessing.TimeoutError
, якщо результат не може бути повернутий протягом часу очікування секунд.
- imap_unordered(func, iterable[, chunksize])¶
Те саме, що
imap()
, за винятком того, що порядок результатів від повернутого ітератора слід вважати довільним. (Тільки коли є лише один робочий процес, порядок гарантовано буде „правильним”.)
- starmap(func, iterable[, chunksize])¶
Подібно до
map()
, за винятком того, що елементи iterable мають бути ітерованими, які розпаковуються як аргументи.Тому ітерація
[(1,2), (3, 4)]
призводить до[func(1,2), func(3,4)]
.Dodane w wersji 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Комбінація
starmap()
іmap_async()
, яка виконує ітерацію по iterable ітерацій і викликає func з розпакованими ітераціями. Повертає об’єкт результату.Dodane w wersji 3.3.
- close()¶
Запобігає надсиланню додаткових завдань до пулу. Після виконання всіх завдань робочі процеси завершаться.
- terminate()¶
Негайно зупиняє робочі процеси, не завершуючи незавершену роботу. Коли об’єкт пулу збирається як сміття, негайно буде викликано
terminate()
.
- join()¶
Зачекайте, поки робочі процеси завершаться. Потрібно викликати
close()
абоterminate()
перед використаннямjoin()
.
Zmienione w wersji 3.3: Об’єкти пулу тепер підтримують протокол керування контекстом – див. Bağlam Yöneticisi Türleri.
__enter__()
повертає об’єкт пулу, а__exit__()
викликаєterminate()
.
- class multiprocessing.pool.AsyncResult¶
Клас результату, який повертають
Pool.apply_async()
іPool.map_async()
.- get([timeout])¶
Поверніть результат, коли він надійде. Якщо timeout не
None
і результат не надходить протягом timeout секунд, тоді виникаєmultiprocessing.TimeoutError
. Якщо віддалений виклик викликав виняток, цей виняток буде повторно викликаноget()
.
- wait([timeout])¶
Зачекайте, поки буде доступний результат або поки не мине тайм-аут секунди.
- ready()¶
Повідомити, чи завершено виклик.
- successful()¶
Повертає, чи завершено виклик без виклику винятку. Викличе
ValueError
, якщо результат не готовий.Zmienione w wersji 3.7: Якщо результат не готовий, замість
AssertionError
виникаєValueError
.
Наступний приклад демонструє використання пулу:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Слухачі та клієнти¶
Зазвичай передача повідомлень між процесами здійснюється за допомогою черг або за допомогою об’єктів Connection
, які повертає Pipe()
.
Однак модуль multiprocessing.connection
забезпечує додаткову гнучкість. По суті, це надає API високого рівня, орієнтований на повідомлення, для роботи з сокетами або іменованими каналами Windows. Він також підтримує дайджест-автентифікацію за допомогою модуля hmac
і для опитування кількох з’єднань одночасно.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Надішліть випадково згенероване повідомлення на інший кінець з’єднання та дочекайтеся відповіді.
Якщо відповідь відповідає дайджесту повідомлення з використанням authkey як ключа, тоді на інший кінець з’єднання надсилається вітальне повідомлення. Інакше виникає
AuthenticationError
.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Отримайте повідомлення, обчисліть дайджест повідомлення, використовуючи authkey як ключ, а потім надішліть дайджест назад.
Якщо вітальне повідомлення не отримано, виникає
AuthenticationError
.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Спроба встановити з’єднання зі слухачем, який використовує адресу address, повертаючи
Connection
.Тип з’єднання визначається аргументом family, але зазвичай його можна опустити, оскільки його зазвичай можна визначити з формату address. (Див. Формати адрес)
Если указан authkey, а не
None
, это должна быть байтовая строка, которая будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Аутентификация не выполняется, если authkey имеет значение «None».AuthenticationError
возникает, если аутентификация не удалась. См. Ключі автентифікації.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
Обгортка для пов’язаного сокета або каналу з іменем Windows, який „слухає” з’єднання.
address — це адреса, яка буде використовуватися зв’язаним сокетом або іменованим каналом об’єкта слухача.
Informacja
Якщо використовується адреса „0.0.0.0”, ця адреса не буде кінцевою точкою підключення в Windows. Якщо вам потрібна підключена кінцева точка, вам слід використовувати „127.0.0.1”.
сімейство — це тип розетки (або названої труби), яку слід використовувати. Це може бути один із рядків
'AF_INET'
(для сокета TCP),'AF_UNIX'
(для сокета домену Unix) або'AF_PIPE''
(для іменованого каналу Windows) . З них лише перший гарантовано доступний. Якщо family має значенняNone
, тоді сім’я виводиться з формату address. Якщо адреса такожNone
, тоді вибрано значення за замовчуванням. Це за замовчуванням сімейство, яке вважається найшвидшим із доступних. Дивіться Формати адрес. Зауважте, що якщо сімейство має значення'AF_UNIX
, а адресаNone
, то сокет буде створено в приватному тимчасовому каталозі, створеному за допомогоюtempfile.mkstemp()
.Якщо об’єкт слухача використовує сокет, тоді backlog (1 за замовчуванням) передається в метод
listen()
сокета після того, як його буде зв’язано.Если указан authkey, а не
None
, это должна быть байтовая строка, которая будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Аутентификация не выполняется, если authkey имеет значение «None».AuthenticationError
возникает, если аутентификация не удалась. См. Ключі автентифікації.- accept()¶
Прийняти підключення до зв’язаного сокета або іменованого каналу об’єкта слухача та повернути об’єкт
Connection
. Якщо спроба автентифікації не вдається, виникаєAuthenticationError
.
- close()¶
Закрийте прив’язаний сокет або іменований канал об’єкта слухача. Це викликається автоматично, коли слухач збирає сміття. Однак бажано називати це явно.
Об’єкти слухача мають такі властивості лише для читання:
- address¶
Адреса, яка використовується об’єктом Listener.
- last_accepted¶
Адреса, з якої надійшло останнє прийняте підключення. Якщо це недоступно, це
None
.
Zmienione w wersji 3.3: Об’єкти слухача тепер підтримують протокол керування контекстом – див. Bağlam Yöneticisi Türleri.
__enter__()
повертає об’єкт слухача, а__exit__()
викликаєclose()
.
- multiprocessing.connection.wait(object_list, timeout=None)¶
Зачекайте, поки об’єкт у object_list буде готовий. Повертає список тих об’єктів у object_list, які готові. Якщо timeout є числом з плаваючою точкою, виклик блокується щонайбільше на стільки секунд. Якщо timeout має значення
None
, тоді він блокуватиметься на необмежений період. Від’ємний тайм-аут еквівалентний нульовому тайм-ауту.И для POSIX, и для Windows объект может появиться в object_list, если он
читабельний об’єкт
Connection
;підключений і читабельний об’єкт
socket.socket
; або
Об’єкт з’єднання або сокета готовий, коли є доступні дані для читання з нього, або інший кінець закрито.
POSIX:
wait(object_list, timeout)
почти эквивалентен``select.select(object_list, [], [], timeout)``. Разница в том, что еслиselect.select()
прерывается сигналом, он может вызватьOSError
с номером ошибкиEINTR
, тогда какwait()
этого не сделает.Windows: элемент в object_list должен быть либо целочисленным дескриптором, который является ожидаемым (согласно определению, используемому в документации Win32-функции
WaitForMultipleObjects()
), либо это может быть объект сfileno()
метод, который возвращает дескриптор сокета или дескриптор канала. (Обратите внимание, что дескрипторы каналов и дескрипторы сокетов не являются дескрипторами ожидания.)Dodane w wersji 3.3.
Examples
Наступний код сервера створює прослуховувач, який використовує 'секретний пароль
як ключ автентифікації. Потім він очікує з’єднання та надсилає деякі дані клієнту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Наступний код підключається до сервера та отримує деякі дані з сервера:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Наступний код використовує wait()
для очікування повідомлень від кількох процесів одночасно:
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Формати адрес¶
Адреса „AF_INET” — це кортеж у формі „(ім’я хоста, порт)”, де ім’я хоста — рядок, а порт — ціле число.
Адреса „AF_UNIX” — це рядок, що представляє назву файлу у файловій системі.
Адрес
'AF_PIPE'
представляет собой строку видаr'\\.\pipe\PipeName'
. Чтобы использоватьClient()
для подключения к именованному каналу на удаленном компьютере с именем ServerName, следует использовать адрес в формеr'\\ServerName\pipe \PipeName'
вместо этого.
Зауважте, що будь-який рядок, який починається двома зворотними похилими рисками, за замовчуванням вважається адресою 'AF_PIPE'
, а не адресою 'AF_UNIX'
.
Ключі автентифікації¶
Коли використовується Connection.recv
, отримані дані автоматично видаляються. На жаль, видалення даних із ненадійного джерела становить загрозу безпеці. Тому Listener
і Client()
використовують модуль hmac
для забезпечення автентифікації дайджесту.
Ключ автентифікації — це рядок байтів, який можна розглядати як пароль: коли з’єднання встановлено, обидва кінці вимагатимуть підтвердження того, що інший знає ключ автентифікації. (Демонстрація того, що обидві сторони використовують той самий ключ, не передбачає надсилання ключа через з’єднання.)
Якщо автентифікація запитується, але ключ автентифікації не вказано, тоді використовується значення, що повертається current_process().authkey
(див. Process
). Це значення буде автоматично успадковано будь-яким об’єктом Process
, який створює поточний процес. Це означає, що (за замовчуванням) усі процеси багатопроцесної програми спільно використовуватимуть один ключ автентифікації, який можна використовувати під час встановлення з’єднань між собою.
Відповідні ключі автентифікації також можна згенерувати за допомогою os.urandom()
.
Logowanie¶
Доступна певна підтримка журналювання. Однак зауважте, що пакунок logging
не використовує спільні блокування процесів, тому (залежно від типу обробника) повідомлення від різних процесів можуть переплутатися.
- multiprocessing.get_logger()¶
Повертає реєстратор, який використовується
multiprocessing
. За потреби буде створено новий.При первом создании регистратор имеет уровень
logging.NOTSET
и не имеет обработчика по умолчанию. Сообщения, отправленные в этот регистратор, по умолчанию не передаются в корневой регистратор.Зауважте, що у Windows дочірні процеси успадковуватимуть лише рівень реєстратора батьківського процесу – будь-які інші налаштування реєстратора не успадковуватимуться.
- multiprocessing.log_to_stderr(level=None)¶
Ця функція виконує виклик
get_logger()
, але окрім повернення реєстратора, створеного get_logger, вона додає обробник, який надсилає вихідні дані доsys.stderr
у форматі'[%(levelname)s/%(processName)s] %(message)s ''
. Ви можете змінитиlevelname
реєстратора, передавши аргументlevel
.
Нижче наведено приклад сеансу з увімкненим журналюванням:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Щоб отримати повну таблицю рівнів журналювання, перегляньте модуль logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
повторює API multiprocessing
, але є не більше ніж обгорткою модуля threading
.
Зокрема, функція Pool
, надана multiprocessing.dummy
, повертає екземпляр ThreadPool
, який є підкласом Pool
, який підтримує всі виклики методів, але використовує пул робочих потоків, а не робочих процесів.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
Об’єкт пулу потоків, який керує пулом робочих потоків, до яких можна надсилати завдання. Екземпляри
ThreadPool
повністю сумісні з інтерфейсом екземплярівPool
, і їхніми ресурсами також потрібно правильно керувати, використовуючи пул як контекстний менеджер або викликаючиclose()
іterminate()
вручну.процессы — количество используемых рабочих потоков. Если processes имеет значение None, то используется число, возвращаемое
os.process_cpu_count()
.Якщо initializer не
None
, тоді кожен робочий процес викличеinitializer(*initargs)
під час свого запуску.На відміну від
Pool
, maxtasksperchild і context не можна надати.Informacja
ThreadPool
має той самий інтерфейс, що йPool
, який розроблено навколо пулу процесів і передує появі модуляconcurrent.futures
. Таким чином, він успадковує деякі операції, які не мають сенсу для пулу, що підтримується потоками, і має власний тип для представлення статусу асинхронних завдань,AsyncResult
, який не розуміється жодною іншою бібліотекою.Зазвичай користувачі мають віддавати перевагу використанню
concurrent.futures.ThreadPoolExecutor
, який має простіший інтерфейс, розроблений навколо потоків із самого початку та повертаєconcurrent.futures.Future
екземпляри, сумісні з багатьма інші бібліотеки, включаючиasyncio
.
Інструкції з програмування¶
Існують певні вказівки та ідіоми, яких слід дотримуватися під час використання multiprocessing
.
Всі методи запуску¶
Наступне стосується всіх методів запуску.
Уникайте спільного стану
Наскільки це можливо, слід намагатися уникати переміщення великих обсягів даних між процесами.
Ймовірно, найкраще використовувати черги або канали для зв’язку між процесами, а не використовувати примітиви синхронізації нижчого рівня.
Пробірність
Переконайтеся, що аргументи методів проксі-серверів можна вибрати.
Безпека потоків проксі
Не використовуйте проксі-об’єкт із кількох потоків, якщо ви не захистите його за допомогою блокування.
(Ніколи не виникає проблем із різними процесами, які використовують той самий проксі.)
Приєднання до зомбованих процесів
В POSIX, когда процесс завершается, но к нему не присоединяются, он становится зомби. Их никогда не должно быть слишком много, поскольку каждый раз при запуске нового процесса (или вызове
active_children()
) все завершенные процессы, которые еще не были присоединены, будут объединены. Также вызовProcess.is_alive
готового процесса присоединяется к процессу. Даже в этом случае, вероятно, будет хорошей практикой явно присоединяться ко всем запускаемым вами процессам.
Краще успадкувати, ніж маринувати/розмаринувати
Під час використання методів запуску spawn або forkserver багато типів із
multiprocessing
мають бути доступними для вибору, щоб дочірні процеси могли їх використовувати. Однак зазвичай слід уникати надсилання спільних об’єктів іншим процесам за допомогою каналів або черг. Натомість ви повинні організувати програму так, щоб процес, якому потрібен доступ до спільного ресурсу, створеного в іншому місці, міг успадкувати його від процесу-предка.
Уникайте завершення процесів
Використання методу
Process.terminate
для зупинки процесу може призвести до того, що будь-які спільні ресурси (такі як блокування, семафори, канали та черги), які зараз використовуються цим процесом, стануть несправними або недоступними для інших процесів.Тому, ймовірно, найкраще використовувати
Process.terminate
лише для процесів, які ніколи не використовують спільні ресурси.
Приєднання до процесів, які використовують черги
Майте на увазі, що процес, який поставив елементи в чергу, чекатиме перед завершенням, доки всі буферизовані елементи не будуть передані потоком „фідера” до основного каналу. (Дочірній процес може викликати метод
Queue.cancel_join_thread
черги, щоб уникнути такої поведінки.)Це означає, що щоразу, коли ви використовуєте чергу, вам потрібно переконатися, що всі елементи, які було поставлено в чергу, зрештою буде видалено перед приєднанням до процесу. Інакше ви не можете бути впевнені, що процеси, які поставили елементи в чергу, завершаться. Пам’ятайте також, що недемонічні процеси будуть приєднані автоматично.
Прикладом, який призведе до взаємоблокування, є наступний:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Виправити тут можна було б поміняти місцями останні два рядки (або просто видалити рядок
p.join()
).
Явно передати ресурси дочірнім процесам
В POSIX с использованием метода запуска fork дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Однако лучше передать объект в качестве аргумента конструктору дочернего процесса.
Окрім того, що код (потенційно) сумісний із Windows та іншими методами запуску, це також гарантує, що поки дочірній процес живий, об’єкт не збиратиме сміття в батьківському процесі. Це може бути важливо, якщо якийсь ресурс звільняється, коли об’єкт збирається як сміття в батьківському процесі.
Так наприклад
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()слід переписати як
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерігайтеся заміни sys.stdin
на „файлоподібний об’єкт”
multiprocessing
спочатку безумовно називався:os.close(sys.stdin.fileno())у методі
multiprocessing.Process._bootstrap()
— це призвело до проблем із процесами в процесах. Це було змінено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Это решает фундаментальную проблему конфликтов процессов друг с другом, что приводит к ошибке неправильного файлового дескриптора, но представляет потенциальную опасность для приложений, которые заменяют
sys.stdin()
„файлоподобным объектом” с буферизацией вывода. Эта опасность заключается в том, что если несколько процессов вызовутclose()
для этого файлового объекта, это может привести к тому, что одни и те же данные будут сброшены в объект несколько раз, что приведет к повреждению.Якщо ви пишете файлоподібний об’єкт і використовуєте власне кешування, ви можете зробити його безпечним для розгалуження, зберігаючи pid кожного разу, коли ви додаєте його до кешу, і відкидаючи кеш, коли pid змінюється. Наприклад:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДля отримання додаткової інформації перегляньте bpo-5155, bpo-5313 та bpo-5331
Методи запуску spawn і forkserver¶
Существует несколько дополнительных ограничений, которые не применяются к методу запуска fork.
Більше маринування
Ensure that all arguments to
Process
are picklable. Also, if you subclassProcess.__init__
, you must make sure that instances will be picklable when theProcess.start
method is called.
Глобальні змінні
Майте на увазі, що якщо код, запущений у дочірньому процесі, намагається отримати доступ до глобальної змінної, тоді значення, яке він бачить (якщо таке є), може не збігатися зі значенням у батьківському процесі під час
Process.start викликано
.Однак глобальні змінні, які є лише константами рівня модуля, не викликають проблем.
Безпечне імпортування основного модуля
Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python, не вызывая непредвиденных побочных эффектов (например, запуска нового процесса).
Наприклад, використання методу запуску spawn або forkserver під час запуску наступного модуля призведе до помилки з
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Натомість слід захистити „точку входу” програми за допомогою
if __name__ == '__main__':
наступним чином:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Рядок
freeze_support()
можна опустити, якщо програма буде працювати нормально, а не зависати.)Це дозволяє щойно створеному інтерпретатору Python безпечно імпортувати модуль, а потім запускати функцію foo() модуля.
Подібні обмеження застосовуються, якщо пул або менеджер створено в основному модулі.
Przykłady¶
Демонстрація створення та використання налаштованих менеджерів і проксі-серверів:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Використання Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Приклад, який показує, як використовувати черги для передачі завдань у колекцію робочих процесів і збору результатів:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()