17.2. multiprocessing
— Parallélisme par processus¶
Code source : Lib/multiprocessing/
17.2.1. Introduction¶
multiprocessing
est un paquet qui permet l’instanciation de processus via la même API que le module threading
. Le paquet multiprocessing
offre à la fois des possibilités de programmation concurrente locale ou à distance, contournant les problèmes du Global Interpreter Lock en utilisant des processus plutôt que des fils d’exécution. Ainsi, le module multiprocessing
permet au développeur de bénéficier entièrement des multiples processeurs sur une machine. Il tourne à la fois sur les systèmes Unix et Windows.
Le module multiprocessing
introduit aussi des API sans analogues dans le module threading
. Un exemple est l’objet Pool
qui offre une manière pratique de paralléliser l’exécution d’une fonction sur de multiples valeurs d’entrée, distribuant ces valeurs entre les processus (parallélisme de données). L’exemple suivant présente la manière classique de définir une telle fonction dans un module afin que les processus fils puissent importer ce module avec succès. L’exemple basique de parallélisme de données utilise Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
affiche sur la sortie standard
[1, 4, 9]
17.2.1.1. La classe Process
¶
Dans le module multiprocessing
, les processus sont instanciés en créant un objet Process
et en appelant sa méthode start()
. La classe Process
suit la même API que threading.Thread
. Un exemple trivial d’un programme multi-processus est
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Pour afficher les IDs des processus impliqués, voici un exemple plus étoffé :
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
La nécessité de la ligne if __name__ == '__main__'
est expliquée par Lignes directrices de programmation.
17.2.1.2. Contextes et méthodes de démarrage¶
Suivant la plateforme, multiprocessing
gère trois manières de démarrer un processus. Ces méthodes de démarrage sont
- spawn
Le processus parent démarre un processus neuf avec un interpréteur Python. Le processus fils hérite uniquement des ressources nécessaires pour exécuter la méthode
run()
de l’objet associé au processus. En particulier, les descripteurs de fichiers superflus et gérés par le processus parent ne sont pas hérités. Démarrer un processus en utilisant cette méthode est plutôt lent par rapport à fork ou forkserver.Disponible sur Unix et Windows. Par défaut sur Windows.
- fork
Le processus parent utilise
os.fork()
pour forker l’interpréteur Python. Le processus fils, quand il démarre, est effectivement identique au processus parent. Toutes les ressources du parent sont héritées par le fils. Notez qu’il est problématique de forker sans danger un processus multi-threadé.Disponible uniquement sous Unix. Par défaut sous Unix.
- forkserver
Quand le programme démarre et choisit la méthode de démarrage forkserver, un processus serveur est lancé. Dès lors, chaque fois qu’un nouveau processus est nécessaire, le processus parent se connecte au serveur et lui demande de forker un nouveau processus. Le processus serveur de fork n’utilisant qu’un seul fil d’exécution, il peut utiliser
os.fork()
sans danger. Les ressources superflues ne sont pas héritées.Disponible sur les plateformes Unix qui acceptent le passage de descripteurs de fichiers à travers des tubes (pipes) Unix.
Modifié dans la version 3.4: spawn ajouté à toutes les plateformes Unix, et forkserver ajouté à certaines plateformes Unix. Les processus fils n’héritent plus de tous les descripteurs héritables du parent sous Windows.
Sous Unix, utiliser les méthodes de démarrage spawn ou forkserver démarre aussi un processus semaphore tracker qui traque les sémaphores nommés non libérés créés par les processus du programme. Quand tous les processus sont terminés, le traqueur de sémaphores libère les sémaphores restants. Généralement il ne devrait pas y en avoir, mais si un processus a été tué par un signal, certains sémaphores ont pu « fuiter ». (Libérer les sémaphores nommés est une affaire sérieuse puisque le système n’en autorise qu’un certain nombre, et qu’ils ne seront pas automatiquement libérés avant le prochain redémarrage.)
Pour sélectionner une méthode de démarrage, utilisez la fonction set_start_method()
dans la clause if __name__ == '__main__'
du module principal. Par exemple :
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
ne doit pas être utilisée plus d’une fois dans le programme.
Alternativement, vous pouvez utiliser get_context()
pour obtenir un contexte. Les contextes ont la même API que le module multiprocessing, et permettent l’utilisation de plusieurs méthodes de démarrage dans un même programme.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Notez que les objets relatifs à un contexte ne sont pas forcément compatibles avec les processus d’un contexte différent. En particulier, les verrous créés avec le contexte fork ne peuvent pas être passés aux processus lancés avec les méthodes spawn ou forkserver.
Une bibliothèque qui veut utiliser une méthode de démarrage particulière devrait probablement faire appel à get_context()
pour éviter d’interférer avec le choix de l’utilisateur de la bibliothèque.
17.2.1.3. Échange d’objets entre les processus¶
multiprocessing
gère deux types de canaux de communication entre les processus :
Queues
La classe
Queue
est un clone assez proche dequeue.Queue
. Par exemple :from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Les queues peuvent être utilisées par plusieurs fils d’exécution ou processus.
Tubes (pipes)
La fonction
Pipe()
renvoie une paire d’objets de connexion connectés à un tube qui est par défaut à double-sens. Par exemple :from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Les deux objets de connexion renvoyés par
Pipe()
représentent les deux bouts d’un tube. Chaque objet de connexion possède (entre autres) des méthodessend()
etrecv()
. Notez que les données d’un tube peuvent être corrompues si deux processus (ou fils d’exécution) essaient de lire ou d’écrire sur le même bout du tube en même temps. Évidemment il n’y a pas de risque de corruption si les processus utilisent deux bouts différents en même temps.
17.2.1.4. Synchronisation entre processus¶
multiprocessing
contient des équivalents à toutes les primitives de synchronisation de threading
. Par exemple il est possible d’utiliser un verrou pour s’assurer qu’un seul processus à la fois écrit sur la sortie standard :
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Sans le verrou, les sorties des différents processus risquent d’être mélangées.
17.2.1.5. Partager un état entre les processus¶
Comme mentionné plus haut, il est généralement préférable d’éviter autant que possible d’utiliser des états partagés en programmation concurrente. C’est particulièrement vrai quand plusieurs processus sont utilisés.
Cependant, si vous devez réellement partager des données, multiprocessing
permet de le faire de deux manières.
Mémoire partagée
Les données peuvent être stockées dans une mémoire partagée en utilisant des
Value
ou desArray
. Par exemple, le code suivantfrom multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])affiche
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]Les arguments
'd'
et'i'
utilisés à la création desnum
et arr` sont des codes de types tels qu’utilisés par le modulearray
:'d'
indique un flottant double-précision et'i'
indique un entier signé. Ces objets partagés seront sûr d’utilisation entre processus et fils d’exécution.Pour plus de flexibilité dans l’utilisation de mémoire partagée, vous pouvez utiliser le module
multiprocessing.sharedctypes
qui permet la création d’objets arbitraires ctypes alloués depuis la mémoire partagée.
Processus serveur
Un objet gestionnaire renvoyé par
Manager()
contrôle un processus serveur qui détient les objets Python et autorise les autres processus à les manipuler à l’aide de mandataires.Un gestionnaire renvoyé par
Manager()
supportera les typeslist
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
etArray
. Par exemple,from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)affiche
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Les processus serveurs de gestionnaires sont plus flexibles que les mémoires partagées parce qu’ils peuvent gérer des types d’objets arbitraires. Aussi, un gestionnaire unique peut être partagé par les processus sur différentes machines à travers le réseau. Cependant, ils sont plus lents que les mémoires partagées.
17.2.1.6. Utiliser un réservoir de workers¶
La classe Pool
représente une pool de processus de travail. Elle possède des méthodes qui permettent aux tâches d’être déchargées vers les processus de travail de différentes manières.
Par exemple :
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Notez que les méthodes d’une pool ne devraient être utilisées que par le processus qui l’a créée.
Note
Fonctionnellement ce paquet exige que que le module __main__
soit importable par les fils. Cela est expliqué sur la page Lignes directrices de programmation, il est cependant utile de le rappeler ici. Cela signifie que certains exemples, comme les exemples utilisant multiprocessing.pool.Pool
, ne fonctionnent pas dans l’interpréteur interactif. Par exemple :
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Si vous essayez ce code, il affichera trois traces d’appels complètes entrelacées de manière semi-aléatoire, et vous aurez alors à stopper le processus maître.)
17.2.2. Référence¶
Le paquet multiprocessing
reproduit en grande partie l’API du module threading
.
17.2.2.1. Process
et exceptions¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Les objets process représentent une activité exécutée dans un processus séparé. La classe
Process
a des équivalents à toutes les méthodes dethreading.Thread
.Le constructeur doit toujours être appelé avec des arguments nommés. group devrait toujours être
None
; il existe uniquement pour la compatibilité avecthreading.Thread
. target est l’objet appelable qui est invoqué par la méthoderun(). Il vaut ``None`()
par défaut, signifiant que rien n’est appelé. name est le nom du processus (voirname
pour plus de détails). args est le tuple d’arguments pour l’invocation de la cible. kwargs est le dictionnaire des arguments nommés pour l’invocation de la cible. S’il est fourni, l’argument nommé daemon met l’optiondaemon
du processus àTrue
ouFalse
. S’il estNone
(par défaut), l’option est héritée par le processus créateur.Par défaut, aucun argument n’est passé à target.
Si une sous-classe redéfinit le constructeur, elle doit s’assurer d’invoquer le constructeur de la classe de base (
Process.__init__()
) avant de faire autre chose du processus.Modifié dans la version 3.3: Ajout de l’argument daemon.
-
run
()¶ Méthode représentant l’activité du processus.
Vous pouvez redéfinir cette méthode dans une sous-classe. La méthode standard
run()
invoque l’objet appelable passé au constructeur comme argument target, si fourni, avec les arguments séquentiels et nommés respectivement pris depuis les paramètres args et kwargs.
-
start
()¶ Démarre l’activité du processus.
Elle doit être appelée au plus une fois par objet processus. Elle s’arrange pour que la méthode
run()
de l’objet soit invoquée dans un processus séparé.
-
join
([timeout])¶ Si l’argument optionnel timeout est
None
(par défaut), la méthode bloque jusqu’à ce que le processus dont la méthodejoin()
a été appelée se termine. Si timeout est un nombre positif, elle bloque au maximum pendant timeout secondes. Notez que la méthode renvoieNone
si le processus se termine ou si le temps d’exécution expire. Vérifiez l’attributexitcode
du processus pour déterminer s’il s’est terminé.join peut être appelée plusieurs fois sur un même processus.
Un processus ne peut pas s’attendre lui-même car cela causerait un interblocage. C’est une erreur d’essayer d’attendre un processus avant qu’il ne soit démarré.
-
name
¶ Le nom du processus. Le nom est une chaîne de caractères utilisée uniquement pour l’identification du processus. Il n’a pas de sémantique. Plusieurs processus peuvent avoir le même nom.
Le nom initial est déterminé par le constructeur. Si aucun nom explicite n’est fourni au constructeur, un nom de la forme « Process-N1:N2:…:Nk » est construit, où chaque Nk est le N-ième enfant de son parent.
-
is_alive
()¶ Renvoie vrai si le processus est en vie, faux sinon.
Grossièrement, un objet processus est en vie depuis le moment où la méthode
start()
finit de s’exécuter jusqu’à ce que le processus fils se termine.
-
daemon
¶ L’option daemon du processus, une valeur booléenne. L’option doit être réglée avant que la méthode
start()
ne soit appelée.La valeur initiale est héritée par le processus créateur.
Quand un processus se ferme, il tente de terminer tous ses processus enfants daemon.
Notez qu’un processus daemon n’est pas autorisé à créer des processus fils. Sinon un processus daemon laisserait ses enfants orphelins lorsqu’il se termine par la fermeture de son parent. De plus, ce ne sont pas des daemons ou services Unix, ce sont des processus normaux qui seront terminés (et non attendus) si un processus non daemon se ferme.
En plus de l’API
threading.Thread
, les objetsProcess
supportent aussi les attributs et méthodes suivants :-
pid
¶ Renvoie l’ID du processus. Avant que le processus ne soit lancé, la valeur est
None
.
-
exitcode
¶ Le code de fermeture de l’enfant. La valeur est
None
si le processus ne s’est pas encore terminé. Une valeur négative -N indique que le fils a été terminé par un signal N.
-
authkey
¶ La clé d’authentification du processus (une chaîne d’octets).
Quand
multiprocessing
est initialisé, une chaîne aléatoire est assignée au processus principal, en utilisantos.urandom()
.Quand un objet
Process
est créé, il hérité de la clé d’authentification de son parent, bien que cela puisse être changé à l’aide du paramètreauthkey
pour une autre chaîne d’octets.Voir Clés d’authentification.
-
sentinel
¶ Un identifiant numérique de l’objet système qui devient « prêt » quand le processus se termine.
Vous pouvez utiliser cette valeur si vous voulez attendre plusieurs événements à la fois en utilisant
multiprocessing.connection.wait()
. Autrement appelerjoin()
est plus simple.Sous Windows, c’est un mécanisme de l’OS utilisable avec les familles d’appels API
WaitForSingleObject
etWaitForMultipleObjects
. Sous Unix, c’est un descripteur de fichier utilisable avec les primitives sur moduleselect
.Nouveau dans la version 3.3.
-
terminate
()¶ Termine le processus. Sous Unix cela est réalisé à l’aide d’un signal
SIGTERM
, sous WindowsTerminateProcess()
est utilisé. Notez que les gestionnaires de sortie, les clauses finally etc. ne sont pas exécutées.Notez que les descendants du processus ne seront pas terminés – ils deviendront simplement orphelins.
Avertissement
Si cette méthode est utilisée quand le processus associé utilise un tube ou une queue, alors le tube ou la queue sont susceptibles d’être corrompus et peuvent devenir inutilisables par les autres processus. De façon similaire, si le processus a acquis un verrou, un sémaphore ou autre, alors le terminer est susceptible de provoquer des blocages dans les autres processus.
Notez que les méthodes
start()
,join()
,is_alive()
,terminate()
etexitcode
ne devraient être appelées que par le processus ayant créé l’objet process.Exemple d’utilisation de quelques méthodes de
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ La classe de base de toutes les exceptions de
multiprocessing
.
-
exception
multiprocessing.
BufferTooShort
¶ Exception levée par
Connection.recv_bytes_into()
quand l’objet tampon fourni est trop petit pour le message à lire.Si
e
est une instance deBufferTooShort
alorse.args[0]
donnera un message sous forme d’une chaîne d’octets.
-
exception
multiprocessing.
AuthenticationError
¶ Levée quand il y a une erreur d’authentification.
-
exception
multiprocessing.
TimeoutError
¶ Levée par les méthodes avec temps d’exécution limité, quand ce temps expire.
17.2.2.2. Tubes (pipes) et Queues¶
Quand de multiples processus sont utilisés, de l’échange de messages est souvent mis en place pour la communication entre processus et éviter d’avoir à utiliser des primitives de synchronisation telles que des verrous.
Pour échanger des messages vous pouvez utiliser un Pipe()
(pour une connexion entre deux processus) ou une queue (qui autorise de multiples producteurs et consommateurs).
Les types Queue
, SimpleQueue
et JoinableQueue
sont des queues FIFO multi-producteurs et multi-consommateurs modelées sur la classe queue.Queue
de la bibliothèque standard. Elles diffèrent par l’absence dans Queue
des méthodes task_done()
et join()
introduites dans la classe queue.Queue
par Python 2.5.
Si vous utilisez JoinableQueue
alors vous devez appeler JoinableQueue.task_done()
pour chaque tâche retirée de la queue, sans quoi le sémaphore utilisé pour compter le nombre de tâches non accomplies pourra éventuellement déborder, levant une exception.
Notez que vous pouvez aussi créer une queue partagée en utilisant un objet gestionnaire – voir Gestionnaires.
Note
multiprocessing
utilise les exceptions habituelles queue.Empty
et queue.Full
pour signaler un dépassement du temps maximal autorisé. Elles ne sont pas disponibles dans l’espace de nommage multiprocessing
donc vous devez les importer depuis le module queue
.
Note
Quand un objet est placé dans une queue, l’objet est sérialisé par pickle et un fil d’exécution en arrière-plan transmettra ensuite les données sérialisées sur un tube sous-jacent. Cela a certaines conséquences qui peuvent être un peu surprenantes, mais ne devrait causer aucune difficultés pratiques – si elles vous embêtent vraiment, alors vous pouvez à la place utiliser une queue créée avec un manager.
Après avoir placé un objet dans une queue vide il peut y avoir un délai infinitésimal avant que la méthode
empty()
de la queue renvoieFalse
et queget_nowait()
renvoie une valeur sans lever dequeue.Empty
.Si plusieurs processus placent des objets dans la queue, il est possible pour les objets d’être reçus de l’autre côté dans le désordre. Cependant, les objets placés par un même processus seront toujours récupérés dans l’ordre attendu.
Avertissement
Si un processus est tué à l’aide de Process.terminate()
ou os.kill()
pendant qu’il tente d’utiliser une Queue
, alors les données de la queue peuvent être corrompues. Cela peut par la suite causer des levées d’exceptions dans les autres processus quand ils tenteront d’utiliser la queue.
Avertissement
Comme mentionné plus haut, si un processus fils a placé des éléments dans la queue (et qu’il n’a pas utilisé JoinableQueue.cancel_join_thread
), alors le processus ne se terminera pas tant que les éléments placés dans le tampon n’auront pas été transmis au tube.
Cela signifie que si vous essayez d’attendre ce processus vous pouvez obtenir un interblocage, à moins que vous ne soyez sûr que tous les éléments placés dans la queue ont été consommés. De même, si le processus fils n’est pas un daemon alors le processus parent pourrait bloquer à la fermeture quand il tentera d’attendre tous ses enfants non daemons.
Notez que la queue créée à l’aide d’un gestionnaire n’a pas ce problème. Voir Lignes directrices de programmation.
Pour un exemple d’utilisation de queues pour de la communication entre les processus, voir Exemples.
-
multiprocessing.
Pipe
([duplex])¶ Renvoie une paire
(conn1, conn2)
d’objetsConnection
représentant les bouts d’un tube.Si duplex vaut
True
(par défaut), alors le tube est bidirectionnel. Si duplex vautFalse
il est unidirectionnel :conn1
ne peut être utilisé que pour recevoir des messages etconn2
que pour en envoyer.
-
class
multiprocessing.
Queue
([maxsize])¶ Renvoie une queue partagée entre les processus utilisant un tube et quelques verrous/sémaphores. Quand un processus place initialement un élément sur la queue, un fil d’exécution feeder est démarré pour transférer les objets du tampon vers le tube.
Les exceptions habituelles
queue.Empty
etqueue.Full
du modulequeue
de la bibliothèque standard sont levées pour signaler les timeouts.Queue
implémente toutes les méthodes dequeue.Queue
à l’exception detask_done()
etjoin()
.-
qsize
()¶ Renvoie la taille approximative de la queue. Ce nombre n’est pas fiable en raison des problématiques de multithreading et multiprocessing.
Notez que cela peut lever une
NotImplementedError
sous les plateformes Unix telles que Mac OS X oùsem_getvalue()
n’est pas implémentée.
-
empty
()¶ Renvoie
True
si la queue est vide,False
sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.
-
full
()¶ Renvoie
True
si la queue est pleine,False
sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.
-
put
(obj[, block[, timeout]])¶ Place obj dans la queue. Si l’argument optionnel block vaut
True
(par défaut) est que timeout estNone
(par défaut), bloque jusqu’à ce qu’un slot libre soit disponible. Si timeout est un nombre positif, la méthode bloquera au maximum timeout secondes et lèvera une exceptionqueue.Full
si aucun slot libre n’a été trouvé dans le temps imparti. Autrement (block vautFalse
), place un élément dans la queue si un slot libre est immédiatement disponible, ou lève une exceptionqueue.Full
dans le cas contraire (timeout est ignoré dans ce cas).
-
put_nowait
(obj)¶ Équivalent à
put(obj, False)
.
-
get
([block[, timeout]])¶ Retire et renvoie un élément de la queue. Si l’argument optionnel block vaut
True
(par défaut) et que timeout estNone
(par défaut), bloque jusqu’à ce qu’un élément soit disponible. Si timeout (le délai maximal autorisé) est un nombre positif, la méthode bloquera au maximum timeout secondes et lèvera une exceptionqueue.Empty
si aucun élément n’est disponible dans le temps imparti. Autrement (block vautFalse
), renvoie un élément s’il est immédiatement disponible, ou lève une exceptionqueue.Empty
dans le cas contraire (timeout est ignoré dans ce cas).
-
get_nowait
()¶ Équivalent à
get(False)
.
multiprocessing.Queue
possède quelques méthodes additionnelles non présentes dansqueue.Queue
. Ces méthodes ne sont habituellement pas nécessaires pour la plupart des codes :-
close
()¶ Indique que plus aucune donnée ne peut être placée sur la queue par le processus courant. Le fil d’exécution en arrière-plan se terminera quand il aura transféré toutes les données du tampon vers le tube. Elle est appelée automatiquement quand la queue est collectée par le ramasse-miettes.
-
join_thread
()¶ Attend le fil d’exécution d’arrière-plan. Elle peut seulement être utilisée une fois que
close()
a été appelée. Elle bloque jusqu’à ce que le fil d’arrière-plan se termine, assurant que toutes les données du tampon ont été transmises au tube.Par défaut si un processus n’est pas le créateur de la queue alors à la fermeture elle essaiera d’attendre le fil d’exécution d’arrière-plan de la queue. Le processus peut appeler
cancel_join_thread()
pour quejoin_thread()
ne fasse rien.
-
cancel_join_thread
()¶ Empêche
join_thread()
de bloquer. En particulier, cela empêche le fil d’arrière-plan d’être attendu automatiquement quand le processus se ferme – voirjoin_thread()
.Un meilleur nom pour cette méthode pourrait être
allow_exit_without_flush()
. Cela peut provoquer des pertes de données placées dans la queue, et vous ne devriez certainement pas avoir besoin de l’utiliser. Elle n’est là que si vous souhaitez terminer immédiatement le processus sans transférer les données du tampon, et que vous ne vous inquiétez pas de perdre des données.
Note
Le fonctionnement de cette classe requiert une implémentation de sémaphore partagé sur le système d’exploitation hôte. Sans cela, la fonctionnalité sera désactivée et la tentative d’instancier une
Queue
lèvera uneImportError
. Voir bpo-3770 pour plus d’informations. Cette remarque reste valable pour les autres types de queues spécialisées définies par la suite.-
-
class
multiprocessing.
SimpleQueue
¶ Un type de
Queue
simplifié, très proche d’unPipe
avec verrou.-
empty
()¶ Renvoie
True
si la queue est vide,False
sinon.
-
get
()¶ Supprime et renvoie un élément de la queue.
-
put
(item)¶ Place item dans la queue.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
, une sous-classe deQueue
, est une queue qui ajoute des méthodestask_done()
etjoin()
.-
task_done
()¶ Indique qu’une tâche précédemment placée dans la queue est complétée. Utilisé par les consommateurs de la queue. Pour chaque
get()
utilisé pour récupérer une tâche, un appel ultérieur àtask_done()
indique à la queue que le traitement de la tâche est terminé.Si un
join()
est actuellement bloquant, il se débloquera quand tous les éléments auront été traités (signifiant qu’un appel àtask_done()
a été reçu pour chaque élément ayant été placé viaput()
dans la queue).Lève une exception
ValueError
si appelée plus de fois qu’il y avait d’éléments dans la file.
-
join
()¶ Bloque jusqu’à ce que tous les éléments de la queue aient été récupérés et traités.
Le compteur des tâches non accomplies augmente chaque fois qu’un élément est ajouté à la queue. Le compteur redescend chaque fois qu’un consommateur appelle
task_done()
pour indiquer qu’un élément a été récupéré et que tout le travail qui le concerne est complété. Quand le compteur des tâches non accomplies atteint zéro,join()
est débloquée.
-
17.2.2.3. Divers¶
-
multiprocessing.
active_children
()¶ Renvoie la liste de tous les enfants vivants du processus courant.
Appeler cette méthode provoque l’effet de bord d’attendre tout processus qui n’a pas encore terminé.
-
multiprocessing.
cpu_count
()¶ Renvoie le nombre de CPU sur le système.
Ce nombre n’est pas équivalent au nombre de CPUs que le processus courant peut utiliser. Le nombre de CPUs utilisables peut être obtenu avec
len(os.sched_getaffinity(0))
Peut lever une
NotImplementedError
.Voir aussi
-
multiprocessing.
current_process
()¶ Renvoie l’objet
Process
correspondant au processus courant.Un analogue à
threading.current_thread()
.
-
multiprocessing.
freeze_support
()¶ Ajoute le support des programmes utilisant
multiprocessing
qui ont été gelés pour produire un exécutable Windows. (Testé avec py2exe, PyInstaller et cx_Freeze.)Cette fonction doit être appelée juste après la ligne
if __name__ == '__main__'
du module principal. Par exemple :from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Si la ligne
freeze_support()
est omise, alors tenter de lancer l’exécutable gelé lèvera uneRuntimeError
.Appeler
freeze_support()
n’a pas d’effet quand elle est invoquée sur un système d’exploitation autre que Windows. De plus, si le module est lancé normalement par l’interpréteur Python sous Windows (le programme n’a pas été gelé), alorsfreeze_support()
n’a pas d’effet.
-
multiprocessing.
get_all_start_methods
()¶ Renvoie la liste des méthodes de démarrage supportées, la première étant celle par défaut. Les méthodes de démarrage possibles sont
'fork'
,'spawn'
et'forkserver'
. Sous Windows seule'spawn'
est disponible. Sous Unix'fork'
et'spawn'
sont disponibles,'fork'
étant celle par défaut.Nouveau dans la version 3.4.
-
multiprocessing.
get_context
(method=None)¶ Renvoie un contexte ayant les mêmes attributs que le module
multiprocessing
.Si method est
None
le contexte par défaut est renvoyé. Sinon method doit valoir'fork'
,'spawn'
ou'forkserver'
. UneValueError
est levée si la méthode de démarrage spécifiée n’est pas disponible.Nouveau dans la version 3.4.
-
multiprocessing.
get_start_method
(allow_none=False)¶ Renvoie le nom de la méthode de démarrage utilisée pour démarrer le processus.
Si le nom de la méthode n’a pas été fixé et que allow_none est faux, alors la méthode de démarrage est réglée à celle par défaut et son nom est renvoyé. Si la méthode n’a pas été fixée et que allow_none est vrai,
None
est renvoyé.La valeur de retour peut être
'fork'
,'spawn'
,'forkserver'
ouNone
.'fork'
est la valeur par défaut sous Unix,'spawn'
est celle sous Windows.Nouveau dans la version 3.4.
-
multiprocessing.
set_executable
()¶ Définit le chemin de l’interpréteur Python à utiliser pour démarrer un processus fils. (Par défaut
sys.executable
est utilisé). Les intégrateurs devront probablement faire quelque chose commeset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
avant de pouvoir créer des processus fils.
Modifié dans la version 3.4: Maintenant supporté sous Unix quand la méthode de démarrage
'spawn'
est utilisée.
-
multiprocessing.
set_start_method
(method)¶ Règle la méthode qui doit être utilisée pour démarrer un processus fils. method peut être
'fork'
,'spawn'
ou'forkserver'
.Notez que cette fonction ne devrait être appelée qu’une fois au plus, et l’appel devrait être protégé à l’intérieur d’une clause
if __name__ == '__main__'
dans le module principal.Nouveau dans la version 3.4.
Note
multiprocessing
ne contient pas d’analogues à threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, ou threading.local
.
17.2.2.4. Objets de connexions¶
Les objets de connexion autorisent l’envoi et la réception d’objets sérialisables ou de chaînes de caractères. Ils peuvent être vus comme des interfaces de connexion (sockets) connectées orientées messages.
Les objets de connexion sont habituellement créés via Pipe
– voir aussi Auditeurs et Clients.
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ Envoie un objet sur l’autre bout de la connexion, qui devra être lu avec
recv()
.The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a
ValueError
exception.
-
recv
()¶ Renvoie un objet envoyé depuis l’autre bout de la connexion en utilisant
send()
. Bloque jusqu’à ce que quelque chose soit reçu. Lève uneEOFError
s’il n’y a plus rien à recevoir et que l’autre bout a été fermé.
-
fileno
()¶ Renvoie le descripteur de fichier ou identifiant utilisé par la connexion.
-
close
()¶ Ferme la connexion.
Elle est appelée automatiquement quand la connexion est collectée par le ramasse-miettes.
-
poll
([timeout])¶ Renvoie vrai ou faux selon si des données sont disponibles à la lecture.
Si timeout n’est pas spécifié la méthode renverra immédiatement. Si timeout est un nombre alors il spécifie le temps maximum de blocage en secondes. Si timeout est
None
, un temps d’attente infini est utilisé.Notez que plusieurs objets de connexions peuvent être attendus en même temps à l’aide de
multiprocessing.connection.wait()
.
-
send_bytes
(buffer[, offset[, size]])¶ Envoie des données binaires depuis un bytes-like object comme un message complet.
If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a
ValueError
exception
-
recv_bytes
([maxlength])¶ Renvoie un message complet de données binaires envoyées depuis l’autre bout de la connexion comme une chaîne de caractères. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une
EOFError
s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.Si maxlength est précisé que que le message est plus long que maxlength alors une
OSError
est levée et la connexion n’est plus lisible.
-
recv_bytes_into
(buffer[, offset])¶ Lit et stocke dans buffer un message complet de données binaires envoyées depuis l’autre bout de la connexion et renvoie le nombre d’octets du message. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une
EOFError
s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.buffer doit être un bytes-like object accessible en écriture. Si offset est donné, le message sera écrit dans le tampon à partir de cette position. offset doit être un entier positif, inférieur à la taille de buffer (en octets).
Si le tampon est trop petit une exception
BufferTooShort
est levée et le message complet est accessible viae.args[0]
oùe
est l’instance de l’exception.
Modifié dans la version 3.3: Les objets de connexions eux-mêmes peuvent maintenant être transférés entre les processus en utilisant
Connection.send()
etConnection.recv()
.Nouveau dans la version 3.3: Les objets de connexions supportent maintenant le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte.
__enter__()
renvoie l’objet de connexion, et__exit__()
appelleclose()
.-
Par exemple :
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Avertissement
La méthode Connection.recv()
désérialise automatiquement les données qu’elle reçoit, ce qui peut être un risque de sécurité à moins que vous ne fassiez réellement confiance au processus émetteur du message.
Par conséquent, à moins que l’objet de connexion soit instancié par Pipe()
, vous ne devriez uniquement utiliser les méthodes recv()
et send()
après avoir effectué une quelconque forme d’authentification. Voir Clés d’authentification.
Avertissement
Si un processus est tué pendant qu’il essaye de lire ou écrire sur le tube, alors les données du tube ont des chances d’être corrompues, parce qu’il devient impossible d’être sûr d’où se trouvent les bornes du message.
17.2.2.5. Primitives de synchronisation¶
Généralement les primitives de synchronisation ne sont pas nécessaire dans un programme multi-processus comme elles le sont dans un programme multi-fils d’exécution. Voir la documentation du module threading
.
Notez que vous pouvez aussi créer des primitives de synchronisation en utilisant un objet gestionnaire – voir Gestionnaires.
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ Un objet barrière : un clone de
threading.Barrier
.Nouveau dans la version 3.3.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Un objet sémaphore lié : un analogue proche de
threading.BoundedSemaphore
.Une seule différence existe avec son proche analogue : le premier argument de sa méthode
acquire
est appelé block, pour la cohérence avecLock.acquire()
.Note
Sur Mac OS X, elle n’est pas distinguable de la classe
Semaphore
parce quesem_getvalue()
n’est pas implémentée sur cette plateforme.
-
class
multiprocessing.
Condition
([lock])¶ Une variable conditionnelle : un alias pour
threading.Condition
.Si lock est spécifié il doit être un objet
Lock
ouRLock
du modulemultiprocessing
.Modifié dans la version 3.3: La méthode
wait_for()
a été ajoutée.
-
class
multiprocessing.
Event
¶ Un clone de
threading.Event
.
-
class
multiprocessing.
Lock
¶ Un verrou non récursif : un analogue proche de
threading.Lock
. Une fois que le processus ou le fil d’exécution a acquis un verrou, les tentatives suivantes d’acquisition depuis n’importe quel processus ou fil d’exécution bloqueront jusqu’à ce qu’il soit libéré ; n’importe quel processus ou fil peut le libérer. Les concepts et comportements dethreading.Lock
qui s’appliquent aux fils d’exécution sont répliqués ici dansmultiprocessing.Lock
et s’appliquent aux processus et aux fils d’exécution, à l’exception de ce qui est indiqué.Notez que
Lock
est en fait une fonction factory qui renvoie une instance demultiprocessing.synchronize.Lock
initialisée avec un contexte par défaut.Lock
supporte le protocole context manager et peut ainsi être utilisé avec une instructionwith
.-
acquire
(block=True, timeout=None)¶ Acquiert un verrou, bloquant ou non bloquant.
Avec l’argument block à
True
(par défaut), l’appel de méthode bloquera jusqu’à ce que le verrou soit dans déverrouillé, puis le verrouillera avant de renvoyerTrue
. Notez que le nom de ce premier argument diffère de celui dethreading.Lock.acquire()
.Avec l’argument block à
False
, l’appel de méthode ne bloque pas. Si le verrou est actuellement verrouillé, renvoieFalse
; autrement verrouille le verrou et renvoieTrue
.Quand invoqué avec un nombre flottant positif comme timeout, bloque au maximum pendant ce nombre spécifié de secondes, tant que le verrou ne peut être acquis. Les invocations avec une valeur de timeout négatives sont équivalents à zéro. Les invocations avec un timeout à
None
(par défaut) correspondent à un délai d’attente infini. Notez que le traitement des valeurs de timeout négatives etNone
diffère du comportement implémenté dansthreading.Lock.acquire()
. L’argument timeout n’a pas d’implication pratique si l’argument block est mis )False
et est alors ignoré. RenvoieTrue
si le verrou a été acquis etFalse
si le temps de timeout a expiré.
-
release
()¶ Libère un verrou. Elle peut être appelée depuis n’importe quel processus ou fil d’exécution, pas uniquement le processus ou le fil qui a acquis le verrou à l’origine.
Le comportement est le même que
threading.Lock.release()
excepté que lorsque la méthode est appelée sur un verrou déverrouillé, uneValueError
est levée.
-
-
class
multiprocessing.
RLock
¶ Un objet verrou récursif : un analogue proche de
threading.RLock
. Un verrou récursif doit être libéré par le processus ou le fil d’exécution qui l’a acquis. Quand un processus ou un fil acquiert un verrou récursif, le même processus/fil peut l’acquérir à nouveau sans bloquer ; le processus/fil doit le libérer autant de fois qu’il l’acquiert.Notez que
RLock
est en fait une fonction factory qui renvoie une instance demultiprocessing.synchronize.RLock
initialisée avec un contexte par défaut.RLock
supporte le protocole context manager et peut ainsi être utilisée avec une instructionwith
.-
acquire
(block=True, timeout=None)¶ Acquiert un verrou, bloquant ou non bloquant.
Quand invoqué avec l’argument block à
True
, bloque jusqu’à ce que le verrou soit déverrouillé (n’appartenant à aucun processus ou fil d’exécution) sauf s’il appartient déjà au processus ou fil d’exécution courant. Le processus ou fil d’exécution courant prend la possession du verrou (s’il ne l’a pas déjà) et incrémente d’un le niveau de récursion du verrou, renvoyant ainsiTrue
. Notez qu’il y a plusieurs différences dans le comportement de ce premier argument comparé à l’implémentation dethreading.RLock.acquire()
, à commencer par le nom de l’argument lui-même.Quand invoqué avec l’argument block à
False
, ne bloque pas. Si le verrou est déjà acquis (et possédé) par un autre processus ou fil d’exécution, le processus/fil courant n’en prend pas la possession et le niveau de récursion n’est pas incrémenté, résultant en une valeur de retour àFalse
. Si le verrou est déverrouillé, le processus/fil courant en prend possession et incrémente son niveau de récursion, renvoyantTrue
.L’usage et les comportements de l’argument timeout sont les mêmes que pour
Lock.acquire()
. Notez que certains de ces comportements diffèrent par rapport à ceux implémentés parthreading.RLock.acquire()
.
-
release
()¶ Libère un verrou, décrémentant son niveau de récursion. Si après la décrémentation le niveau de récursion est zéro, réinitialise le verrou à un état déverrouillé (n’appartenant à aucun processus ou fil d’exécution) et si des processus/fils attendent que le verrou se déverrouillé, autorise un seul d’entre-eux à continuer. Si après cette décrémentation le niveau de récursion est toujours strictement positif, le verrou reste verrouillé et propriété du processus/fil appelant.
N’appelez cette méthode que si le processus ou fil d’exécution appelant est propriétaire du verrou. Une
AssertionError
est levée si cette méthode est appelée par un processus/fil autre que le propriétaire ou si le verrou n’est pas verrouillé (possédé). Notez que le type d’exception levé dans cette situation diffère du comportement dethreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Un objet sémaphore, proche analogue de
threading.Semaphore
.Une seule différence existe avec son proche analogue : le premier argument de sa méthode
acquire
est appelé block, pour la cohérence avecLock.acquire()
.
Note
Sous Mac OS X, sem_timedwait
n’est pas supporté, donc appeler acquire()
avec un temps d’exécution limité émulera le comportement de cette fonction en utilisant une boucle d’attente.
Note
Si le signal SIGINT généré par un Ctrl-C survient pendant que le fil d’exécution principal est bloqué par un appel à BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
ou Condition.wait()
, l’appel sera immédiatement interrompu et une KeyboardInterrupt
sera levée.
Cela diffère du comportement de threading
où le SIGINT est ignoré tant que les appels bloquants sont en cours.
Note
Certaines des fonctionnalités de ce paquet requièrent une implémentation fonctionnelle de sémaphores partagés sur le système hôte. Sans cela, le module multiprocessing.synchronize
sera désactivé, et les tentatives de l’importer lèveront une ImportError
. Voir bpo-3770 pour plus d’informations.
17.2.2.7. Gestionnaires¶
Les gestionnaires fournissent un moyen de créer des données qui peuvent être partagées entre les différents processus, incluant le partage à travers le réseau entre des processus tournant sur des machines différentes. Un objet gestionnaire contrôle un processus serveur qui gère les shared objects. Les autres processus peuvent accéder aux objets partagés à l’aide de mandataires.
Renvoie un objet
SyncManager
démarré qui peut être utilisé pour partager des objets entre les processus. L’objet gestionnaire renvoyé correspond à un processus enfant instancié et possède des méthodes pour créer des objets partagés et renvoyer les mandataires correspondants.
Les processus gestionnaires seront arrêtés dès qu’ils seront collectés par le ramasse-miettes ou que leur processus parent se terminera. Les classes gestionnaires sont définies dans le module multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Crée un objet BaseManager.
Une fois créé il faut appeler
start()
ouget_server().serve_forever()
pour assurer que l’objet gestionnaire référence un processus gestionnaire démarré.address est l’adresse sur laquelle le processus gestionnaire écoute pour de nouvelles connexions. Si address est
None
, une adresse arbitraire est choisie.authkey est la clé d’authentification qui sera utilisée pour vérifier la validité des connexions entrantes sur le processus serveur. Si authkey est
None
alorscurrent_process().authkey
est utilisée. Autrement authkey est utilisée et doit être une chaîne d’octets.-
start
([initializer[, initargs]])¶ Démarre un sous-processus pour démarrer le gestionnaire. Si initializer n’est pas
None
alors le sous-processus appellerainitializer(*initargs)
quand il démarrera.
-
get_server
()¶ Renvoie un objet
Server
qui représente le serveur sous le contrôle du gestionnaire. L’objetServer
supporte la méthodeserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
possède en plus un attributaddress
.
-
connect
()¶ Connecte un objet gestionnaire local au processus gestionnaire distant :
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc') >>> m.connect()
-
shutdown
()¶ Stoppe le processus utilisé par le gestionnaire. Cela est disponible uniquement si
start()
a été utilisée pour démarrer le processus serveur.Cette méthode peut être appelée plusieurs fois.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ Une méthode de classe qui peut être utilisée pour enregistrer un type ou un appelable avec la classe gestionnaire.
typeif est un « type identifier » qui est utilisé pour identifier un type particulier d’objet partagé. Cela doit être une chaîne de caractères.
callable est un objet appelable utilisé pour créer les objets avec cet identifiant de type. Si une instance de gestionnaire prévoit de se connecter au serveur en utilisant sa méthode
connect()
ou si l’argument create_method vautFalse
alors cet argument peut être laissé àNone
.proxytype est une sous-classe de
BaseProxy
utilisée pour créer des mandataires autour des objets partagés avec ce typeid. S’il estNone
, une classe mandataire sera créée automatiquement.exposed est utilisé pour préciser une séquence de noms de méthodes dont les mandataires pour ce typeid doivent être autorisés à accéder via
BaseProxy._callmethod()
. (Si exposed estNone
alorsproxytype._exposed_
est utilisé à la place s’il existe.) Dans le cas où aucune liste exposed n’est précisée, toutes les « méthodes publiques » de l’objet partagé seront accessibles. (Ici une « méthode publique » signifie n’importe quel attribut qui possède une méthode__call__()
et dont le nom ne commence pas par un'_'
.)method_to_typeid est un tableau associatif utilisé pour préciser le type de retour de ces méthodes exposées qui doivent renvoyer un mandataire. Il associé un nom de méthode à une chaîne de caractères typeid. (Si method_to_typeid est
None
,proxytype._method_to_typeid_
est utilisé à la place s’il existe). Si le nom d’une méthode n’est pas une clé de ce tableau associatif ou si la valeur associée estNone
, l’objet renvoyé par la méthode sera une copie de la valeur.create_method détermine si une méthode devrait être créée avec le nom typeid, qui peut être utilisée pour indiquer au processus serveur de créer un nouvel objet partagé et d’en renvoyer un mandataire. a valeur par défaut est
True
.
Les instances de
BaseManager
ont aussi une propriété en lecture seule :-
address
¶ L’adresse utilisée par le gestionnaire.
Modifié dans la version 3.3: Les objets gestionnaires supportent le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte.
__enter__()
démarre le processus serveur (s’il n’a pas déjà été démarré) et renvoie l’objet gestionnaire.__exit__()
appelleshutdown()
.Dans les versions précédentes
__enter__()
ne démarrait pas le processus serveur du gestionnaire s’il n’était pas déjà démarré.-
-
class
multiprocessing.managers.
SyncManager
¶ Une sous-classe de
BaseManager
qui peut être utilisée pour la synchronisation entre processus. Des objets de ce type sont renvoyés parmultiprocessing.Manager()
.Ces méthodes créent et renvoient des Objets mandataires pour un certain nombre de types de données communément utilisés pour être synchronisés entre les processus. Elles incluent notamment des listes et dictionnaires partagés.
-
Barrier
(parties[, action[, timeout]])¶ Crée un objet
threading.Barrier
partagé et renvoie un mandataire pour cet objet.Nouveau dans la version 3.3.
-
BoundedSemaphore
([value])¶ Crée un objet
threading.BoundedSemaphore
partagé et renvoie un mandataire pour cet objet.
-
Condition
([lock])¶ Crée un objet
threading.Condition
partagé et renvoie un mandataire pour cet objet.Si lock est fourni alors il doit être un mandataire pour un objet
threading.Lock
outhreading.RLock
.Modifié dans la version 3.3: La méthode
wait_for()
a été ajoutée.
-
Event
()¶ Crée un objet
threading.Event
partagé et renvoie un mandataire pour cet objet.
-
Lock
()¶ Crée un objet
threading.Lock
partagé et renvoie un mandataire pour cet objet.
-
Queue
([maxsize])¶ Crée un objet
queue.Queue
partagé et renvoie un mandataire pour cet objet.
-
RLock
()¶ Crée un objet
threading.RLock
partagé et renvoie un mandataire pour cet objet.
-
Semaphore
([value])¶ Crée un objet
threading.Semaphore
partagé et renvoie un mandataire pour cet objet.
-
Array
(typecode, sequence)¶ Crée un tableau et renvoie un mandataire pour cet objet.
-
Value
(typecode, value)¶ Crée un objet avec un attribut
value
accessible en écriture et renvoie un mandataire pour cet objet.
-
dict
()¶ -
dict
(mapping) -
dict
(sequence) Crée un objet
dict
partagé et renvoie un mandataire pour cet objet.
Modifié dans la version 3.6: Les objets partagés peuvent être imbriqués. Par exemple, un conteneur partagé tel qu’une liste partagée peu contenir d’autres objets partagés qui seront aussi gérés et synchronisés par le
SyncManager
.-
-
class
multiprocessing.managers.
Namespace
¶ Un type qui peut être enregistré avec
SyncManager
.Un espace de nommage n’a pas de méthodes publiques, mais possède des attributs accessibles en écriture. Sa représentation montre les valeurs de ses attributs.
Cependant, en utilisant un mandataire pour un espace de nommage, un attribut débutant par
'_'
est un attribut du mandataire et non de l’objet cible :>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
17.2.2.7.1. Gestionnaires personnalisés¶
Pour créer son propre gestionnaire, il faut créer une sous-classe de BaseManager
et utiliser la méthode de classe register()
pour enregistrer de nouveaux types ou callables au gestionnaire. Par exemple :
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
17.2.2.7.2. Utiliser un gestionnaire distant¶
Il est possible de lancer un serveur gestionnaire sur une machine et d’avoir des clients l’utilisant sur d’autres machines (en supposant que les pare-feus impliqués l’autorisent).
Exécuter les commandes suivantes crée un serveur pour une simple queue partagée à laquelle des clients distants peuvent accéder :
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Un client peut accéder au serveur comme suit :
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Un autre client peut aussi l’utiliser :
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Les processus locaux peuvent aussi accéder à cette queue, utilisant le code précédent sur le client pour y accéder à distance :
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
17.2.2.8. Objets mandataires¶
Un mandataire est un objet qui référence un objet partagé appartenant (supposément) à un processus différent. L’objet partagé est appelé le référent du mandataire. Plusieurs mandataires peuvent avoir un même référent.
Un mandataire possède des méthodes qui appellent les méthodes correspondantes du référent (bien que toutes les méthodes du référent ne soient pas nécessairement accessibles à travers le mandataire). De cette manière, un mandataire peut être utilisé comme le serait sont référent :
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Notez qu’appliquer str()
à un mandataire renvoie la représentation du référent, alors que repr()
renvoie celle du mandataire.
Une fonctionnalité importantes des objets mandataires est qu’ils sont sérialisables et peuvent donc être échangés entre les processus. Ainsi, un référent peut contenir des Objets mandataires. Cela permet d’imbriquer des listes et dictionnaires gérés ainsi que d’autres Objets mandataires :
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
De même, les mandataires de listes et dictionnaires peuvent être imbriqués dans d’autres :
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Si des objets standards (non proxyfiés) list
ou dict
sont contenus dans un référent, les modifications sur ces valeurs mutables ne seront pas propagées à travers le gestionnaire parce que le mandataire n’a aucun moyen de savoir quand les valeurs contenues sont modifiées. Cependant, stocker une valeur dans un conteneur mandataire (qui déclenche un appel à __setitem__
sur le mandataire) propage bien la modification à travers le gestionnaire et modifie effectivement l’élément, il est ainsi possible de réassigner la valeur modifiée au conteneur mandataire :
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Cette approche est peut-être moins pratique que d’utiliser des Objets mandataires imbriqués pour la majorité des cas d’utilisation, mais démontre aussi un certain niveau de contrôle sur la synchronisation.
Note
Les types de mandataires de multiprocessing
n’implémentent rien pour la comparaison par valeurs. Par exemple, on a :
>>> manager.list([1,2,3]) == [1,2,3]
False
Il faut à la place simplement utiliser une copie du référent pour faire les comparaisons.
-
class
multiprocessing.managers.
BaseProxy
¶ Les objets mandataires sont des instances de sous-classes de
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Appelle et renvoie le résultat d’une méthode du référent du mandataire.
Si
proxy
est un mandataire sont le référent estobj
, alors l’expressionproxy._callmethod(methodname, args, kwds)
s’évalue comme
getattr(obj, methodname)(*args, **kwds)
dans le processus du gestionnaire.
La valeur renvoyée sera une copie du résultat de l’appel ou un mandataire sur un nouvel objet partagé – voir l’a documentation de l’argument method_to_typeid de
BaseManager.register()
.Si une exception est levée par l’appel, elle est relayée par
_callmethod()
. Si une autre exception est levée par le processus du gestionnaire, elle est convertie en uneRemoteError
et est levée par_callmethod()
.Notez en particulier qu’une exception est levée si methodname n’est pas exposée.
Un exemple d’utilisation de
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Renvoie une copie du référent.
Si le référent n’est pas sérialisable, une exception est levée.
-
__repr__
()¶ Renvoie la représentation de l’objet mandataire.
-
__str__
()¶ Renvoie la représentation du référent.
-
17.2.2.8.1. Nettoyage¶
Un mandataire utilise un callback sous une référence faible de façon à ce que quand il est collecté par le ramasse-miettes, il se désenregistre auprès du gestionnaire qui possède le référent.
Un objet partagé est supprimé par le processus gestionnaire quand plus aucun mandataire ne le référence.
17.2.2.9. Bassins de processus¶
On peut créer un bassin de processus qui exécuteront les tâches qui lui seront soumises avec la classe Pool
.
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ Un objet process pool qui contrôle un bassin de processus workers auquel sont soumises des tâches. Il supporte les résultats asynchrones avec des timeouts et des callabacks et possède une implémentation parallèle de map.
processes est le nombre de processus workers à utiliser. Si processes est
None
, le nombre renvoyé paros.cpu_count()
est utilisé.Si initializer n’est pas
None
, chaque processus worker appellerainitializer(*initargs)
en démarrant.maxtasksperchild est le nombre de tâches qu’un processus worker peut accomplir avant de se fermer et d’être remplacé par un worker frais, pour permettre aux ressources inutilisées d’être libérées. Par défaut maxtasksperchild est
None
, ce qui signifie que le worker vit aussi longtemps que le bassin.context peut être utilisé pour préciser le contexte utilisé pour démarrer les processus workers. Habituellement un bassin est créé à l’aide de la fonction
multiprocessing.Pool()
ou de la méthodePool()
d’un objet de contexte. Dans les deux cas context est réglé de façon appropriée.Notez que les méthodes de l’objet pool ne doivent être appelées que par le processus qui l’a créé.
Nouveau dans la version 3.2: maxtasksperchild
Nouveau dans la version 3.4: context
Note
Les processus workers à l’intérieur d’une
Pool
vivent par défaut aussi longtemps que la queue de travail du bassin. Un modèle fréquent chez d’autres systèmes (tels qu’Apache, mod_wsgi, etc.) pour libérer les ressources détenues par les workers est d’autoriser un worker dans le bassin à accomplir seulement une certaine charge de travail avant de se fermer, se retrouvant nettoyé et remplacé par un nouvelle processus fraîchement lancé. L’argument maxtasksperchild dePool
expose cette fonctionnalité à l’utilisateur final.-
apply
(func[, args[, kwds]])¶ Appelle func avec les arguments args et les arguments nommés kwds. Bloque jusqu’à ce que que le résultat soit prêt. En raison de ce blocage,
apply_async()
est préférable pour exécuter du travail en parallèle. De plus, func est exécutée sur un seul des workers du bassin.
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ Une variante de la méthode
apply()
qui renvoie un objet résultat.Si callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Quand le résultat est prêt, callback est appelé avec ce résultat, si l’appel n’échoue pas auquel cas error_callback est appelé à la place.
Si error_callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Si la fonction cible échoue, alors error_callback est appelé avec l’instance de l’exception.
Les callbacks doivent se terminer immédiatement, autrement le fil d’exécution qui gère les résultats se retrouverait bloqué.
-
map
(func, iterable[, chunksize])¶ Un équivalent parallèle à la fonction built-in
map()
(qui ne supporte cependant qu’un itérable en argument). Elle bloque jusqu’à ce que le résultat soit prêt.La méthode découpe l’itérable en un nombre de morceaux qu’elle envoie au bassin de processus comme des tâches séparées. La taille (approximative) de ces morceaux peut être précisée en passant à chunksize un entier positif.
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Une variante de la méthode
map()
qui renvoie un objet résultat.Si callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Quand le résultat est prêt, callback est appelé avec ce résultat, si l’appel n’échoue pas auquel cas error_callback est appelé à la place.
Si error_callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Si la fonction cible échoue, alors error_callback est appelé avec l’instance de l’exception.
Les callbacks doivent se terminer immédiatement, autrement le fil d’exécution qui gère les résultats se retrouverait bloqué.
-
imap
(func, iterable[, chunksize])¶ A lazier version of
map()
.L’argument chunksize est le même que celui utilisé par la méthode
map()
. Pour de très longs itérables, utiliser une grande valeur pour chunksize peut faire s’exécuter la tâche beaucoup plus rapidement qu’en utilisant la valeur par défaut de1
.Aussi, si chucksize vaut
1
alors la méthodenext()
de l’itérateur renvoyé parimap()
prend un paramètre optionnel timeout :next(timeout)
lève unemultiprocessing.TimeoutError
si le résultat ne peut pas être renvoyé avant timeout secondes.
-
imap_unordered
(func, iterable[, chunksize])¶ Identique à
imap()
si ce n’est que l’ordre des résultats de l’itérateur renvoyé doit être considéré comme arbitraire. (L’ordre n’est garanti que quand il n’y a qu’un worker.)
-
starmap
(func, iterable[, chunksize])¶ Semblable à
map()
à l’exception que les éléments d”iterable doivent être des itérables qui seront dépaquetés comme arguments pour la fonction.Par conséquent un iterable
[(1,2), (3, 4)]
donnera pour résultat[func(1,2), func(3,4)]
.Nouveau dans la version 3.3.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Une combinaison de
starmap()
etmap_async()
qui itère sur iterable (composé d’itérables) et appelle func pour chaque itérable dépaqueté. Renvoie l’objet résultat.Nouveau dans la version 3.3.
-
close
()¶ Empêche de nouvelles tâches d’être envoyées à la pool. Les processus workers se terminent une fois que toutes les tâches ont été complétées.
-
terminate
()¶ Stoppe immédiatement les processus workers sans finaliser les travaux courants. Quand l’objet pool est collecté par le ramasse-miettes, sa méthode
terminate()
est appelée immédiatement.
-
join
()¶ Attend que les processus workers se terminent. Il est nécessaire d’appeler
close()
outerminate()
avant d’utiliserjoin()
.
Nouveau dans la version 3.3: Les bassins de workers supportent maintenant le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte.
__enter__()
renvoie l’objet pool et__exit__()
appelleterminate()
.-
-
class
multiprocessing.pool.
AsyncResult
¶ La classe des résultats renvoyés par
Pool.apply_async()
etPool.map_async()
.-
get
([timeout])¶ Renvoie le résultat quand il arrive. Si timeout n’est pas
None
et que le résultat n’arrive pas avant timeout secondes, unemultiprocessing.TimeoutError
est levée. Si l’appel distance lève une exception, alors elle est relayée parget()
.
-
wait
([timeout])¶ Attend que le résultat soit disponible ou que timeout secondes s’écoulent.
-
ready
()¶ Renvoie
True
ouFalse
suivant si la tâche est accomplie.
-
successful
()¶ Renvoie
True
ouFalse
suivant si la tâche est accomplie sans lever d’exception. Lève uneAssertionError
si le résultat n’est pas prêt.
-
Les exemples suivants présentent l’utilisation d’un bassin de workers :
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
17.2.2.10. Auditeurs et Clients¶
Habituellement l’échange de messages entre processus est réalisé en utilisant des queues ou des objets Connection
renvoyés par Pipe()
.
Cependant, le module multiprocessing.connection
permet un peu plus de flexibilité. Il fournit un message de plus haut-niveau orienté API pour gérer des connecteurs ou des tubes nommés sous Windows. Il gère aussi l’authentification par condensat (digest authentication en anglais) en utilisant le module hmac
, et pour interroger de multiples connexions en même temps.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Envoie un message généré aléatoirement à l’autre bout de la connexion et attend une réponse.
Si la réponse correspond au condensat du message avec la clé authkey, alors un message de bienvenue est envoyé à l’autre bout de la connexion. Autrement, une
AuthenticationError
est levée.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Reçoit un message, calcule le condensat du message en utilisant la clé authkey, et envoie le condensat en réponse.
Si un message de bienvenue n’est pas reçu, une
AuthenticationError
est levée.
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ Essaie d’établir une connexion avec l’auditeur qui utilise l’adresse address, renvoie une
Connection
.Le type de la connexion est déterminé par l’argument family, mais il peut généralement être omis puisqu’il peut être inféré depuis le format d”address. (Voir Formats d’adresses)
Si authkey est passée et n’est pas
None
, elle doit être une chaîne d’octets et sera utilisée comme clé secrète pour le défi d’authentification basé sur HMAC. Aucune authentification n’est réalisée si authkey estNone
. UneAuthenticationError
est levée si l’authentification échoue. Voir Clés d’authentification.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ Une enveloppe autour d’un connecteur lié ou un tube nommé sous Windows qui écoute pour des connexions.
address est l’adresse à utiliser par le connecteur lié ou le tube nommé de l’objet auditeur.
Note
Si une adresse “0.0.0.0” est utilisée, l’adresse ne sera pas un point d’accès connectable sous Windows. Si vous avez besoin d’un point d’accès connectable, utilisez “127.0.0.1”.
family est le type de connecteur (ou tube nommé) à utiliser. Cela peut être l’une des chaînes
'AF_INET'
(pour un connecteur TCP),'AF_UNIX'
(pour un connecteur Unix) ou'AF_PIPE'
(pour un tube nommé sous Windows). Seulement le premier d’entre eux est garanti d’être disponible. Si family estNone
, la famille est inférée depuis le format d”address. Si address est aussiNone
, la famille par défaut est utilisée. La famille par défaut est supposée être la plus rapide disponible. Voir Formats d’adresses. Notez que si la family est'AF_UNIX'
et qu”address estNone
, le connecteur est créé dans un répertoire temporaire privé créé avectempfile.mkstemp()
.Si l’objet auditeur utilise un connecteur alors backlog (1 par défaut) est passé à la méthode
listen()
du connecteur une fois qu’il a été lié.Si authkey est passée et n’est pas
None
, elle doit être une chaîne d’octets et sera utilisée comme clé secrète pour le défi d’authentification basé sur HMAC. Aucune authentification n’est réalisée si authkey estNone
. UneAuthenticationError
est levée si l’authentification échoue. Voir Clés d’authentification.-
accept
()¶ Accepte une connexion sur le connecteur lié ou le tube nommé de l’objet auditeur et renvoie un objet
Connection
. Si la tentative d’authentification échoue, uneAuthenticationError
est levée.
-
close
()¶ Ferme le connecteur lié ou le tube nommé de l’objet auditeur. La méthode est appelée automatiquement quand l’auditeur est collecté par le ramasse-miettes. Il est cependant conseillé de l’appeler explicitement.
Les objets auditeurs ont aussi les propriétés en lecture seule suivantes :
-
address
¶ L’adresse utilisée par l’objet auditeur.
-
last_accepted
¶ L’adresse depuis laquelle a été établie la dernière connexion.
None
si aucune n’est disponible.
Nouveau dans la version 3.3: Les objets auditeurs supportent maintenant le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte.
__enter__()
renvoie l’objet auditeur, et__exit__()
appelleclose()
.-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ Attend qu’un objet d”object_list soit prêt. Renvoie la liste de ces objets d”object_list qui sont prêts. Si timeout est un nombre flottant, l’appel bloquera au maximum ce nombre de secondes. Si timeout est
None
, l’appelle bloquera pour une durée non limitée. Un timeout négatif est équivalent à un timeout nul.Pour Unix et Windows, un objet peut apparaître dans object_list s’il est
un objet
Connection
accessible en lecture ;un objet
socket.socket
connecté et accessible en lecture ; ou
Une connexion (socket en anglais) est prête quand il y a des données disponibles en lecture dessus, ou que l’autre bout a été fermé.
Unix:
wait(object_list, timeout)
est en grande partie équivalente àselect.select(object_list, [], [], timeout)
. La différence est que, siselect.select()
est interrompue par un signal, elle peut lever uneOSError
avec un numéro d’erreurEINTR
, alors quewait()
ne le fera pas.Windows : un élément d”object_list doit être soit un identifiant waitable (en accord avec la définition utilisée par la documentation de la fonction Win32
WaitForMultipleObjects()
), soit un objet avec une méthodefileno()
qui renvoie un identifiant de connecteur ou de tube (notez que les identifiants de tubes et de connecteurs ne sont pas des identifiants waitables).Nouveau dans la version 3.3.
Exemples
Le code serveur suivant crée un auditeur qui utilise 'secret password'
comme clé d’authentification. Il attend ensuite une connexion et envoie les données au client :
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Le code suivant se connecte au serveur et en reçoit des données :
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Le code suivant utilise wait()
pour attendre des messages depuis plusieurs processus à la fois :
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
17.2.2.10.1. Formats d’adresses¶
Une adresse
'AF_INET'
est un tuple de la forme(hostname, port)
où hostname est une chaîne et port un entier.Une adresse
'AF_UNIX'
est une chaîne représentant un nom de fichier sur le système de fichiers.- Une adresse
'AF_PIPE'
est une chaîne de la forme r'\.\pipe{PipeName}'
. Pour utiliser unClient()
pour se connecter à un tube nommé sur une machine distante appelée ServerName, il faut plutôt utiliser une adresse de la former'\ServerName\pipe{PipeName}'
.
- Une adresse
Notez que toute chaîne commençant par deux antislashs est considérée par défaut comme l’adresse d’un 'AF_PIPE'
plutôt qu’une adresse 'AF_UNIX'
.
17.2.2.11. Clés d’authentification¶
Quand Connection.recv
est utilisée, les données reçues sont automatiquement désérialisées par pickle. Malheureusement désérialiser des données depuis une source non sûre constitue un risque de sécurité. Par conséquent Listener
et Client()
utilisent le module hmac
pour fournir une authentification par condensat.
Une clé d’authentification est une chaîne d’octets qui peut être vue comme un mot de passe : quand une connexion est établie, les deux interlocuteurs vont demander à l’autre une preuve qu’il connaît la clé d’authentification. (Démontrer que les deux utilisent la même clé n’implique pas d’échanger la clé sur la connexion.)
Si l’authentification est requise et qu’aucune clé n’est spécifiée alors la valeur de retour de current_process().authkey
est utilisée (voir Process
). Celle valeur est automatiquement héritée par tout objet Process
créé par le processus courant. Cela signifie que (par défaut) tous les processus d’un programme multi-processus partageront une clé d’authentification unique qui peut être utilisée pour mettre en place des connexions entre-eux.
Des clés d’authentification adaptées peuvent aussi être générées par os.urandom()
.
17.2.2.12. Journalisation¶
Un certain support de la journalisation est disponible. Notez cependant que le le paquet logging
n’utilise pas de verrous partagés entre les processus et il est donc possible (dépendant du type de gestionnaire) que les messages de différents processus soient mélangés.
-
multiprocessing.
get_logger
()¶ Renvoie le journaliseur utilisé par
multiprocessing
. Si nécessaire, un nouveau sera créé.À sa première création le journaliseur a pour niveau
logging.NOTSET
et pas de gestionnaire par défaut. Les messages envoyés à ce journaliseur ne seront pas propagés par défaut au journaliseur principal.Notez que sous Windows les processus fils n’hériteront que du niveau du journaliseur du processus parent – toute autre personnalisation du journaliseur ne sera pas héritée.
-
multiprocessing.
log_to_stderr
()¶ Cette fonction effectue un appel à
get_logger()
mais en plus de renvoyer le journaliseur créé par get_logger, elle ajoute un gestionnaire qui envoie la sortie sursys.stderr
en utilisant le format'[%(levelname)s/%(processName)s] %(message)s'
.
L’exemple ci-dessous présente une session avec la journalisation activée :
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Pour un tableau complet des niveaux de journalisation, voir le module logging
.
17.2.2.13. Le module multiprocessing.dummy
¶
multiprocessing.dummy
réplique toute l’API de multiprocessing
mais n’est rien de plus qu’un wrapper autour du module threading
.
17.2.3. Lignes directrices de programmation¶
Il y a certaines lignes directrices et idiomes auxquels il faut adhérer en utilisant multiprocessing
.
17.2.3.1. Toutes les méthodes de démarrage¶
Les règles suivantes s’appliquent aux méthodes de démarrage.
Éviter les états partagés
Autant que possible, vous devriez éviter de déplacer de larges données entre les processus.
Il est probablement meilleur de s’en tenir à l’utilisation de queues et tubes pour la communication entre processus plutôt que d’utiliser des primitives de synchronisation plus bas-niveau.
Sérialisation
Assurez-vous que les arguments passés aux méthodes des mandataires soient sérialisables (pickables).
Sûreté des mandataires à travers les fils d’exécution
N’utilisez pas d’objet mandataire depuis plus d’un fil d’exécution à moins que vous ne le protégiez avec un verrou.
(Il n’y a jamais de problème avec plusieurs processus utilisant un même mandataire.)
Attendre les processus zombies
Sous Unix quand un processus se termine mais n’est pas attendu, il devient un zombie. Il ne devrait jamais y en avoir beaucoup parce que chaque fois qu’un nouveau processus démarre (ou que
active_children()
est appelée) tous les processus complétés qui n’ont pas été attendus le seront. Aussi appeler la méthodeProcess.is_alive
d’un processus terminé attendra le processus. Toutefois il est probablement une bonne pratique d’attendre explicitement tous les processus que vous démarrez.
Préférez hériter que sérialiser/désérialiser
Quand vous utilisez les méthodes de démarrage spawn ou forkserver, de nombreux types de
multiprocessing
nécessitent d’être sérialisées pour que les processus enfants puissent les utiliser. Cependant, il faut généralement éviter d’envoyer des objets partagés aux autres processus en utilisant des tubes ou des queues. Vous devriez plutôt vous arranger pour qu’un processus qui nécessite l’accès à une ressource partagée créée autre part qu’il en hérite depuis un de ses processus ancêtres.
Éviter de terminer les processus
Utiliser la méthode
Process.terminate
pour stopper un processus risque de casser ou de rendre indisponible aux autres processus des ressources partagées (comme des verrous, sémaphores, tubes et queues) actuellement utilisée par le processus.Il est donc probablement préférable de n’utiliser
Process.terminate
que sur les processus qui n’utilisent jamais de ressources partagées.
Attendre les processus qui utilisent des queues
Gardez à l’esprit qu’un processus qui a placé des éléments dans une queue attendra que tous les éléments mis en tampon soient consommés par le fil d’exécution « consommateur » du tube sous-jacent avant de se terminer. (Le processus enfant peut appeler la méthode
Queue.cancel_join_thread
de la queue pour éviter ce comportement.)Cela signifie que chaque fois que vous utilisez une queue vous devez vous assurer que tous les éléments qui y ont été placés seront effectivement supprimés avant que le processus ne soit attendu. Autrement vous ne pouvez pas être sûr que les processus qui ont placé des éléments dans la queue se termineront. Souvenez-vous aussi que tous les processus non daemons seront attendus automatiquement.
L’exemple suivant provoquera un interblocage :
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Une solution ici serait d’intervertir les deux dernières lignes (ou simplement supprimer la ligne
p.join()
).
Passer explicitement les ressources aux processus fils
Sous Unix en utilisant la méthode de démarrage fork, un processus fils peut utiliser une ressource partagée créée par un processus parent en utilisant une ressource globale. Cependant, il est préférable de passer l’objet en argument au constructeur du processus fils.
En plus de rendre le code (potentiellement) compatible avec Windows et les autres méthodes de démarrage, cela assure aussi que tant que le processus fils est en vie, l’objet ne sera pas collecté par le ramasse-miettes du processus parent. Cela peut être important si certaines ressources sont libérées quand l’objet est collecté par le ramasse-miettes du processus parent.
Donc par exemple
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()devrait être réécrit comme
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Faire attention à remplacer sys.stdin
par un objet « file-like »
À l’origine,
multiprocessing
appelait inconditionnellement :os.close(sys.stdin.fileno())dans la méthode
multiprocessing.Process._bootstrap()
— cela provoquait des problèmes avec les processus imbriqués. Cela peut être changé en :sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Qui résout le problème fondamental des collisions entre processus provoquant des erreurs de mauvais descripteurs de fichiers, mais introduit un potentiel danger pour les applications qui remplacent
sys.stdin()
avec un « file-like object » ayant une sortie bufferisée. Ce danger est que si plusieurs processus appellentclose()
sur cet objet file-like, cela peut amener les données à être transmises à l’objet à plusieurs reprises, résultant en une corruption.Si vous écrivez un objet file-like et implémentez votre propre cache, vous pouvez le rendre sûr pour les forks en stockant le pid chaque fois que vous ajoutez des données au cache, et annulez le cache quand le pip change. Par exemple :
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cachePour plus d’informations, voir bpo-5155, bpo-5313 et bpo-5331
17.2.3.2. Les méthodes de démarrage spawn et forkserver¶
Certaines restrictions ne s’appliquent pas à la méthode de démarrage fork.
Plus de sérialisation
Assurez-vous que tous les argument de
Process.__init__()
sont sérialisables avec pickle. Aussi, si vous héritez deProcess
, assurez-vous que toutes les instances sont sérialisables quand la méthodeProcess.start
est appelée.
Variables globales
Gardez en tête que si le code exécuté dans un processus fils essaie d’accéder à une variable globale, alors la valeur qu’il voit (s’il y en a une) pourrait ne pas être la même que la valeur du processus parent au moment même où
Process.start
est appelée.Cependant, les variables globales qui sont juste des constantes de modules ne posent pas de problèmes.
Importation sûre du module principal
Assurez-vous que le module principal peut être importé en toute sécurité par un nouvel interpréteur Python sans causer d’effets de bord inattendus (comme le démarrage d’un nouveau processus).
Par exemple, utiliser la méthode de démarrage spawn ou forkserver pour lancer le module suivant échouerait avec une
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Vous devriez plutôt protéger le « point d’entrée » du programme en utilisant
if __name__ == '__main__':
comme suit :from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(La ligne
freeze_support()
peut être omise si le programme est uniquement lancé normalement et pas gelé.)Cela permet aux interpréteurs Python fraîchement instanciés d’importer en toute sécurité le module et d’exécution ensuite la fonction
foo()
.Des restrictions similaires s’appliquent si une pool ou un gestionnaire est créé dans le module principal.
17.2.4. Exemples¶
Démonstration de comment créer et utiliser des gestionnaires et mandataires personnalisés :
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
En utilisant Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Un exemple montrant comment utiliser des queues pour alimenter en tâches une collection de processus workers et collecter les résultats :
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()