multiprocessing
— Process-based parallelism¶
Вихідний код: Lib/multiprocessing/
Вступ¶
multiprocessing
is a package that supports spawning processes using an
API similar to the threading
module. The multiprocessing
package
offers both local and remote concurrency, effectively side-stepping the
Global Interpreter Lock by using
subprocesses instead of threads. Due
to this, the multiprocessing
module allows the programmer to fully
leverage multiple processors on a given machine. It runs on both Unix and
Windows.
Модуль multiprocessing
також представляє API, які не мають аналогів у модулі threading
. Яскравим прикладом цього є об’єкт Pool
, який пропонує зручний засіб розпаралелювання виконання функції для кількох вхідних значень, розподіляючи вхідні дані між процесами (паралелізм даних). Наступний приклад демонструє звичайну практику визначення таких функцій у модулі, щоб дочірні процеси могли успішно імпортувати цей модуль. Цей базовий приклад паралелізму даних з використанням Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
друкуватиме стандартний вихід
[1, 4, 9]
Клас Process
¶
У multiprocessing
процеси породжуються шляхом створення об’єкта Process
і виклику його методу start()
. Process
відповідає API threading.Thread
. Тривіальним прикладом багатопроцесорної програми є:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Щоб показати ідентифікатори окремих процесів, ось розширений приклад:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Щоб отримати пояснення, чому потрібна частина if __name__ == '__main__''
, див. Інструкції з програмування.
Контексти та методи запуску¶
Залежно від платформи multiprocessing
підтримує три способи запуску процесу. Це методи запуску
- спаун
The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process object’s
run()
method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.Available on Unix and Windows. The default on Windows and macOS.
- вилка
Батьківський процес використовує
os.fork()
для розгалуження інтерпретатора Python. Дочірній процес, коли він починається, фактично ідентичний батьківському процесу. Усі ресурси батьківського процесу успадковуються дочірнім процесом. Зверніть увагу, що безпечне розгалуження багатопотокового процесу є проблематичним.Available on Unix only. The default on Unix.
- форксервер
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use
os.fork()
. No unnecessary resources are inherited.Available on Unix platforms which support passing file descriptors over Unix pipes.
Змінено в версії 3.8: У macOS метод запуску spawn тепер є типовим. Метод запуску fork слід вважати небезпечним, оскільки він може призвести до збою підпроцесу. Див. bpo-33725.
Змінено в версії 3.4: spawn added on all unix platforms, and forkserver added for some unix platforms. Child processes no longer inherit all of the parents inheritable handles on Windows.
On Unix using the spawn or forkserver start methods will also
start a resource tracker process which tracks the unlinked named
system resources (such as named semaphores or
SharedMemory
objects) created
by processes of the program. When all processes
have exited the resource tracker unlinks any remaining tracked object.
Usually there should be none, but if a process was killed by a signal
there may be some «leaked» resources. (Neither leaked semaphores nor shared
memory segments will be automatically unlinked until the next reboot. This is
problematic for both objects because the system allows only a limited number of
named semaphores, and shared memory segments occupy some space in the main
memory.)
Щоб вибрати метод запуску, ви використовуєте set_start_method()
в пункті if __name__ == '__main__'
головного модуля. Наприклад:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не слід використовувати більше одного разу в програмі.
Крім того, ви можете використовувати get_context()
, щоб отримати об’єкт контексту. Контекстні об’єкти мають той самий API, що й багатопроцесорний модуль, і дозволяють використовувати кілька методів запуску в одній програмі.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Зауважте, що об’єкти, пов’язані з одним контекстом, можуть бути несумісними з процесами для іншого контексту. Зокрема, блокування, створені за допомогою контексту fork, не можна передати процесам, запущеним за допомогою методів запуску spawn або forkserver.
Бібліотека, яка хоче використовувати певний метод запуску, ймовірно, повинна використовувати get_context()
, щоб уникнути втручання у вибір користувача бібліотеки.
Попередження
The 'spawn'
and 'forkserver'
start methods cannot currently
be used with «frozen» executables (i.e., binaries produced by
packages like PyInstaller and cx_Freeze) on Unix.
The 'fork'
start method does work.
Обмін об’єктами між процесами¶
multiprocessing
підтримує два типи каналів зв’язку між процесами:
Черги
Клас
Queue
є майже клономqueue.Queue
. Наприклад:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Queues are thread and process safe.
Труби
Функція
Pipe()
повертає пару об’єктів з’єднання, з’єднаних трубою, яка за умовчанням є дуплексною (двосторонньою). Наприклад:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Два об’єкти з’єднання, які повертає
Pipe()
, представляють два кінці труби. Кожен об’єкт підключення має методиsend()
іrecv()
(серед інших). Зауважте, що дані в каналі можуть бути пошкоджені, якщо два процеси (або потоки) намагаються зчитувати або писати в той самий кінець каналу одночасно. Звичайно, немає ризику пошкодження через процеси, що використовують різні кінці труби одночасно.
Синхронізація між процесами¶
multiprocessing
містить еквіваленти всіх примітивів синхронізації з threading
. Наприклад, можна використовувати блокування, щоб гарантувати, що лише один процес друкує на стандартний вивід одночасно:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без використання блокування вихідні дані різних процесів можуть переплутатися.
Використання пулу працівників¶
Клас Pool
представляє пул робочих процесів. Він має методи, які дозволяють розвантажувати завдання на робочі процеси кількома різними способами.
Наприклад:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Зверніть увагу, що методи пулу повинні використовуватися тільки тим процесом, який його створив.
Примітка
Функціональність цього пакета вимагає, щоб модуль __main__
міг імпортуватися дочірніми елементами. Це описано в Інструкції з програмування, проте тут варто звернути увагу на це. Це означає, що деякі приклади, такі як приклади multiprocessing.pool.Pool
не працюватимуть в інтерактивному інтерпретаторі. Наприклад:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Якщо ви спробуєте це, він фактично виведе три повні трасування, чергувані напіввипадковим чином, і тоді вам, можливо, доведеться якось зупинити батьківський процес.)
довідка¶
Пакет multiprocessing
здебільшого повторює API модуля threading
.
Process
і винятки¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Об’єкти процесу представляють діяльність, яка виконується в окремому процесі. Клас
Process
має еквіваленти всіх методівthreading.Thread
.The constructor should always be called with keyword arguments. group should always be
None
; it exists solely for compatibility withthreading.Thread
. target is the callable object to be invoked by therun()
method. It defaults toNone
, meaning nothing is called. name is the process name (seename
for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the processdaemon
flag toTrue
orFalse
. IfNone
(the default), this flag will be inherited from the creating process.By default, no arguments are passed to target.
Якщо підклас перевизначає конструктор, він повинен переконатися, що він викликає конструктор базового класу (
Process.__init__()
), перш ніж щось робити з процесом.Змінено в версії 3.3: Added the daemon argument.
-
run
()¶ Метод, що представляє діяльність процесу.
Ви можете перевизначити цей метод у підкласі. Стандартний метод
run()
викликає викликаний об’єкт, переданий конструктору об’єкта як цільовий аргумент, якщо такий є, з послідовними аргументами та ключовими аргументами, взятими з аргументів args і kwargs відповідно.
-
start
()¶ Запустіть процес.
Це має бути викликано щонайбільше один раз на об’єкт процесу. Він організовує виклик методу
run()
об’єкта в окремому процесі.
-
join
([timeout])¶ Якщо додатковий аргумент timeout має значення
None
(за замовчуванням), метод блокується, доки процес, чий методjoin()
викликається, не завершиться. Якщо timeout є додатним числом, воно блокує щонайбільше timeout секунд. Зауважте, що метод повертаєNone
, якщо його процес завершується або якщо метод закінчився. Перевіртеexitcode
процесу, щоб визначити, чи він завершився.До процесу можна приєднуватися багато разів.
Процес не може приєднатися до себе, оскільки це спричинить тупикову блокування. Спроба приєднатися до процесу до його запуску є помилкою.
-
name
¶ Назва процесу. Ім’я - це рядок, який використовується лише для ідентифікації. Він не має семантики. Кілька процесів можуть мати однакові назви.
Початкове ім’я задається конструктором. Якщо конструктору не надано явної назви, ім’я у формі „Process-N1:N2:…:Nk“ побудований, де кожен Nk є N-м дочірнім елементом свого батька.
-
is_alive
()¶ Повернути, чи процес активний.
Приблизно, об’єкт процесу живий з моменту повернення методу
start()
до завершення дочірнього процесу.
-
daemon
¶ Прапор демона процесу, логічне значення. Це має бути встановлено перед викликом
start()
.Початкове значення успадковується від процесу створення.
Коли процес завершується, він намагається завершити всі свої демонічні дочірні процеси.
Зауважте, що демонічному процесу не дозволяється створювати дочірні процеси. Інакше демонічний процес залишив би своїх нащадків сиротами, якщо він буде припинений під час завершення процесу батьківського процесу. Крім того, це не демони чи служби Unix, це звичайні процеси, які будуть припинені (і не приєднані), якщо недемонічні процеси вийшли.
Окрім API
threading.Thread
, об’єктиProcess
також підтримують такі атрибути та методи:-
pid
¶ Поверніть ідентифікатор процесу. До того, як процес буде створено, це буде
None
.
-
exitcode
¶ Код виходу дитини. Це буде
None
, якщо процес ще не завершено.Якщо дочірній метод
run()
повернувся нормально, код виходу буде 0. Якщо він закінчився черезsys.exit()
із цілим аргументом N, код виходу буде N.Якщо дочірній процес завершився через виняток, який не було перехоплено в
run()
, код виходу буде 1. Якщо його було завершено сигналом N, код виходу матиме від’ємне значення -N.
-
authkey
¶ Ключ автентифікації процесу (рядок байтів).
Коли
multiprocessing
ініціалізовано, головному процесу призначається випадковий рядок за допомогоюos.urandom()
.Коли об’єкт
Process
створюється, він успадковує ключ автентифікації свого батьківського процесу, хоча це можна змінити, встановившиauthkey
інший байтовий рядок.Перегляньте Ключі автентифікації.
-
sentinel
¶ Числовий дескриптор системного об’єкта, який стане «готовим» після завершення процесу.
You can use this value if you want to wait on several events at once using
multiprocessing.connection.wait()
. Otherwise callingjoin()
is simpler.On Windows, this is an OS handle usable with the
WaitForSingleObject
andWaitForMultipleObjects
family of API calls. On Unix, this is a file descriptor usable with primitives from theselect
module.Нове в версії 3.3.
-
terminate
()¶ Terminate the process. On Unix this is done using the
SIGTERM
signal; on WindowsTerminateProcess()
is used. Note that exit handlers and finally clauses, etc., will not be executed.Зверніть увагу, що процеси-нащадки процесу не будуть припинені – вони просто стануть сиротами.
Попередження
Якщо цей метод використовується, коли пов’язаний процес використовує канал або чергу, канал або чергу можуть бути пошкоджені та можуть стати непридатними для використання іншим процесом. Подібним чином, якщо процес отримав блокування або семафор тощо, його завершення може призвести до блокування інших процесів.
-
kill
()¶ Same as
terminate()
but using theSIGKILL
signal on Unix.Нове в версії 3.7.
-
close
()¶ Закрийте об’єкт
Process
, звільнивши всі пов’язані з ним ресурси.ValueError
виникає, якщо основний процес все ще виконується. Після успішного поверненняclose()
більшість інших методів і атрибутів об’єктаProcess
викличутьValueError
.Нове в версії 3.7.
Зауважте, що методи
start()
,join()
,is_alive()
,terminate()
іexitcode
має викликати лише процес, який створив об’єкт процесу .Приклад використання деяких методів
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ Базовий клас усіх винятків
multiprocessing
.
-
exception
multiprocessing.
BufferTooShort
¶ Exception raised by
Connection.recv_bytes_into()
when the supplied buffer object is too small for the message read.Якщо
e
є екземпляромBufferTooShort
, тоe.args[0]
видасть повідомлення як рядок байтів.
-
exception
multiprocessing.
AuthenticationError
¶ Викликається, коли виникає помилка автентифікації.
-
exception
multiprocessing.
TimeoutError
¶ Викликається методами з тайм-аутом, коли час очікування закінчується.
Труби та черги¶
При використанні кількох процесів зазвичай використовується передача повідомлень для зв’язку між процесами та уникається використання будь-яких примітивів синхронізації, таких як блокування.
Для передачі повідомлень можна використовувати Pipe()
(для з’єднання між двома процесами) або чергу (що дозволяє використовувати кілька виробників і споживачів).
Типи Queue
, SimpleQueue
і JoinableQueue
є чергами багатьох виробників і споживачів FIFO, змодельованими на основі queue.Queue
клас у стандартній бібліотеці. Вони відрізняються тим, що Queue
не має методів task_done()
і join()
, представлених у queue.Queue
Python 2.5 клас.
Якщо ви використовуєте JoinableQueue
, тоді ви має викликати JoinableQueue.task_done()
для кожного завдання, вилученого з черги, інакше семафор, який використовується для підрахунку кількості незавершених завдань, може зрештою переповнитися, викликаючи виняток.
Зауважте, що можна також створити спільну чергу за допомогою об’єкта менеджера – див. Менеджери.
Примітка
multiprocessing
використовує звичайні винятки queue.Empty
і queue.Full
, щоб сигналізувати про час очікування. Вони недоступні в просторі імен multiprocessing
, тому їх потрібно імпортувати з queue
.
Примітка
Коли об’єкт ставиться в чергу, об’єкт очищається, а фоновий потік пізніше скидає обрані дані в базовий канал. Це має деякі наслідки, які є трохи дивними, але не повинні викликати жодних практичних труднощів - якщо вони дійсно вас турбують, ви можете натомість використати чергу, створену за допомогою manager.
Після розміщення об’єкта в порожній черзі може виникнути нескінченно мала затримка, перш ніж метод
empty()
черги повернеFalse
, аget_nowait()
зможе повернутися без викликуqueue.Empty
.Якщо кілька процесів ставлять об’єкти в чергу, об’єкти можуть бути отримані на іншому кінці не за порядком. Однак об’єкти, поставлені в чергу одним і тим же процесом, завжди будуть в очікуваному порядку один відносно одного.
Попередження
Якщо процес зупинено за допомогою Process.terminate()
або os.kill()
під час спроби використання Queue
, то дані в черзі, ймовірно, будуть пошкоджені. Це може призвести до того, що будь-який інший процес отримає виняток, коли він спробує використати чергу пізніше.
Попередження
Як згадувалося вище, якщо дочірній процес поставив елементи в чергу (і він не використовував JoinableQueue.cancel_join_thread
), тоді цей процес не завершиться, доки всі буферизовані елементи не будуть скинуті в канал.
Це означає, що якщо ви спробуєте приєднатися до цього процесу, ви можете отримати тупикову блокування, якщо ви не впевнені, що всі елементи, які були поставлені в чергу, використано. Так само, якщо дочірній процес є недемонічним, тоді батьківський процес може зависнути при виході, коли він намагається приєднатися до всіх своїх недемонічних дочірніх процесів.
Зауважте, що черга, створена за допомогою менеджера, не має цієї проблеми. Дивіться Інструкції з програмування.
Для прикладу використання черг для міжпроцесного зв’язку див. Приклади.
-
multiprocessing.
Pipe
([duplex])¶ Повертає пару об’єктів
(conn1, conn2)
Connection
, що представляють кінці труби.Якщо duplex має значення
True
(за замовчуванням), тоді канал є двонаправленим. Якщо duplex має значенняFalse
, тоді канал є односпрямованим:conn1
можна використовувати лише для отримання повідомлень, аconn2
— лише для надсилання повідомлень.
-
class
multiprocessing.
Queue
([maxsize])¶ Повертає спільну чергу процесу, реалізовану за допомогою каналу та кількох блокувань/семафорів. Коли процес вперше ставить елемент у чергу, запускається потік, який передає об’єкти з буфера в канал.
Звичайні винятки
queue.Empty
іqueue.Full
із модуляqueue
стандартної бібліотеки створюються, щоб повідомити про час очікування.Queue
реалізує всі методиqueue.Queue
, крімtask_done()
іjoin()
.-
qsize
()¶ Повертає приблизний розмір черги. Через семантику багатопоточності/багатопроцесорності це число ненадійне.
Note that this may raise
NotImplementedError
on Unix platforms like macOS wheresem_getvalue()
is not implemented.
-
empty
()¶ Повертає
True
, якщо черга порожня,False
інакше. Через семантику багатопоточності/багатопроцесорності це ненадійно.
-
full
()¶ Повертає
True
, якщо черга заповнена,False
інакше. Через семантику багатопоточності/багатопроцесорності це ненадійно.
-
put
(obj[, block[, timeout]])¶ Помістіть obj у чергу. Якщо необов’язковий аргумент block має значення
True
(за замовчуванням), а timeout має значенняNone
(за замовчуванням), за потреби блокуйте, доки не з’явиться вільний слот. Якщо timeout є додатним числом, він блокує щонайбільше timeout секунд і викликає винятокqueue.Full
, якщо протягом цього часу не було вільного місця. В іншому випадку (block має значенняFalse
), помістіть елемент у чергу, якщо вільний слот є негайно доступним, інакше викликайте винятокqueue.Full
(timeout у цьому випадку ігнорується).Змінено в версії 3.8: Якщо чергу закрито, замість
AssertionError
виникаєValueError
.
-
put_nowait
(obj)¶ Еквівалент
put(obj, False)
.
-
get
([block[, timeout]])¶ Видалити та повернути елемент із черги. Якщо необов’язкові аргументи block мають значення
True
(за замовчуванням), а timeout —None
(за замовчуванням), за потреби блокуйте, доки елемент не стане доступним. Якщо timeout є додатним числом, він блокує щонайбільше timeout секунд і викликає винятокqueue.Empty
, якщо жоден елемент не був доступний протягом цього часу. В іншому випадку (блок має значенняFalse
), повертає елемент, якщо він одразу доступний, інакше викликає винятокqueue.Empty
(у цьому випадку timeout ігнорується).Змінено в версії 3.8: Якщо чергу закрито, замість
OSError
виникаєValueError
.
-
get_nowait
()¶ Еквівалент
get(False)
.
multiprocessing.Queue
має кілька додаткових методів, яких немає вqueue.Queue
. Ці методи зазвичай непотрібні для більшості коду:-
close
()¶ Укажіть, що поточний процес більше не додаватиме дані до цієї черги. Фоновий потік завершиться, коли всі буферизовані дані буде скинуто в канал. Це викликається автоматично, коли в черзі збирається сміття.
-
join_thread
()¶ Приєднайтеся до фонової нитки. Це можна використовувати лише після виклику
close()
. Він блокується, доки не завершиться фоновий потік, гарантуючи, що всі дані в буфері скинуті в канал.За замовчуванням, якщо процес не є творцем черги, після виходу він спробує приєднатися до фонового потоку черги. Процес може викликати
cancel_join_thread()
, щоб змуситиjoin_thread()
нічого не робити.
-
cancel_join_thread
()¶ Запобігти блокуванню
join_thread()
. Зокрема, це запобігає автоматичному приєднанню фонового потоку під час завершення процесу – див.join_thread()
.A better name for this method might be
allow_exit_without_flush()
. It is likely to cause enqueued data to lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don’t care about lost data.
Примітка
Функціональність цього класу вимагає функціонуючої спільної реалізації семафора в головній операційній системі. Без нього функціональні можливості цього класу будуть вимкнені, а спроби створити екземпляр
Queue
призведуть доImportError
. Додаткову інформацію див. bpo-3770. Те саме стосується будь-якого зі спеціалізованих типів черги, перелічених нижче.-
-
class
multiprocessing.
SimpleQueue
¶ Це спрощений тип
Queue
, дуже схожий на заблокованийPipe
.-
close
()¶ Закрийте чергу: звільніть внутрішні ресурси.
Після закриття чергу більше не можна використовувати. Наприклад, методи
get()
,put()
іempty()
більше не можна викликати.Нове в версії 3.9.
-
empty
()¶ Повертає
True
, якщо черга порожня,False
інакше.
-
get
()¶ Видалити та повернути елемент із черги.
-
put
(item)¶ Поставте товар в чергу.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
, підкласQueue
, це черга, яка додатково має методиtask_done()
іjoin()
.-
task_done
()¶ Вказує на те, що завдання, яке раніше було в черзі, виконано. Використовується споживачами черги. Для кожного
get()
, який використовується для отримання завдання, наступний викликtask_done()
повідомляє черзі, що обробку завдання завершено.Якщо
join()
зараз блокується, воно відновиться, коли всі елементи будуть оброблені (це означає, що викликtask_done()
отримано для кожного елемента, який бувput()
в чергу).Викликає
ValueError
, якщо викликається стільки разів, скільки було елементів у черзі.
-
join
()¶ Блокуйте, доки не буде отримано та оброблено всі елементи в черзі.
Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли споживач викликає
task_done()
, щоб вказати, що елемент отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля,join()
розблоковується.
-
Різне¶
-
multiprocessing.
active_children
()¶ Повернути список усіх живих дітей поточного процесу.
Виклик цього має побічний ефект «приєднання» до будь-яких процесів, які вже завершилися.
-
multiprocessing.
cpu_count
()¶ Повертає кількість процесорів у системі.
This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with
len(os.sched_getaffinity(0))
Коли кількість ЦП не може бути визначена, виникає
NotImplementedError
.Дивись також
-
multiprocessing.
current_process
()¶ Повертає об’єкт
Process
, що відповідає поточному процесу.Аналог
threading.current_thread()
.
-
multiprocessing.
parent_process
()¶ Повертає об’єкт
Process
, що відповідає батьківському процесуcurrent_process()
. Для основного процесуparent_process
будеNone
.Нове в версії 3.8.
-
multiprocessing.
freeze_support
()¶ Додайте підтримку, коли програма, яка використовує
multiprocessing
, була заморожена для створення виконуваного файлу Windows. (Було перевірено за допомогою py2exe, PyInstaller і cx_Freeze.)Цю функцію потрібно викликати відразу після рядка
if __name__ == '__main__'
головного модуля. Наприклад:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Якщо рядок
freeze_support()
пропущено, спроба запустити заморожений виконуваний файл викличеRuntimeError
.Виклик
freeze_support()
не має ефекту під час виклику в будь-якій операційній системі, крім Windows. Крім того, якщо модуль нормально запускається інтерпретатором Python у Windows (програма не була заморожена), тоfreeze_support()
не має ефекту.
-
multiprocessing.
get_all_start_methods
()¶ Returns a list of the supported start methods, the first of which is the default. The possible start methods are
'fork'
,'spawn'
and'forkserver'
. On Windows only'spawn'
is available. On Unix'fork'
and'spawn'
are always supported, with'fork'
being the default.Нове в версії 3.4.
-
multiprocessing.
get_context
(method=None)¶ Повертає об’єкт контексту, який має ті самі атрибути, що й модуль
multiprocessing
.If method is
None
then the default context is returned. Otherwise method should be'fork'
,'spawn'
,'forkserver'
.ValueError
is raised if the specified start method is not available.Нове в версії 3.4.
-
multiprocessing.
get_start_method
(allow_none=False)¶ Повертає назву методу запуску, який використовується для запуску процесів.
Якщо метод запуску не було виправлено і allow_none має значення false, тоді метод запуску фіксується за замовчуванням і повертається ім’я. Якщо метод запуску не було виправлено і allow_none має значення true, тоді повертається
None
.The return value can be
'fork'
,'spawn'
,'forkserver'
orNone
.'fork'
is the default on Unix, while'spawn'
is the default on Windows and macOS.
Змінено в версії 3.8: У macOS метод запуску spawn тепер є типовим. Метод запуску fork слід вважати небезпечним, оскільки він може призвести до збою підпроцесу. Див. bpo-33725.
Нове в версії 3.4.
-
multiprocessing.
set_executable
(executable)¶ Встановіть шлях інтерпретатора Python для використання під час запуску дочірнього процесу. (За замовчуванням використовується
sys.executable
). Вбудовувачі, ймовірно, повинні будуть зробити щось на зразок:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
перш ніж вони зможуть створити дочірні процеси.
Змінено в версії 3.4: Now supported on Unix when the
'spawn'
start method is used.
-
multiprocessing.
set_start_method
(method)¶ Set the method which should be used to start child processes. method can be
'fork'
,'spawn'
or'forkserver'
.Зверніть увагу, що це має бути викликано щонайбільше один раз, і його слід захистити всередині пропозиції
if __name__ == '__main__'
головного модуля.Нове в версії 3.4.
Примітка
multiprocessing
не містить аналогів threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
або threading.local
.
Об’єкти підключення¶
Об’єкти підключення дозволяють надсилати та отримувати об’єкти або рядки, які можна вибрати. Їх можна розглядати як підключені сокети, орієнтовані на повідомлення.
Об’єкти підключення зазвичай створюються за допомогою Pipe
– див. також Слухачі та клієнти.
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ Надішліть об’єкт на інший кінець з’єднання, який слід прочитати за допомогою
recv()
.Об’єкт має бути маринованим. Дуже великі pickles (приблизно 32 MiB+, хоча це залежить від ОС) можуть викликати виняток
ValueError
.
-
recv
()¶ Повернути об’єкт, надісланий з іншого кінця з’єднання за допомогою
send()
. Блокує, поки не буде що отримати. ВикликаєEOFError
, якщо не залишилося нічого для отримання, а інший кінець був закритий.
-
fileno
()¶ Повертає дескриптор файлу або дескриптор, який використовується підключенням.
-
close
()¶ Закрийте з’єднання.
Це викликається автоматично, коли підключення збирається сміттям.
-
poll
([timeout])¶ Повернути, чи є дані для читання.
Якщо timeout не вказано, він негайно повернеться. Якщо timeout є числом, це вказує максимальний час у секундах для блокування. Якщо timeout має значення
None
, тоді використовується нескінченний тайм-аут.Зауважте, що кілька об’єктів з’єднання можна опитувати одночасно за допомогою
multiprocessing.connection.wait()
.
-
send_bytes
(buffer[, offset[, size]])¶ Надіслати байтові дані з bytes-like object як повне повідомлення.
Якщо вказано зсув, дані зчитуються з цієї позиції в буфері. Якщо задано size, то стільки байтів буде прочитано з буфера. Дуже великі буфери (приблизно 32 MiB+, хоча це залежить від ОС) можуть викликати виключення
ValueError
-
recv_bytes
([maxlength])¶ Повертає повне повідомлення байтових даних, надісланих з іншого кінця з’єднання, у вигляді рядка. Блокує, поки не буде що отримати. Викликає
EOFError
, якщо не залишилося нічого для отримання, а інший кінець закрито.Якщо вказано maxlength і повідомлення довше, ніж maxlength, тоді виникає
OSError
і з’єднання більше не читається.
-
recv_bytes_into
(buffer[, offset])¶ Прочитайте в buffer повне повідомлення байтових даних, надісланих з іншого кінця з’єднання, і поверніть кількість байтів у повідомленні. Блокує, поки не буде що отримати. Викликає
EOFError
, якщо не залишилося нічого для отримання, а інший кінець був закритий.buffer має бути доступним для запису bytes-like object. Якщо задано offset, повідомлення буде записано в буфер із цієї позиції. Зсув має бути невід’ємним цілим числом, меншим за довжину буфера (у байтах).
Якщо буфер закороткий, виникає виняток
BufferTooShort
, і повне повідомлення доступне якe.args[0]
, деe
є винятком.
Змінено в версії 3.3: Самі об’єкти підключення тепер можна передавати між процесами за допомогою
Connection.send()
іConnection.recv()
.Нове в версії 3.3: Connection objects now support the context management protocol – see Типи менеджера контексту.
__enter__()
returns the connection object, and__exit__()
callsclose()
.-
Наприклад:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Попередження
Метод Connection.recv()
автоматично видаляє отримані дані, що може становити загрозу безпеці, якщо ви не можете довіряти процесу, який надіслав повідомлення.
Таким чином, якщо об’єкт підключення не було створено за допомогою Pipe()
, ви повинні використовувати лише методи recv()
і send()
після виконання певної автентифікації. Перегляньте Ключі автентифікації.
Попередження
Якщо процес зупиняється під час спроби читання або запису в канал, то дані в каналі, ймовірно, будуть пошкоджені, тому що може стати неможливо точно визначити, де пролягають межі повідомлення.
Примітиви синхронізації¶
Зазвичай примітиви синхронізації не такі необхідні в багатопроцесовій програмі, як у багатопотоковій програмі. Перегляньте документацію для модуля threading
.
Зауважте, що можна також створити примітиви синхронізації за допомогою об’єкта менеджера – див. Менеджери.
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ Бар’єрний об’єкт: клон
threading.Barrier
.Нове в версії 3.3.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Обмежений семафорний об’єкт: близький аналог
threading.BoundedSemaphore
.Існує єдина відмінність від його близького аналога: перший аргумент методу
acquire
має назву block, що узгоджується зLock.acquire()
.Примітка
У macOS це неможливо відрізнити від
Semaphore
, оскількиsem_getvalue()
не реалізовано на цій платформі.
-
class
multiprocessing.
Condition
([lock])¶ Змінна умови: псевдонім для
threading.Condition
.Якщо вказано lock, це має бути об’єкт
Lock
абоRLock
ізmultiprocessing
.Змінено в версії 3.3: Додано метод
wait_for()
.
-
class
multiprocessing.
Event
¶ Клон
threading.Event
.
-
class
multiprocessing.
Lock
¶ Нерекурсивний об’єкт блокування: близький аналог
threading.Lock
. Після того, як процес або потік отримав блокування, наступні спроби отримати його від будь-якого процесу або потоку будуть блокуватися, доки його не буде звільнено; будь-який процес або потік може його звільнити. Концепції та поведінкаthreading.Lock
, які застосовуються до потоків, відтворені тут уmultiprocessing.Lock
, оскільки вони застосовуються до процесів або потоків, за винятком зазначених випадків.Зауважте, що
Lock
насправді є фабричною функцією, яка повертає примірникmultiprocessing.synchronize.Lock
, ініціалізований контекстом за замовчуванням.Lock
підтримує протокол context manager і тому може використовуватися вwith
операторах.-
acquire
(block=True, timeout=None)¶ Отримайте блокування, блокування або неблокування.
Якщо для аргументу block встановлено значення
True
(за замовчуванням), виклик методу блокуватиметься, доки блокування не перейде в розблокований стан, а потім встановлюватиме для нього значенняTrue
і повертатимеTrue
. Зауважте, що назва цього першого аргументу відрізняється від такої вthreading.Lock.acquire()
.Якщо для аргументу block встановлено значення
False
, виклик методу не блокується. Якщо блокування зараз у заблокованому стані, повернітьFalse
; інакше встановіть блокування в заблокований стан і повернітьTrue
.При виклику з додатним значенням з плаваючою комою для timeout, блокувати щонайбільше на кількість секунд, визначену timeout, доки не вдасться отримати блокування. Виклики з від’ємним значенням для timeout еквівалентні timeout рівному нулю. Виклики зі значенням timeout
None
(за замовчуванням) встановлюють період очікування як нескінченний. Зауважте, що обробка негативних значень або значеньNone
для timeout відрізняється від реалізованої поведінки вthreading.Lock.acquire()
. Аргумент timeout не має практичного значення, якщо для аргументу block встановлено значенняFalse
і, таким чином, він ігнорується. ПовертаєTrue
, якщо блокування отримано, абоFalse
, якщо період очікування минув.
-
release
()¶ Відпустіть блокування. Це може бути викликано з будь-якого процесу або потоку, а не тільки процесу або потоку, який спочатку отримав блокування.
Поведінка така ж, як у
threading.Lock.release()
, за винятком того, що під час виклику для розблокованого блокування виникаєValueError
.
-
-
class
multiprocessing.
RLock
¶ Об’єкт рекурсивного блокування: близький аналог
threading.RLock
. Рекурсивне блокування має бути звільнено процесом або потоком, який його отримав. Після того як процес або потік отримав рекурсивне блокування, той самий процес або потік може отримати його знову без блокування; цей процес або потік повинен випускати його один раз за кожен раз, коли його було отримано.Зауважте, що
RLock
насправді є фабричною функцією, яка повертає екземплярmultiprocessing.synchronize.RLock
, ініціалізований контекстом за замовчуванням.RLock
підтримує протокол context manager і тому може використовуватися вwith
операторах.-
acquire
(block=True, timeout=None)¶ Отримайте блокування, блокування або неблокування.
При виклику з аргументом block, встановленим у значення
True
, блокувати, доки блокування не буде в розблокованому стані (не належить жодному процесу або потоку), якщо блокування вже не належить поточному процесу або потоку. Тоді поточний процес або потік отримує право власності на блокування (якщо він ще не має права власності), а рівень рекурсії всередині блокування збільшується на одиницю, що призводить до повернення значенняTrue
. Зауважте, що є кілька відмінностей у поведінці цього першого аргументу порівняно з реалізацієюthreading.RLock.acquire()
, починаючи з назви самого аргументу.При виклику з аргументом block, встановленим на
False
, не блокувати. Якщо блокування вже було отримано (і, отже, ним володіє) інший процес або потік, поточний процес або потік не приймає права власності, а рівень рекурсії в межах блокування не змінюється, що призводить до повернення значенняFalse
. Якщо блокування знаходиться в розблокованому стані, поточний процес або потік приймає право власності, і рівень рекурсії збільшується, що призводить до повернення значенняTrue
.Використання та поведінка аргументу timeout такі ж, як і в
Lock.acquire()
. Зауважте, що деякі з цих дій timeout відрізняються від реалізованих уthreading.RLock.acquire()
.
-
release
()¶ Зніміть блокування, зменшивши рівень рекурсії. Якщо після зменшення рівень рекурсії дорівнює нулю, скиньте блокування до розблокованого (не належить жодному процесу чи потоку), а якщо будь-які інші процеси чи потоки заблоковані в очікуванні розблокування блокування, дозвольте рівно одному з них продовжити. Якщо після декременту рівень рекурсії все ще ненульовий, блокування залишається заблокованим і належить процесу або потоку, що викликає.
Викликайте цей метод лише тоді, коли процес або потік, що викликає, володіє блокуванням. Помилка
AssertionError
виникає, якщо цей метод викликається процесом або потоком, відмінним від власника, або якщо блокування знаходиться в розблокованому стані (не належить). Зауважте, що тип винятку, викликаного в цій ситуації, відрізняється від реалізованої поведінки вthreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Об’єкт семафор: близький аналог
threading.Semaphore
.Існує єдина відмінність від його близького аналога: перший аргумент методу
acquire
має назву block, що узгоджується зLock.acquire()
.
Примітка
У macOS sem_timedwait
не підтримується, тому виклик acquire()
із тайм-аутом буде емулювати поведінку цієї функції за допомогою циклу сну.
Примітка
If the SIGINT signal generated by Ctrl-C arrives while the main thread is
blocked by a call to BoundedSemaphore.acquire()
, Lock.acquire()
,
RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
or Condition.wait()
then the call will be immediately interrupted and
KeyboardInterrupt
will be raised.
This differs from the behaviour of threading
where SIGINT will be
ignored while the equivalent blocking calls are in progress.
Примітка
Для деяких функцій цього пакета необхідна функціональна реалізація спільного семафора в головній операційній системі. Без нього модуль multiprocessing.synchronize
буде вимкнено, а спроби його імпортувати призведуть до ImportError
. Додаткову інформацію див. bpo-3770.
Менеджери¶
Менеджери надають можливість створювати дані, якими можна ділитися між різними процесами, включно з загальним доступом через мережу між процесами, що виконуються на різних машинах. Об’єкт менеджера контролює серверний процес, який керує спільними об’єктами. Інші процеси можуть отримати доступ до спільних об’єктів за допомогою проксі-серверів.
-
multiprocessing.
Manager
()¶ Повертає запущений об’єкт
SyncManager
, який можна використовувати для спільного використання об’єктів між процесами. Повернений об’єкт менеджера відповідає породженому дочірньому процесу та має методи, які створюватимуть спільні об’єкти та повертатимуть відповідні проксі-сервери.
Процеси менеджера буде закрито, щойно їх буде зібрано сміття або їхній батьківський процес завершиться. Класи менеджерів визначені в модулі multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Створіть об’єкт BaseManager.
Після створення потрібно викликати
start()
абоget_server().serve_forever()
, щоб переконатися, що об’єкт менеджера посилається на запущений процес менеджера.адреса — це адреса, на якій процес менеджера прослуховує нові підключення. Якщо адреса має значення
None
, тоді вибирається довільна адреса.authkey — це ключ автентифікації, який використовуватиметься для перевірки дійсності вхідних підключень до процесу сервера. Якщо authkey має значення
None
, тоді використовуєтьсяcurrent_process().authkey
. В іншому випадку використовується authkey, і це має бути рядок байтів.-
start
([initializer[, initargs]])¶ Запустіть підпроцес, щоб запустити менеджер. Якщо initializer не
None
, тоді підпроцес викличеinitializer(*initargs)
під час запуску.
-
get_server
()¶ Повертає об’єкт
Server
, який представляє фактичний сервер під керуванням менеджера. Об’єктServer
підтримує методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
додатково має атрибутaddress
.
-
connect
()¶ Підключіть об’єкт локального менеджера до віддаленого процесу менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
shutdown
()¶ Зупиніть процес, який використовує менеджер. Це доступно, лише якщо
start()
було використано для запуску процесу сервера.Це можна викликати кілька разів.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ Метод класу, який можна використовувати для реєстрації типу або виклику в класі менеджера.
typeid — це «ідентифікатор типу», який використовується для ідентифікації певного типу спільного об’єкта. Це має бути рядок.
callable — це виклик, який використовується для створення об’єктів для ідентифікатора цього типу. Якщо екземпляр менеджера буде підключено до сервера за допомогою методу
connect()
або якщо аргумент create_method має значенняFalse
, тоді це можна залишити якNone
.proxytype є підкласом
BaseProxy
, який використовується для створення проксі для спільних об’єктів із цим typeid. ЯкщоNone
, то проксі-клас створюється автоматично.exposed використовується для визначення послідовності назв методів, до яких проксі-серверам для цього typeid має бути дозволено доступ за допомогою
BaseProxy._callmethod()
. (Якщо exposed має значенняNone
, тоді замість нього використовуєтьсяproxytype._exposed_
, якщо він існує.) У випадку, коли відкритий список не вказано, усі «загальнодоступні методи» спільного об’єкта будуть доступними. . (Тут «публічний метод» означає будь-який атрибут, який має метод__call__()
і ім’я якого не починається з'_'
.)method_to_typeid — це зіставлення, яке використовується для визначення типу повернення тих відкритих методів, які мають повертати проксі. Він відображає назви методів у рядках typeid. (Якщо method_to_typeid має значення
None
, тоді замість нього використовуєтьсяproxytype._method_to_typeid_
, якщо він існує.) Якщо назва методу не є ключем цього відображення або якщо відображення має значенняNone
, тоді об’єкт, повернутий методом, буде скопійовано за значенням.create_method визначає, чи слід створювати метод з іменем typeid, яке можна використовувати, щоб наказати серверному процесу створити новий спільний об’єкт і повернути для нього проксі. За замовчуванням це
True
.
Екземпляри
BaseManager
також мають одну властивість лише для читання:-
address
¶ Адреса, яку використовує менеджер.
Змінено в версії 3.3: Об’єкти менеджера підтримують протокол керування контекстом – див. Типи менеджера контексту.
__enter__()
запускає серверний процес (якщо він ще не запущений), а потім повертає об’єкт менеджера.__exit__()
викликаєshutdown()
.У попередніх версіях
__enter__()
не запускав серверний процес менеджера, якщо він ще не був запущений.-
-
class
multiprocessing.managers.
SyncManager
¶ Підклас
BaseManager
, який можна використовувати для синхронізації процесів. Об’єкти цього типу повертаєmultiprocessing.Manager()
.Його методи створюють і повертають Проксі об’єкти для ряду типів даних, які зазвичай використовуються, щоб синхронізувати між процесами. Це, зокрема, включає спільні списки та словники.
-
Barrier
(parties[, action[, timeout]])¶ Створіть спільний об’єкт
threading.Barrier
і поверніть для нього проксі.Нове в версії 3.3.
-
BoundedSemaphore
([value])¶ Створіть спільний об’єкт
threading.BoundedSemaphore
і поверніть для нього проксі.
-
Condition
([lock])¶ Створіть спільний об’єкт
threading.Condition
і поверніть для нього проксі.Якщо вказано lock, це має бути проксі для об’єкта
threading.Lock
абоthreading.RLock
.Змінено в версії 3.3: Додано метод
wait_for()
.
-
Event
()¶ Створіть спільний об’єкт
threading.Event
і поверніть для нього проксі.
-
Lock
()¶ Створіть спільний об’єкт
threading.Lock
і поверніть для нього проксі.
-
Queue
([maxsize])¶ Створіть спільний об’єкт
queue.Queue
і поверніть для нього проксі.
-
RLock
()¶ Створіть спільний об’єкт
threading.RLock
і поверніть для нього проксі.
-
Semaphore
([value])¶ Створіть спільний об’єкт
threading.Semaphore
і поверніть для нього проксі.
-
Array
(typecode, sequence)¶ Створіть масив і поверніть для нього проксі.
-
Value
(typecode, value)¶ Створіть об’єкт із доступним для запису атрибутом «значення» та поверніть для нього проксі-сервер.
Змінено в версії 3.6: Спільні об’єкти можуть бути вкладеними. Наприклад, спільний об’єкт-контейнер, такий як спільний список, може містити інші спільні об’єкти, якими керуватиме та синхронізуватиме
SyncManager
.-
-
class
multiprocessing.managers.
Namespace
¶ Тип, який можна зареєструвати в
SyncManager
.Об’єкт простору імен не має відкритих методів, але має атрибути, доступні для запису. Його подання показує значення його атрибутів.
Однак, коли використовується проксі для об’єкта простору імен, атрибут, що починається з
'_''
буде атрибутом проксі, а не атрибутом референта:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Індивідуальні менеджери¶
Щоб створити власний менеджер, потрібно створити підклас BaseManager
і використовувати метод класу register()
для реєстрації нових типів або викликів у класі менеджера. Наприклад:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Використання віддаленого менеджера¶
Можна запустити керуючий сервер на одній машині, а клієнти використовуватимуть його з інших машин (за умови, що задіяні брандмауери дозволяють це).
Виконання наступних команд створює сервер для однієї спільної черги, до якої мають доступ віддалені клієнти:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клієнт може отримати доступ до сервера наступним чином:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Інший клієнт також може використовувати його:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальні процеси також можуть отримати доступ до цієї черги, використовуючи код вище на клієнті для доступу до неї віддалено:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Проксі об’єкти¶
Проксі — це об’єкт, який посилається на спільний об’єкт, який живе (імовірно) в іншому процесі. Спільний об’єкт називається референтом проксі. Кілька проксі-об’єктів можуть мати один і той же референт.
Проксі-об’єкт має методи, які викликають відповідні методи його референта (хоча не кожен метод референта обов’язково буде доступним через проксі). Таким чином, проксі можна використовувати так само, як і його референт:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Зауважте, що застосування str()
до проксі поверне подання референта, тоді як застосування repr()
поверне подання проксі.
Важливою особливістю проксі-об’єктів є те, що їх можна вибирати, тому їх можна передавати між процесами. Таким чином, референт може містити Проксі об’єкти. Це дозволяє вкладати ці керовані списки, dicts та інші Проксі об’єкти:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Подібним чином, проксі dict і список можуть бути вкладені один в одного:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Якщо стандартні (не проксі) list
або dict
об’єкти містяться в референті, модифікації цих змінних значень не поширюватимуться через менеджер, оскільки проксі не може дізнатися, коли значення що містяться в них, змінено. Однак збереження значення в проксі-контейнері (що запускає __setitem__
в проксі-об’єкті) поширюється через менеджер, тому для ефективної зміни такого елемента можна повторно призначити змінене значення проксі-серверу контейнера: :
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Цей підхід, можливо, менш зручний, ніж використання вкладених Проксі об’єкти для більшості випадків використання, але також демонструє рівень контролю над синхронізацією.
Примітка
Проксі-типи в multiprocessing
не підтримують порівняння за значенням. Так, наприклад, ми маємо:
>>> manager.list([1,2,3]) == [1,2,3]
False
Під час порівнянь слід просто використовувати копію референта.
-
class
multiprocessing.managers.
BaseProxy
¶ Проксі-об’єкти є екземплярами підкласів
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Виклик і повернення результату методу референта проксі.
Якщо
proxy
є проксі, референтом якого єobj
, тоді виразproxy._callmethod(methodname, args, kwds)
обчислить вираз
getattr(obj, methodname)(*args, **kwds)
в процесі менеджера.
Поверненим значенням буде копія результату виклику або проксі для нового спільного об’єкта – див. документацію щодо аргументу method_to_typeid
BaseManager.register()
.Якщо виклик викликає виняток, він повторно викликається
_callmethod()
. Якщо в процесі менеджера виникає інший виняток, він перетворюється на винятокRemoteError
і викликається_callmethod()
.Зокрема, зауважте, що виняток буде створено, якщо methodname не було виявлено.
Приклад використання
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Повернути копію референту.
Якщо референт неможливо вибрати, це спричинить виняток.
-
__repr__
()¶ Повертає представлення проксі-об’єкта.
-
__str__
()¶ Повернути представлення референта.
-
Прибирати¶
Проксі-об’єкт використовує зворотний виклик weakref, щоб, коли він збирає сміття, він скасовує реєстрацію в менеджері, якому належить його референт.
Спільний об’єкт видаляється з процесу менеджера, коли більше немає проксі-серверів, які посилаються на нього.
Пули процесів¶
Можна створити пул процесів, які виконуватимуть передані йому завдання за допомогою класу Pool
.
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ Об’єкт пулу процесів, який керує пулом робочих процесів, до яких можна надсилати завдання. Він підтримує асинхронні результати з тайм-аутами та зворотними викликами та має реалізацію паралельної карти.
processes is the number of worker processes to use. If processes is
None
then the number returned byos.cpu_count()
is used.Якщо initializer не
None
, тоді кожен робочий процес викличеinitializer(*initargs)
під час свого запуску.maxtasksperchild — це кількість завдань, які робочий процес може виконати, перш ніж він вийде та буде замінений новим робочим процесом, щоб звільнити невикористані ресурси. Типовим значенням maxtasksperchild є
None
, що означає, що робочі процеси живуть стільки ж, скільки пул.context можна використовувати для визначення контексту, який використовується для запуску робочих процесів. Зазвичай пул створюється за допомогою функції
multiprocessing.Pool()
або методуPool()
контекстного об’єкта. В обох випадках контекст встановлено належним чином.Зауважте, що методи об’єкта пулу має викликати тільки процес, який створив пул.
Попередження
Об’єкти
multiprocessing.pool
мають внутрішні ресурси, якими потрібно належним чином керувати (як і будь-яким іншим ресурсом), використовуючи пул як контекстний менеджер або викликаючиclose()
іterminate()
вручну. Якщо цього не зробити, процес може призупинити завершення.Note that is not correct to rely on the garbage colletor to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see
object.__del__()
for more information).Нове в версії 3.2: maxtasksperchild
Нове в версії 3.4: context
Примітка
Робочі процеси в межах
Pool
зазвичай живі протягом повної тривалості робочої черги пулу. Частий шаблон, який зустрічається в інших системах (таких як Apache, mod_wsgi тощо) для звільнення ресурсів, які зберігаються робочими засобами, полягає в тому, щоб дозволити робочому в межах пулу завершити лише певний обсяг роботи перед виходом, очищенням і породженням нового процесу на заміну старого. Аргумент maxtasksperchild дляPool
надає цю можливість кінцевому користувачеві.-
apply
(func[, args[, kwds]])¶ Виклик func з аргументами args і ключовими аргументами kwds. Блокується, поки не буде готовий результат. Враховуючи ці блоки,
apply_async()
краще підходить для виконання роботи паралельно. Крім того, func виконується лише в одному з воркерів пулу.
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ Варіант методу
apply()
, який повертає об’єктAsyncResult
.Якщо вказано callback, це має бути виклик, який приймає один аргумент. Коли результат стає готовим, до нього застосовується callback, якщо тільки виклик не вдався, у цьому випадку замість нього застосовується error_callback.
Якщо вказано error_callback, це має бути виклик, який приймає один аргумент. Якщо цільова функція дає збій, то error_callback викликається з екземпляром винятку.
Зворотні виклики мають завершитися негайно, інакше потік, який обробляє результати, буде заблоковано.
-
map
(func, iterable[, chunksize])¶ Паралельний еквівалент вбудованої функції
map()
(хоча вона підтримує лише один аргумент iterable, для кількох ітерацій див.starmap()
). Блокується, поки не буде готовий результат.Цей метод розбиває iterable на декілька фрагментів, які він надсилає до пулу процесів як окремі завдання. (Приблизний) розмір цих фрагментів можна вказати, встановивши для chunksize додатне ціле число.
Зауважте, що це може спричинити велике використання пам’яті для дуже довгих ітерацій. Розгляньте можливість використання
imap()
абоimap_unordered()
з явним параметром chunksize для кращої ефективності.
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Варіант методу
map()
, який повертає об’єктAsyncResult
.Якщо вказано callback, це має бути виклик, який приймає один аргумент. Коли результат стає готовим, до нього застосовується callback, якщо тільки виклик не вдався, у цьому випадку замість нього застосовується error_callback.
Якщо вказано error_callback, це має бути виклик, який приймає один аргумент. Якщо цільова функція дає збій, то error_callback викликається з екземпляром винятку.
Зворотні виклики мають завершитися негайно, інакше потік, який обробляє результати, буде заблоковано.
-
imap
(func, iterable[, chunksize])¶ Ленича версія
map()
.Аргумент chunksize такий самий, як той, який використовується методом
map()
. Для дуже довгих ітерацій використання великого значення для chunksize може зробити роботу завершеною набагато швидше, ніж використання значення за замовчуванням1
.Крім того, якщо chunksize дорівнює
1
, тоді методnext()
ітератора, який повертає методimap()
, має додатковий параметр timeout:next(timeout)
викличеmultiprocessing.TimeoutError
, якщо результат не може бути повернутий протягом часу очікування секунд.
-
imap_unordered
(func, iterable[, chunksize])¶ Те саме, що
imap()
, за винятком того, що порядок результатів від повернутого ітератора слід вважати довільним. (Тільки коли є лише один робочий процес, порядок гарантовано буде «правильним».)
-
starmap
(func, iterable[, chunksize])¶ Подібно до
map()
, за винятком того, що елементи iterable мають бути ітерованими, які розпаковуються як аргументи.Тому ітерація
[(1,2), (3, 4)]
призводить до[func(1,2), func(3,4)]
.Нове в версії 3.3.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Комбінація
starmap()
іmap_async()
, яка виконує ітерацію по iterable ітерацій і викликає func з розпакованими ітераціями. Повертає об’єкт результату.Нове в версії 3.3.
-
close
()¶ Запобігає надсиланню додаткових завдань до пулу. Після виконання всіх завдань робочі процеси завершаться.
-
terminate
()¶ Негайно зупиняє робочі процеси, не завершуючи незавершену роботу. Коли об’єкт пулу збирається як сміття, негайно буде викликано
terminate()
.
-
join
()¶ Зачекайте, поки робочі процеси завершаться. Потрібно викликати
close()
абоterminate()
перед використаннямjoin()
.
Нове в версії 3.3: Об’єкти пулу тепер підтримують протокол керування контекстом – див. Типи менеджера контексту.
__enter__()
повертає об’єкт пулу, а__exit__()
викликаєterminate()
.-
-
class
multiprocessing.pool.
AsyncResult
¶ Клас результату, який повертають
Pool.apply_async()
іPool.map_async()
.-
get
([timeout])¶ Поверніть результат, коли він надійде. Якщо timeout не
None
і результат не надходить протягом timeout секунд, тоді виникаєmultiprocessing.TimeoutError
. Якщо віддалений виклик викликав виняток, цей виняток буде повторно викликаноget()
.
-
wait
([timeout])¶ Зачекайте, поки буде доступний результат або поки не мине тайм-аут секунди.
-
ready
()¶ Повідомити, чи завершено виклик.
-
successful
()¶ Повертає, чи завершено виклик без виклику винятку. Викличе
ValueError
, якщо результат не готовий.Змінено в версії 3.7: Якщо результат не готовий, замість
AssertionError
виникаєValueError
.
-
Наступний приклад демонструє використання пулу:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Слухачі та клієнти¶
Зазвичай передача повідомлень між процесами здійснюється за допомогою черг або за допомогою об’єктів Connection
, які повертає Pipe()
.
Однак модуль multiprocessing.connection
забезпечує додаткову гнучкість. По суті, це надає API високого рівня, орієнтований на повідомлення, для роботи з сокетами або іменованими каналами Windows. Він також підтримує дайджест-автентифікацію за допомогою модуля hmac
і для опитування кількох з’єднань одночасно.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Надішліть випадково згенероване повідомлення на інший кінець з’єднання та дочекайтеся відповіді.
Якщо відповідь відповідає дайджесту повідомлення з використанням authkey як ключа, тоді на інший кінець з’єднання надсилається вітальне повідомлення. Інакше виникає
AuthenticationError
.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Отримайте повідомлення, обчисліть дайджест повідомлення, використовуючи authkey як ключ, а потім надішліть дайджест назад.
Якщо вітальне повідомлення не отримано, виникає
AuthenticationError
.
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ Спроба встановити з’єднання зі слухачем, який використовує адресу address, повертаючи
Connection
.Тип з’єднання визначається аргументом family, але зазвичай його можна опустити, оскільки його зазвичай можна визначити з формату address. (Див. Формати адрес)
If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None.
AuthenticationError
is raised if authentication fails. See Ключі автентифікації.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ Обгортка для пов’язаного сокета або каналу з іменем Windows, який «слухає» з’єднання.
address — це адреса, яка буде використовуватися зв’язаним сокетом або іменованим каналом об’єкта слухача.
Примітка
Якщо використовується адреса «0.0.0.0», ця адреса не буде кінцевою точкою підключення в Windows. Якщо вам потрібна підключена кінцева точка, вам слід використовувати «127.0.0.1».
сімейство — це тип розетки (або названої труби), яку слід використовувати. Це може бути один із рядків
'AF_INET'
(для сокета TCP),'AF_UNIX'
(для сокета домену Unix) або'AF_PIPE''
(для іменованого каналу Windows) . З них лише перший гарантовано доступний. Якщо family має значенняNone
, тоді сім’я виводиться з формату address. Якщо адреса такожNone
, тоді вибрано значення за замовчуванням. Це за замовчуванням сімейство, яке вважається найшвидшим із доступних. Дивіться Формати адрес. Зауважте, що якщо сімейство має значення'AF_UNIX
, а адресаNone
, то сокет буде створено в приватному тимчасовому каталозі, створеному за допомогоюtempfile.mkstemp()
.Якщо об’єкт слухача використовує сокет, тоді backlog (1 за замовчуванням) передається в метод
listen()
сокета після того, як його буде зв’язано.If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None.
AuthenticationError
is raised if authentication fails. See Ключі автентифікації.-
accept
()¶ Прийняти підключення до зв’язаного сокета або іменованого каналу об’єкта слухача та повернути об’єкт
Connection
. Якщо спроба автентифікації не вдається, виникаєAuthenticationError
.
-
close
()¶ Закрийте прив’язаний сокет або іменований канал об’єкта слухача. Це викликається автоматично, коли слухач збирає сміття. Однак бажано називати це явно.
Об’єкти слухача мають такі властивості лише для читання:
-
address
¶ Адреса, яка використовується об’єктом Listener.
-
last_accepted
¶ Адреса, з якої надійшло останнє прийняте підключення. Якщо це недоступно, це
None
.
Нове в версії 3.3: Об’єкти слухача тепер підтримують протокол керування контекстом – див. Типи менеджера контексту.
__enter__()
повертає об’єкт слухача, а__exit__()
викликаєclose()
.-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ Зачекайте, поки об’єкт у object_list буде готовий. Повертає список тих об’єктів у object_list, які готові. Якщо timeout є числом з плаваючою точкою, виклик блокується щонайбільше на стільки секунд. Якщо timeout має значення
None
, тоді він блокуватиметься на необмежений період. Від’ємний тайм-аут еквівалентний нульовому тайм-ауту.For both Unix and Windows, an object can appear in object_list if it is
читабельний об’єкт
Connection
;підключений і читабельний об’єкт
socket.socket
; або
Об’єкт з’єднання або сокета готовий, коли є доступні дані для читання з нього, або інший кінець закрито.
Unix:
wait(object_list, timeout)
almost equivalentselect.select(object_list, [], [], timeout)
. The difference is that, ifselect.select()
is interrupted by a signal, it can raiseOSError
with an error number ofEINTR
, whereaswait()
will not.Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function
WaitForMultipleObjects()
) or it can be an object with afileno()
method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)Нове в версії 3.3.
Приклади
Наступний код сервера створює прослуховувач, який використовує 'секретний пароль
як ключ автентифікації. Потім він очікує з’єднання та надсилає деякі дані клієнту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Наступний код підключається до сервера та отримує деякі дані з сервера:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Наступний код використовує wait()
для очікування повідомлень від кількох процесів одночасно:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Формати адрес¶
Адреса «AF_INET» — це кортеж у формі «(ім’я хоста, порт)», де ім’я хоста — рядок, а порт — ціле число.
Адреса «AF_UNIX» — це рядок, що представляє назву файлу у файловій системі.
An
'AF_PIPE'
address is a string of the formr'\.\pipe{PipeName}'
. To useClient()
to connect to a named pipe on a remote computer called ServerName one should use an address of the formr'\ServerName\pipe{PipeName}'
instead.
Зауважте, що будь-який рядок, який починається двома зворотними похилими рисками, за замовчуванням вважається адресою 'AF_PIPE'
, а не адресою 'AF_UNIX'
.
Ключі автентифікації¶
Коли використовується Connection.recv
, отримані дані автоматично видаляються. На жаль, видалення даних із ненадійного джерела становить загрозу безпеці. Тому Listener
і Client()
використовують модуль hmac
для забезпечення автентифікації дайджесту.
Ключ автентифікації — це рядок байтів, який можна розглядати як пароль: коли з’єднання встановлено, обидва кінці вимагатимуть підтвердження того, що інший знає ключ автентифікації. (Демонстрація того, що обидві сторони використовують той самий ключ, не передбачає надсилання ключа через з’єднання.)
Якщо автентифікація запитується, але ключ автентифікації не вказано, тоді використовується значення, що повертається current_process().authkey
(див. Process
). Це значення буде автоматично успадковано будь-яким об’єктом Process
, який створює поточний процес. Це означає, що (за замовчуванням) усі процеси багатопроцесної програми спільно використовуватимуть один ключ автентифікації, який можна використовувати під час встановлення з’єднань між собою.
Відповідні ключі автентифікації також можна згенерувати за допомогою os.urandom()
.
Лісозаготівля¶
Доступна певна підтримка журналювання. Однак зауважте, що пакунок logging
не використовує спільні блокування процесів, тому (залежно від типу обробника) повідомлення від різних процесів можуть переплутатися.
-
multiprocessing.
get_logger
()¶ Повертає реєстратор, який використовується
multiprocessing
. За потреби буде створено новий.When first created the logger has level
logging.NOTSET
and no default handler. Messages sent to this logger will not by default propagate to the root logger.Зауважте, що у Windows дочірні процеси успадковуватимуть лише рівень реєстратора батьківського процесу – будь-які інші налаштування реєстратора не успадковуватимуться.
-
multiprocessing.
log_to_stderr
(level=None)¶ Ця функція виконує виклик
get_logger()
, але окрім повернення реєстратора, створеного get_logger, вона додає обробник, який надсилає вихідні дані доsys.stderr
у форматі'[%(levelname)s/%(processName)s] %(message)s ''
. Ви можете змінитиlevelname
реєстратора, передавши аргументlevel
.
Нижче наведено приклад сеансу з увімкненим журналюванням:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Щоб отримати повну таблицю рівнів журналювання, перегляньте модуль logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
повторює API multiprocessing
, але є не більше ніж обгорткою модуля threading
.
Зокрема, функція Pool
, надана multiprocessing.dummy
, повертає екземпляр ThreadPool
, який є підкласом Pool
, який підтримує всі виклики методів, але використовує пул робочих потоків, а не робочих процесів.
-
class
multiprocessing.pool.
ThreadPool
([processes[, initializer[, initargs]]])¶ Об’єкт пулу потоків, який керує пулом робочих потоків, до яких можна надсилати завдання. Екземпляри
ThreadPool
повністю сумісні з інтерфейсом екземплярівPool
, і їхніми ресурсами також потрібно правильно керувати, використовуючи пул як контекстний менеджер або викликаючиclose()
іterminate()
вручну.processes is the number of worker threads to use. If processes is
None
then the number returned byos.cpu_count()
is used.Якщо initializer не
None
, тоді кожен робочий процес викличеinitializer(*initargs)
під час свого запуску.На відміну від
Pool
, maxtasksperchild і context не можна надати.Примітка
ThreadPool
має той самий інтерфейс, що йPool
, який розроблено навколо пулу процесів і передує появі модуляconcurrent.futures
. Таким чином, він успадковує деякі операції, які не мають сенсу для пулу, що підтримується потоками, і має власний тип для представлення статусу асинхронних завдань,AsyncResult
, який не розуміється жодною іншою бібліотекою.Зазвичай користувачі мають віддавати перевагу використанню
concurrent.futures.ThreadPoolExecutor
, який має простіший інтерфейс, розроблений навколо потоків із самого початку та повертаєconcurrent.futures.Future
екземпляри, сумісні з багатьма інші бібліотеки, включаючиasyncio
.
Інструкції з програмування¶
Існують певні вказівки та ідіоми, яких слід дотримуватися під час використання multiprocessing
.
Всі методи запуску¶
Наступне стосується всіх методів запуску.
Уникайте спільного стану
Наскільки це можливо, слід намагатися уникати переміщення великих обсягів даних між процесами.
Ймовірно, найкраще використовувати черги або канали для зв’язку між процесами, а не використовувати примітиви синхронізації нижчого рівня.
Пробірність
Переконайтеся, що аргументи методів проксі-серверів можна вибрати.
Безпека потоків проксі
Не використовуйте проксі-об’єкт із кількох потоків, якщо ви не захистите його за допомогою блокування.
(Ніколи не виникає проблем із різними процесами, які використовують той самий проксі.)
Приєднання до зомбованих процесів
On Unix when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or
active_children()
is called) all completed processes which have not yet been joined will be joined. Also calling a finished process’sProcess.is_alive
will join the process. Even so it is probably good practice to explicitly join all the processes that you start.
Краще успадкувати, ніж маринувати/розмаринувати
Під час використання методів запуску spawn або forkserver багато типів із
multiprocessing
мають бути доступними для вибору, щоб дочірні процеси могли їх використовувати. Однак зазвичай слід уникати надсилання спільних об’єктів іншим процесам за допомогою каналів або черг. Натомість ви повинні організувати програму так, щоб процес, якому потрібен доступ до спільного ресурсу, створеного в іншому місці, міг успадкувати його від процесу-предка.
Уникайте завершення процесів
Використання методу
Process.terminate
для зупинки процесу може призвести до того, що будь-які спільні ресурси (такі як блокування, семафори, канали та черги), які зараз використовуються цим процесом, стануть несправними або недоступними для інших процесів.Тому, ймовірно, найкраще використовувати
Process.terminate
лише для процесів, які ніколи не використовують спільні ресурси.
Приєднання до процесів, які використовують черги
Майте на увазі, що процес, який поставив елементи в чергу, чекатиме перед завершенням, доки всі буферизовані елементи не будуть передані потоком «фідера» до основного каналу. (Дочірній процес може викликати метод
Queue.cancel_join_thread
черги, щоб уникнути такої поведінки.)Це означає, що щоразу, коли ви використовуєте чергу, вам потрібно переконатися, що всі елементи, які було поставлено в чергу, зрештою буде видалено перед приєднанням до процесу. Інакше ви не можете бути впевнені, що процеси, які поставили елементи в чергу, завершаться. Пам’ятайте також, що недемонічні процеси будуть приєднані автоматично.
Прикладом, який призведе до взаємоблокування, є наступний:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Виправити тут можна було б поміняти місцями останні два рядки (або просто видалити рядок
p.join()
).
Явно передати ресурси дочірнім процесам
On Unix using the fork start method, a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.
Окрім того, що код (потенційно) сумісний із Windows та іншими методами запуску, це також гарантує, що поки дочірній процес живий, об’єкт не збиратиме сміття в батьківському процесі. Це може бути важливо, якщо якийсь ресурс звільняється, коли об’єкт збирається як сміття в батьківському процесі.
Так наприклад
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()слід переписати як
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерігайтеся заміни sys.stdin
на «файлоподібний об’єкт»
multiprocessing
спочатку безумовно називався:os.close(sys.stdin.fileno())у методі
multiprocessing.Process._bootstrap()
— це призвело до проблем із процесами в процесах. Це було змінено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Which solves the fundamental issue of processes colliding with each other resulting in a bad file descriptor error, but introduces a potential danger to applications which replace
sys.stdin()
with a «file-like object» with output buffering. This danger is that if multiple processes callclose()
on this file-like object, it could result in the same data being flushed to the object multiple times, resulting in corruption.Якщо ви пишете файлоподібний об’єкт і використовуєте власне кешування, ви можете зробити його безпечним для розгалуження, зберігаючи pid кожного разу, коли ви додаєте його до кешу, і відкидаючи кеш, коли pid змінюється. Наприклад:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДля отримання додаткової інформації перегляньте bpo-5155, bpo-5313 та bpo-5331
Методи запуску spawn і forkserver¶
There are a few extra restriction which don’t apply to the fork start method.
Більше маринування
Переконайтеся, що всі аргументи
Process.__init__()
можна вибрати. Крім того, якщо ви створите підкласProcess
, то переконайтеся, що екземпляри можна вибрати під час виклику методуProcess.start
.
Глобальні змінні
Майте на увазі, що якщо код, запущений у дочірньому процесі, намагається отримати доступ до глобальної змінної, тоді значення, яке він бачить (якщо таке є), може не збігатися зі значенням у батьківському процесі під час
Process.start викликано
.Однак глобальні змінні, які є лише константами рівня модуля, не викликають проблем.
Безпечне імпортування основного модуля
Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).
Наприклад, використання методу запуску spawn або forkserver під час запуску наступного модуля призведе до помилки з
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Натомість слід захистити «точку входу» програми за допомогою
if __name__ == '__main__':
наступним чином:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Рядок
freeze_support()
можна опустити, якщо програма буде працювати нормально, а не зависати.)Це дозволяє щойно створеному інтерпретатору Python безпечно імпортувати модуль, а потім запускати функцію foo() модуля.
Подібні обмеження застосовуються, якщо пул або менеджер створено в основному модулі.
Приклади¶
Демонстрація створення та використання налаштованих менеджерів і проксі-серверів:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Використання Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Приклад, який показує, як використовувати черги для передачі завдань у колекцію робочих процесів і збору результатів:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()