16.6. multiprocessing
— Process-based « threading » interface¶
Nouveau dans la version 2.6.
16.6.1. Introduction¶
multiprocessing
est un paquet qui permet l’instanciation de processus via la même API que le module threading
. Le paquet multiprocessing
offre à la fois des possibilités de programmation concurrente locale ou à distance, contournant les problèmes du Global Interpreter Lock en utilisant des processus plutôt que des fils d’exécution. Ainsi, le module multiprocessing
permet au développeur de bénéficier entièrement des multiples processeurs sur une machine. Il tourne à la fois sur les systèmes Unix et Windows.
The multiprocessing
module also introduces APIs which do not have
analogs in the threading
module. A prime example of this is the
Pool
object which offers a convenient means of parallelizing the
execution of a function across multiple input values, distributing the
input data across processes (data parallelism). The following example
demonstrates the common practice of defining such functions in a module so
that child processes can successfully import that module. This basic example
of data parallelism using Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
affiche sur la sortie standard :
[1, 4, 9]
16.6.1.1. La classe Process
¶
Dans le module multiprocessing
, les processus sont instanciés en créant un objet Process
et en appelant sa méthode start()
. La classe Process
suit la même API que threading.Thread
. Un exemple trivial d’un programme multi-processus est :
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Pour afficher les IDs des processus impliqués, voici un exemple plus étoffé :
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
For an explanation of why (on Windows) the if __name__ == '__main__'
part is
necessary, see Lignes directrices de programmation.
16.6.1.2. Échange d’objets entre les processus¶
multiprocessing
gère deux types de canaux de communication entre les processus :
Queues
The
Queue
class is a near clone ofQueue.Queue
. For example:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print q.get() # prints "[42, None, 'hello']" p.join()Les queues peuvent être utilisées par plusieurs fils d’exécution ou processus.
Pipes
La fonction
Pipe()
renvoie une paire d’objets de connexion connectés à un tube qui est par défaut à double-sens. Par exemple :from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print parent_conn.recv() # prints "[42, None, 'hello']" p.join()Les deux objets de connexion renvoyés par
Pipe()
représentent les deux bouts d’un tube. Chaque objet de connexion possède (entre autres) des méthodessend()
etrecv()
. Notez que les données d’un tube peuvent être corrompues si deux processus (ou fils d’exécution) essaient de lire ou d’écrire sur le même bout du tube en même temps. Évidemment il n’y a pas de risque de corruption si les processus utilisent deux bouts différents en même temps.
16.6.1.3. Synchronisation entre processus¶
multiprocessing
contient des équivalents à toutes les primitives de synchronisation de threading
. Par exemple il est possible d’utiliser un verrou pour s’assurer qu’un seul processus à la fois écrit sur la sortie standard :
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Sans le verrou, les sorties des différents processus risquent d’être mélangées.
16.6.1.4. Partager un état entre les processus¶
Comme mentionné plus haut, il est généralement préférable d’éviter autant que possible d’utiliser des états partagés en programmation concurrente. C’est particulièrement vrai quand plusieurs processus sont utilisés.
Cependant, si vous devez réellement partager des données, multiprocessing
permet de le faire de deux manières.
Mémoire partagée
Les données peuvent être stockées dans une mémoire partagée en utilisant des
Value
ou desArray
. Par exemple, le code suivant :from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print num.value print arr[:]affiche :
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]Les arguments
'd'
et'i'
utilisés à la création desnum
et arr` sont des codes de types tels qu’utilisés par le modulearray
:'d'
indique un flottant double-précision et'i'
indique un entier signé. Ces objets partagés seront sûr d’utilisation entre processus et fils d’exécution.Pour plus de flexibilité dans l’utilisation de mémoire partagée, vous pouvez utiliser le module
multiprocessing.sharedctypes
qui permet la création d’objets arbitraires ctypes alloués depuis la mémoire partagée.
Processus serveur
Un objet gestionnaire renvoyé par
Manager()
contrôle un processus serveur qui détient les objets Python et autorise les autres processus à les manipuler à l’aide de mandataires.A manager returned by
Manager()
will support typeslist
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Queue
,Value
andArray
. For example,from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': manager = Manager() d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print d print laffiche :
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]Les processus serveurs de gestionnaires sont plus flexibles que les mémoires partagées parce qu’ils peuvent gérer des types d’objets arbitraires. Aussi, un gestionnaire unique peut être partagé par les processus sur différentes machines à travers le réseau. Cependant, ils sont plus lents que les mémoires partagées.
16.6.1.5. Utiliser un réservoir de workers¶
La classe Pool
représente une pool de processus de travail. Elle possède des méthodes qui permettent aux tâches d’être déchargées vers les processus de travail de différentes manières.
Par exemple :
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
Notez que les méthodes d’une pool ne devraient être utilisées que par le processus qui l’a créée.
Note
Functionality within this package requires that the __main__
module be
importable by the children. This is covered in Lignes directrices de programmation
however it is worth pointing out here. This means that some examples, such
as the Pool
examples will not work in the interactive interpreter.
For example:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Si vous essayez ce code, il affichera trois traces d’appels complètes entrelacées de manière semi-aléatoire, et vous aurez alors à stopper le processus maître.)
16.6.2. Référence¶
Le paquet multiprocessing
reproduit en grande partie l’API du module threading
.
16.6.2.1. Process
et exceptions¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={})¶ Les objets process représentent une activité exécutée dans un processus séparé. La classe
Process
a des équivalents à toutes les méthodes dethreading.Thread
.The constructor should always be called with keyword arguments. group should always be
None
; it exists solely for compatibility withthreading.Thread
. target is the callable object to be invoked by therun()
method. It defaults toNone
, meaning nothing is called. name is the process name. By default, a unique name is constructed of the form “Process-N1:N2:…:Nk” where N1,N2,…,Nk is a sequence of integers whose length is determined by the generation of the process. args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. By default, no arguments are passed to target.Si une sous-classe redéfinit le constructeur, elle doit s’assurer d’invoquer le constructeur de la classe de base (
Process.__init__()
) avant de faire autre chose du processus.-
run
()¶ Méthode représentant l’activité du processus.
Vous pouvez redéfinir cette méthode dans une sous-classe. La méthode standard
run()
invoque l’objet appelable passé au constructeur comme argument target, si fourni, avec les arguments séquentiels et nommés respectivement pris depuis les paramètres args et kwargs.
-
start
()¶ Démarre l’activité du processus.
Elle doit être appelée au plus une fois par objet processus. Elle s’arrange pour que la méthode
run()
de l’objet soit invoquée dans un processus séparé.
-
join
([timeout])¶ Block the calling thread until the process whose
join()
method is called terminates or until the optional timeout occurs.If timeout is
None
then there is no timeout.join peut être appelée plusieurs fois sur un même processus.
Un processus ne peut pas s’attendre lui-même car cela causerait un interblocage. C’est une erreur d’essayer d’attendre un processus avant qu’il ne soit démarré.
-
name
¶ The process’s name.
The name is a string used for identification purposes only. It has no semantics. Multiple processes may be given the same name. The initial name is set by the constructor.
-
is_alive
()¶ Renvoie vrai si le processus est en vie, faux sinon.
Grossièrement, un objet processus est en vie depuis le moment où la méthode
start()
finit de s’exécuter jusqu’à ce que le processus fils se termine.
-
daemon
¶ L’option daemon du processus, une valeur booléenne. L’option doit être réglée avant que la méthode
start()
ne soit appelée.La valeur initiale est héritée par le processus créateur.
Quand un processus se ferme, il tente de terminer tous ses processus enfants daemon.
Notez qu’un processus daemon n’est pas autorisé à créer des processus fils. Sinon un processus daemon laisserait ses enfants orphelins lorsqu’il se termine par la fermeture de son parent. De plus, ce ne sont pas des daemons ou services Unix, ce sont des processus normaux qui seront terminés (et non attendus) si un processus non daemon se ferme.
En plus de l’API
threading.Thread
, les objetsProcess
supportent aussi les attributs et méthodes suivants :-
pid
¶ Renvoie l’ID du processus. Avant que le processus ne soit lancé, la valeur est
None
.
-
exitcode
¶ Le code de fermeture de l’enfant. La valeur est
None
si le processus ne s’est pas encore terminé. Une valeur négative -N indique que le fils a été terminé par un signal N.
-
authkey
¶ La clé d’authentification du processus (une chaîne d’octets).
Quand
multiprocessing
est initialisé, une chaîne aléatoire est assignée au processus principal, en utilisantos.urandom()
.Quand un objet
Process
est créé, il hérité de la clé d’authentification de son parent, bien que cela puisse être changé à l’aide du paramètreauthkey
pour une autre chaîne d’octets.Voir Clés d’authentification.
-
terminate
()¶ Termine le processus. Sous Unix cela est réalisé à l’aide d’un signal
SIGTERM
, sous WindowsTerminateProcess()
est utilisé. Notez que les gestionnaires de sortie, les clauses finally etc. ne sont pas exécutées.Notez que les descendants du processus ne seront pas terminés – ils deviendront simplement orphelins.
Avertissement
Si cette méthode est utilisée quand le processus associé utilise un tube ou une queue, alors le tube ou la queue sont susceptibles d’être corrompus et peuvent devenir inutilisables par les autres processus. De façon similaire, si le processus a acquis un verrou, un sémaphore ou autre, alors le terminer est susceptible de provoquer des blocages dans les autres processus.
Notez que les méthodes
start()
,join()
,is_alive()
,terminate()
etexitcode
ne devraient être appelées que par le processus ayant créé l’objet process.Exemple d’utilisation de quelques méthodes de
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print p, p.is_alive() <Process(Process-1, initial)> False >>> p.start() >>> print p, p.is_alive() <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print p, p.is_alive() <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
BufferTooShort
¶ Exception levée par
Connection.recv_bytes_into()
quand l’objet tampon fourni est trop petit pour le message à lire.Si
e
est une instance deBufferTooShort
alorse.args[0]
donnera un message sous forme d’une chaîne d’octets.
16.6.2.2. Tubes (pipes) et Queues¶
Quand de multiples processus sont utilisés, de l’échange de messages est souvent mis en place pour la communication entre processus et éviter d’avoir à utiliser des primitives de synchronisation telles que des verrous.
Pour échanger des messages vous pouvez utiliser un Pipe()
(pour une connexion entre deux processus) ou une queue (qui autorise de multiples producteurs et consommateurs).
The Queue
, multiprocessing.queues.SimpleQueue
and JoinableQueue
types are multi-producer,
multi-consumer FIFO queues modelled on the Queue.Queue
class in the
standard library. They differ in that Queue
lacks the
task_done()
and join()
methods introduced
into Python 2.5’s Queue.Queue
class.
Si vous utilisez JoinableQueue
alors vous devez appeler JoinableQueue.task_done()
pour chaque tâche retirée de la queue, sans quoi le sémaphore utilisé pour compter le nombre de tâches non accomplies pourra éventuellement déborder, levant une exception.
Notez que vous pouvez aussi créer une queue partagée en utilisant un objet gestionnaire – voir Gestionnaires.
Note
multiprocessing
uses the usual Queue.Empty
and
Queue.Full
exceptions to signal a timeout. They are not available in
the multiprocessing
namespace so you need to import them from
Queue
.
Note
Quand un objet est placé dans une queue, l’objet est sérialisé par pickle et un fil d’exécution en arrière-plan transmettra ensuite les données sérialisées sur un tube sous-jacent. Cela a certaines conséquences qui peuvent être un peu surprenantes, mais ne devrait causer aucune difficultés pratiques – si elles vous embêtent vraiment, alors vous pouvez à la place utiliser une queue créée avec un manager.
After putting an object on an empty queue there may be an infinitesimal delay before the queue’s
empty()
method returnsFalse
andget_nowait()
can return without raisingQueue.Empty
.Si plusieurs processus placent des objets dans la queue, il est possible pour les objets d’être reçus de l’autre côté dans le désordre. Cependant, les objets placés par un même processus seront toujours récupérés dans l’ordre attendu.
Avertissement
If a process is killed using Process.terminate()
or os.kill()
while it is trying to use a Queue
, then the data in the queue is
likely to become corrupted. This may cause any other process to get an
exception when it tries to use the queue later on.
Avertissement
Comme mentionné plus haut, si un processus fils a placé des éléments dans la queue (et qu’il n’a pas utilisé JoinableQueue.cancel_join_thread
), alors le processus ne se terminera pas tant que les éléments placés dans le tampon n’auront pas été transmis au tube.
Cela signifie que si vous essayez d’attendre ce processus vous pouvez obtenir un interblocage, à moins que vous ne soyez sûr que tous les éléments placés dans la queue ont été consommés. De même, si le processus fils n’est pas un daemon alors le processus parent pourrait bloquer à la fermeture quand il tentera d’attendre tous ses enfants non daemons.
Notez que la queue créée à l’aide d’un gestionnaire n’a pas ce problème. Voir Lignes directrices de programmation.
Pour un exemple d’utilisation de queues pour de la communication entre les processus, voir Exemples.
-
multiprocessing.
Pipe
([duplex])¶ Returns a pair
(conn1, conn2)
ofConnection
objects representing the ends of a pipe.Si duplex vaut
True
(par défaut), alors le tube est bidirectionnel. Si duplex vautFalse
il est unidirectionnel :conn1
ne peut être utilisé que pour recevoir des messages etconn2
que pour en envoyer.
-
class
multiprocessing.
Queue
([maxsize])¶ Renvoie une queue partagée entre les processus utilisant un tube et quelques verrous/sémaphores. Quand un processus place initialement un élément sur la queue, un fil d’exécution feeder est démarré pour transférer les objets du tampon vers le tube.
The usual
Queue.Empty
andQueue.Full
exceptions from the standard library’sQueue
module are raised to signal timeouts.Queue
implements all the methods ofQueue.Queue
except fortask_done()
andjoin()
.-
qsize
()¶ Renvoie la taille approximative de la queue. Ce nombre n’est pas fiable en raison des problématiques de multithreading et multiprocessing.
Notez que cela peut lever une
NotImplementedError
sous les plateformes Unix telles que Mac OS X oùsem_getvalue()
n’est pas implémentée.
-
empty
()¶ Renvoie
True
si la queue est vide,False
sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.
-
full
()¶ Renvoie
True
si la queue est pleine,False
sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.
-
put
(obj[, block[, timeout]])¶ Put obj into the queue. If the optional argument block is
True
(the default) and timeout isNone
(the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises theQueue.Full
exception if no free slot was available within that time. Otherwise (block isFalse
), put an item on the queue if a free slot is immediately available, else raise theQueue.Full
exception (timeout is ignored in that case).
-
put_nowait
(obj)¶ Équivalent à
put(obj, False)
.
-
get
([block[, timeout]])¶ Remove and return an item from the queue. If optional args block is
True
(the default) and timeout isNone
(the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises theQueue.Empty
exception if no item was available within that time. Otherwise (block isFalse
), return an item if one is immediately available, else raise theQueue.Empty
exception (timeout is ignored in that case).
-
get_nowait
()¶ Équivalent à
get(False)
.
Queue
has a few additional methods not found inQueue.Queue
. These methods are usually unnecessary for most code:-
close
()¶ Indique que plus aucune donnée ne peut être placée sur la queue par le processus courant. Le fil d’exécution en arrière-plan se terminera quand il aura transféré toutes les données du tampon vers le tube. Elle est appelée automatiquement quand la queue est collectée par le ramasse-miettes.
-
join_thread
()¶ Attend le fil d’exécution d’arrière-plan. Elle peut seulement être utilisée une fois que
close()
a été appelée. Elle bloque jusqu’à ce que le fil d’arrière-plan se termine, assurant que toutes les données du tampon ont été transmises au tube.Par défaut si un processus n’est pas le créateur de la queue alors à la fermeture elle essaiera d’attendre le fil d’exécution d’arrière-plan de la queue. Le processus peut appeler
cancel_join_thread()
pour quejoin_thread()
ne fasse rien.
-
cancel_join_thread
()¶ Empêche
join_thread()
de bloquer. En particulier, cela empêche le fil d’arrière-plan d’être attendu automatiquement quand le processus se ferme – voirjoin_thread()
.Un meilleur nom pour cette méthode pourrait être
allow_exit_without_flush()
. Cela peut provoquer des pertes de données placées dans la queue, et vous ne devriez certainement pas avoir besoin de l’utiliser. Elle n’est là que si vous souhaitez terminer immédiatement le processus sans transférer les données du tampon, et que vous ne vous inquiétez pas de perdre des données.
Note
Le fonctionnement de cette classe requiert une implémentation de sémaphore partagé sur le système d’exploitation hôte. Sans cela, la fonctionnalité sera désactivée et la tentative d’instancier une
Queue
lèvera uneImportError
. Voir bpo-3770 pour plus d’informations. Cette remarque reste valable pour les autres types de queues spécialisées définies par la suite.-
-
class
multiprocessing.queues.
SimpleQueue
¶ It is a simplified
Queue
type, very close to a lockedPipe
.-
empty
()¶ Renvoie
True
si la queue est vide,False
sinon.
-
get
()¶ Supprime et donne un élément de la queue.
-
put
(item)¶ Place item dans la queue.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
, aQueue
subclass, is a queue which additionally hastask_done()
andjoin()
methods.-
task_done
()¶ Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each
get()
used to fetch a task, a subsequent call totask_done()
tells the queue that the processing on the task is complete.If a
join()
is currently blocking, it will resume when all items have been processed (meaning that atask_done()
call was received for every item that had beenput()
into the queue).Lève une exception
ValueError
si appelée plus de fois qu’il y avait d’éléments dans la file.
-
join
()¶ Bloque jusqu’à ce que tous les éléments de la queue aient été récupérés et traités.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls
task_done()
to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero,join()
unblocks.
-
16.6.2.3. Divers¶
-
multiprocessing.
active_children
()¶ Renvoie la liste de tous les enfants vivants du processus courant.
Appeler cette méthode provoque l’effet de bord d’attendre tout processus qui n’a pas encore terminé.
-
multiprocessing.
cpu_count
()¶ Return the number of CPUs in the system. May raise
NotImplementedError
.
-
multiprocessing.
current_process
()¶ Renvoie l’objet
Process
correspondant au processus courant.Un analogue à
threading.current_thread()
.
-
multiprocessing.
freeze_support
()¶ Ajoute le support des programmes utilisant
multiprocessing
qui ont été gelés pour produire un exécutable Windows. (Testé avec py2exe, PyInstaller et cx_Freeze.)Cette fonction doit être appelée juste après la ligne
if __name__ == '__main__'
du module principal. Par exemple :from multiprocessing import Process, freeze_support def f(): print 'hello world!' if __name__ == '__main__': freeze_support() Process(target=f).start()
Si la ligne
freeze_support()
est omise, alors tenter de lancer l’exécutable gelé lèvera uneRuntimeError
.Appeler
freeze_support()
n’a pas d’effet quand elle est invoquée sur un système d’exploitation autre que Windows. De plus, si le module est lancé normalement par l’interpréteur Python sous Windows (le programme n’a pas été gelé), alorsfreeze_support()
n’a pas d’effet.
-
multiprocessing.
set_executable
()¶ Définit le chemin de l’interpréteur Python à utiliser pour démarrer un processus fils. (Par défaut
sys.executable
est utilisé). Les intégrateurs devront probablement faire quelque chose comme :set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
before they can create child processes. (Windows only)
Note
multiprocessing
ne contient pas d’analogues à threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, ou threading.local
.
16.6.2.4. Objets de connexions¶
Les objets de connexion autorisent l’envoi et la réception d’objets sérialisables ou de chaînes de caractères. Ils peuvent être vus comme des interfaces de connexion (sockets) connectées orientées messages.
Connection objects are usually created using
Pipe
– see also
Auditeurs et Clients.
-
class
Connection
¶ -
send
(obj)¶ Envoie un objet sur l’autre bout de la connexion, qui devra être lu avec
recv()
.The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a
ValueError
exception.
-
recv
()¶ Return an object sent from the other end of the connection using
send()
. Blocks until there is something to receive. RaisesEOFError
if there is nothing left to receive and the other end was closed.
-
fileno
()¶ Renvoie le descripteur de fichier ou identifiant utilisé par la connexion.
-
close
()¶ Ferme la connexion.
Elle est appelée automatiquement quand la connexion est collectée par le ramasse-miettes.
-
poll
([timeout])¶ Renvoie vrai ou faux selon si des données sont disponibles à la lecture.
Si timeout n’est pas spécifié la méthode renverra immédiatement. Si timeout est un nombre alors il spécifie le temps maximum de blocage en secondes. Si timeout est
None
, un temps d’attente infini est utilisé.
-
send_bytes
(buffer[, offset[, size]])¶ Send byte data from an object supporting the buffer interface as a complete message.
If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a
ValueError
exception
-
recv_bytes
([maxlength])¶ Renvoie un message complet de données binaires envoyées depuis l’autre bout de la connexion comme une chaîne de caractères. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une
EOFError
s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.If maxlength is specified and the message is longer than maxlength then
IOError
is raised and the connection will no longer be readable.
-
recv_bytes_into
(buffer[, offset])¶ Lit et stocke dans buffer un message complet de données binaires envoyées depuis l’autre bout de la connexion et renvoie le nombre d’octets du message. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une
EOFError
s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.buffer must be an object satisfying the writable buffer interface. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).
Si le tampon est trop petit une exception
BufferTooShort
est levée et le message complet est accessible viae.args[0]
oùe
est l’instance de l’exception.
-
Par exemple :
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Avertissement
La méthode Connection.recv()
désérialise automatiquement les données qu’elle reçoit, ce qui peut être un risque de sécurité à moins que vous ne fassiez réellement confiance au processus émetteur du message.
Par conséquent, à moins que l’objet de connexion soit instancié par Pipe()
, vous ne devriez uniquement utiliser les méthodes recv()
et send()
après avoir effectué une quelconque forme d’authentification. Voir Clés d’authentification.
Avertissement
Si un processus est tué pendant qu’il essaye de lire ou écrire sur le tube, alors les données du tube ont des chances d’être corrompues, parce qu’il devient impossible d’être sûr d’où se trouvent les bornes du message.
16.6.2.5. Primitives de synchronisation¶
Généralement les primitives de synchronisation ne sont pas nécessaire dans un programme multi-processus comme elles le sont dans un programme multi-fils d’exécution. Voir la documentation du module threading
.
Notez que vous pouvez aussi créer des primitives de synchronisation en utilisant un objet gestionnaire – voir Gestionnaires.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Un objet sémaphore lié : un analogue proche de
threading.BoundedSemaphore
.A solitary difference from its close analog exists: its
acquire
method’s first argument is named block and it supports an optional second argument timeout, as is consistent withLock.acquire()
.Note
Sur Mac OS X, elle n’est pas distinguable de la classe
Semaphore
parce quesem_getvalue()
n’est pas implémentée sur cette plateforme.
-
class
multiprocessing.
Condition
([lock])¶ A condition variable: a clone of
threading.Condition
.Si lock est spécifié il doit être un objet
Lock
ouRLock
du modulemultiprocessing
.
-
class
multiprocessing.
Event
¶ A clone of
threading.Event
. This method returns the state of the internal semaphore on exit, so it will always returnTrue
except if a timeout is given and the operation times out.Modifié dans la version 2.7: Previously, the method always returned
None
.
-
class
multiprocessing.
Lock
¶ Un verrou non récursif : un analogue proche de
threading.Lock
. Une fois que le processus ou le fil d’exécution a acquis un verrou, les tentatives suivantes d’acquisition depuis n’importe quel processus ou fil d’exécution bloqueront jusqu’à ce qu’il soit libéré ; n’importe quel processus ou fil peut le libérer. Les concepts et comportements dethreading.Lock
qui s’appliquent aux fils d’exécution sont répliqués ici dansmultiprocessing.Lock
et s’appliquent aux processus et aux fils d’exécution, à l’exception de ce qui est indiqué.Notez que
Lock
est en fait une fonction factory qui renvoie une instance demultiprocessing.synchronize.Lock
initialisée avec un contexte par défaut.Lock
supporte le protocole context manager et peut ainsi être utilisé avec une instructionwith
.-
acquire
(block=True, timeout=None)¶ Acquiert un verrou, bloquant ou non bloquant.
Avec l’argument block à
True
(par défaut), l’appel de méthode bloquera jusqu’à ce que le verrou soit dans déverrouillé, puis le verrouillera avant de renvoyerTrue
. Notez que le nom de ce premier argument diffère de celui dethreading.Lock.acquire()
.Avec l’argument block à
False
, l’appel de méthode ne bloque pas. Si le verrou est actuellement verrouillé, renvoieFalse
; autrement verrouille le verrou et renvoieTrue
.When invoked with a positive, floating-point value for timeout, block for at most the number of seconds specified by timeout as long as the lock can not be acquired. Invocations with a negative value for timeout are equivalent to a timeout of zero. Invocations with a timeout value of
None
(the default) set the timeout period to infinite. The timeout argument has no practical implications if the block argument is set toFalse
and is thus ignored. ReturnsTrue
if the lock has been acquired orFalse
if the timeout period has elapsed. Note that the timeout argument does not exist in this method’s analog,threading.Lock.acquire()
.
-
release
()¶ Libère un verrou. Elle peut être appelée depuis n’importe quel processus ou fil d’exécution, pas uniquement le processus ou le fil qui a acquis le verrou à l’origine.
Le comportement est le même que
threading.Lock.release()
excepté que lorsque la méthode est appelée sur un verrou déverrouillé, uneValueError
est levée.
-
-
class
multiprocessing.
RLock
¶ Un objet verrou récursif : un analogue proche de
threading.RLock
. Un verrou récursif doit être libéré par le processus ou le fil d’exécution qui l’a acquis. Quand un processus ou un fil acquiert un verrou récursif, le même processus/fil peut l’acquérir à nouveau sans bloquer ; le processus/fil doit le libérer autant de fois qu’il l’acquiert.Notez que
RLock
est en fait une fonction factory qui renvoie une instance demultiprocessing.synchronize.RLock
initialisée avec un contexte par défaut.RLock
supporte le protocole context manager et peut ainsi être utilisée avec une instructionwith
.-
acquire
(block=True, timeout=None)¶ Acquiert un verrou, bloquant ou non bloquant.
Quand invoqué avec l’argument block à
True
, bloque jusqu’à ce que le verrou soit déverrouillé (n’appartenant à aucun processus ou fil d’exécution) sauf s’il appartient déjà au processus ou fil d’exécution courant. Le processus ou fil d’exécution courant prend la possession du verrou (s’il ne l’a pas déjà) et incrémente d’un le niveau de récursion du verrou, renvoyant ainsiTrue
. Notez qu’il y a plusieurs différences dans le comportement de ce premier argument comparé à l’implémentation dethreading.RLock.acquire()
, à commencer par le nom de l’argument lui-même.Quand invoqué avec l’argument block à
False
, ne bloque pas. Si le verrou est déjà acquis (et possédé) par un autre processus ou fil d’exécution, le processus/fil courant n’en prend pas la possession et le niveau de récursion n’est pas incrémenté, résultant en une valeur de retour àFalse
. Si le verrou est déverrouillé, le processus/fil courant en prend possession et incrémente son niveau de récursion, renvoyantTrue
.Use and behaviors of the timeout argument are the same as in
Lock.acquire()
. Note that the timeout argument does not exist in this method’s analog,threading.RLock.acquire()
.
-
release
()¶ Libère un verrou, décrémentant son niveau de récursion. Si après la décrémentation le niveau de récursion est zéro, réinitialise le verrou à un état déverrouillé (n’appartenant à aucun processus ou fil d’exécution) et si des processus/fils attendent que le verrou se déverrouillé, autorise un seul d’entre-eux à continuer. Si après cette décrémentation le niveau de récursion est toujours strictement positif, le verrou reste verrouillé et propriété du processus/fil appelant.
N’appelez cette méthode que si le processus ou fil d’exécution appelant est propriétaire du verrou. Une
AssertionError
est levée si cette méthode est appelée par un processus/fil autre que le propriétaire ou si le verrou n’est pas verrouillé (possédé). Notez que le type d’exception levé dans cette situation diffère du comportement dethreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Un objet sémaphore, proche analogue de
threading.Semaphore
.A solitary difference from its close analog exists: its
acquire
method’s first argument is named block and it supports an optional second argument timeout, as is consistent withLock.acquire()
.
Note
The acquire()
method of BoundedSemaphore
, Lock
,
RLock
and Semaphore
has a timeout parameter not supported
by the equivalents in threading
. The signature is
acquire(block=True, timeout=None)
with keyword parameters being
acceptable. If block is True
and timeout is not None
then it
specifies a timeout in seconds. If block is False
then timeout is
ignored.
Sous Mac OS X, sem_timedwait
n’est pas supporté, donc appeler acquire()
avec un temps d’exécution limité émulera le comportement de cette fonction en utilisant une boucle d’attente.
Note
Si le signal SIGINT généré par un Ctrl-C survient pendant que le fil d’exécution principal est bloqué par un appel à BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
ou Condition.wait()
, l’appel sera immédiatement interrompu et une KeyboardInterrupt
sera levée.
Cela diffère du comportement de threading
où le SIGINT est ignoré tant que les appels bloquants sont en cours.
Note
Certaines des fonctionnalités de ce paquet requièrent une implémentation fonctionnelle de sémaphores partagés sur le système hôte. Sans cela, le module multiprocessing.synchronize
sera désactivé, et les tentatives de l’importer lèveront une ImportError
. Voir bpo-3770 pour plus d’informations.
16.6.2.7. Gestionnaires¶
Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.
Renvoie un objet
SyncManager
démarré qui peut être utilisé pour partager des objets entre les processus. L’objet gestionnaire renvoyé correspond à un processus enfant instancié et possède des méthodes pour créer des objets partagés et renvoyer les mandataires correspondants.
Les processus gestionnaires seront arrêtés dès qu’ils seront collectés par le ramasse-miettes ou que leur processus parent se terminera. Les classes gestionnaires sont définies dans le module multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Crée un objet BaseManager.
Une fois créé il faut appeler
start()
ouget_server().serve_forever()
pour assurer que l’objet gestionnaire référence un processus gestionnaire démarré.address est l’adresse sur laquelle le processus gestionnaire écoute pour de nouvelles connexions. Si address est
None
, une adresse arbitraire est choisie.authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is
None
thencurrent_process().authkey
. Otherwise authkey is used and it must be a string.-
start
([initializer[, initargs]])¶ Démarre un sous-processus pour démarrer le gestionnaire. Si initializer n’est pas
None
alors le sous-processus appellerainitializer(*initargs)
quand il démarrera.
-
get_server
()¶ Renvoie un objet
Server
qui représente le serveur sous le contrôle du gestionnaire. L’objetServer
supporte la méthodeserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey='abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
possède en plus un attributaddress
.
-
connect
()¶ Connecte un objet gestionnaire local au processus gestionnaire distant :
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc') >>> m.connect()
-
shutdown
()¶ Stoppe le processus utilisé par le gestionnaire. Cela est disponible uniquement si
start()
a été utilisée pour démarrer le processus serveur.Cette méthode peut être appelée plusieurs fois.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ Une méthode de classe qui peut être utilisée pour enregistrer un type ou un appelable avec la classe gestionnaire.
typeif est un « type identifier » qui est utilisé pour identifier un type particulier d’objet partagé. Cela doit être une chaîne de caractères.
callable is a callable used for creating objects for this type identifier. If a manager instance will be created using the
from_address()
classmethod or if the create_method argument isFalse
then this can be left asNone
.proxytype est une sous-classe de
BaseProxy
utilisée pour créer des mandataires autour des objets partagés avec ce typeid. S’il estNone
, une classe mandataire sera créée automatiquement.exposed est utilisé pour préciser une séquence de noms de méthodes dont les mandataires pour ce typeid doivent être autorisés à accéder via
BaseProxy._callmethod()
. (Si exposed estNone
alorsproxytype._exposed_
est utilisé à la place s’il existe.) Dans le cas où aucune liste exposed n’est précisée, toutes les « méthodes publiques » de l’objet partagé seront accessibles. (Ici une « méthode publique » signifie n’importe quel attribut qui possède une méthode__call__()
et dont le nom ne commence pas par un'_'
.)method_to_typeid est un tableau associatif utilisé pour préciser le type de retour de ces méthodes exposées qui doivent renvoyer un mandataire. Il associé un nom de méthode à une chaîne de caractères typeid. (Si method_to_typeid est
None
,proxytype._method_to_typeid_
est utilisé à la place s’il existe). Si le nom d’une méthode n’est pas une clé de ce tableau associatif ou si la valeur associée estNone
, l’objet renvoyé par la méthode sera une copie de la valeur.create_method détermine si une méthode devrait être créée avec le nom typeid, qui peut être utilisée pour indiquer au processus serveur de créer un nouvel objet partagé et d’en renvoyer un mandataire. a valeur par défaut est
True
.
Les instances de
BaseManager
ont aussi une propriété en lecture seule :-
address
¶ L’adresse utilisée par le gestionnaire.
-
-
class
multiprocessing.managers.
SyncManager
¶ Une sous-classe de
BaseManager
qui peut être utilisée pour la synchronisation entre processus. Des objets de ce type sont renvoyés parmultiprocessing.Manager()
.It also supports creation of shared lists and dictionaries.
-
BoundedSemaphore
([value])¶ Crée un objet
threading.BoundedSemaphore
partagé et renvoie un mandataire pour cet objet.
-
Condition
([lock])¶ Crée un objet
threading.Condition
partagé et renvoie un mandataire pour cet objet.Si lock est fourni alors il doit être un mandataire pour un objet
threading.Lock
outhreading.RLock
.
-
Event
()¶ Crée un objet
threading.Event
partagé et renvoie un mandataire pour cet objet.
-
Lock
()¶ Crée un objet
threading.Lock
partagé et renvoie un mandataire pour cet objet.
-
Queue
([maxsize])¶ Create a shared
Queue.Queue
object and return a proxy for it.
-
RLock
()¶ Crée un objet
threading.RLock
partagé et renvoie un mandataire pour cet objet.
-
Semaphore
([value])¶ Crée un objet
threading.Semaphore
partagé et renvoie un mandataire pour cet objet.
-
Array
(typecode, sequence)¶ Crée un tableau et renvoie un mandataire pour cet objet.
-
Value
(typecode, value)¶ Crée un objet avec un attribut
value
accessible en écriture et renvoie un mandataire pour cet objet.
-
dict
()¶ -
dict
(mapping) -
dict
(sequence) Create a shared
dict
object and return a proxy for it.
-
list
()¶ -
list
(sequence) Create a shared
list
object and return a proxy for it.
Note
Modifications to mutable values or items in dict and list proxies will not be propagated through the manager, because the proxy has no way of knowing when its values or items are modified. To modify such an item, you can re-assign the modified object to the container proxy:
# create a list proxy and append a mutable object (a dictionary) lproxy = manager.list() lproxy.append({}) # now mutate the dictionary d = lproxy[0] d['a'] = 1 d['b'] = 2 # at this point, the changes to d are not yet synced, but by # reassigning the dictionary, the proxy is notified of the change lproxy[0] = d
-
-
class
multiprocessing.managers.
Namespace
¶ Un type qui peut être enregistré avec
SyncManager
.Un espace de nommage n’a pas de méthodes publiques, mais possède des attributs accessibles en écriture. Sa représentation montre les valeurs de ses attributs.
Cependant, en utilisant un mandataire pour un espace de nommage, un attribut débutant par
'_'
est un attribut du mandataire et non de l’objet cible :>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print Global Namespace(x=10, y='hello')
16.6.2.7.1. Gestionnaires personnalisés¶
Pour créer son propre gestionnaire, il faut créer une sous-classe de BaseManager
et utiliser la méthode de classe register()
pour enregistrer de nouveaux types ou callables au gestionnaire. Par exemple :
from multiprocessing.managers import BaseManager
class MathsClass(object):
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
manager = MyManager()
manager.start()
maths = manager.Maths()
print maths.add(4, 3) # prints 7
print maths.mul(7, 8) # prints 56
16.6.2.7.2. Utiliser un gestionnaire distant¶
Il est possible de lancer un serveur gestionnaire sur une machine et d’avoir des clients l’utilisant sur d’autres machines (en supposant que les pare-feus impliqués l’autorisent).
Exécuter les commandes suivantes crée un serveur pour une simple queue partagée à laquelle des clients distants peuvent accéder :
>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Un client peut accéder au serveur comme suit :
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Un autre client peut aussi l’utiliser :
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Les processus locaux peuvent aussi accéder à cette queue, utilisant le code précédent sur le client pour y accéder à distance :
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
16.6.2.8. Objets mandataires¶
Un mandataire est un objet qui référence un objet partagé appartenant (supposément) à un processus différent. L’objet partagé est appelé le référent du mandataire. Plusieurs mandataires peuvent avoir un même référent.
A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy). A proxy can usually be used in most of the same ways that its referent can:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Notez qu’appliquer str()
à un mandataire renvoie la représentation du référent, alors que repr()
renvoie celle du mandataire.
An important feature of proxy objects is that they are picklable so they can be passed between processes. Note, however, that if a proxy is sent to the corresponding manager’s process then unpickling it will produce the referent itself. This means, for example, that one shared object can contain a second:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']
Note
Les types de mandataires de multiprocessing
n’implémentent rien pour la comparaison par valeurs. Par exemple, on a :
>>> manager.list([1,2,3]) == [1,2,3]
False
Il faut à la place simplement utiliser une copie du référent pour faire les comparaisons.
-
class
multiprocessing.managers.
BaseProxy
¶ Les objets mandataires sont des instances de sous-classes de
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Appelle et renvoie le résultat d’une méthode du référent du mandataire.
Si
proxy
est un mandataire sont le référent estobj
, alors l’expression :proxy._callmethod(methodname, args, kwds)
s’évalue comme :
getattr(obj, methodname)(*args, **kwds)
dans le processus du gestionnaire.
La valeur renvoyée sera une copie du résultat de l’appel ou un mandataire sur un nouvel objet partagé – voir l’a documentation de l’argument method_to_typeid de
BaseManager.register()
.Si une exception est levée par l’appel, elle est relayée par
_callmethod()
. Si une autre exception est levée par le processus du gestionnaire, elle est convertie en uneRemoteError
et est levée par_callmethod()
.Notez en particulier qu’une exception est levée si methodname n’est pas exposée.
Un exemple d’utilisation de
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]` [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]` Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Renvoie une copie du référent.
Si le référent n’est pas sérialisable, une exception est levée.
-
__repr__
()¶ Renvoie la représentation de l’objet mandataire.
-
__str__
()¶ Renvoie la représentation du référent.
-
16.6.2.8.1. Nettoyage¶
Un mandataire utilise un callback sous une référence faible de façon à ce que quand il est collecté par le ramasse-miettes, il se désenregistre auprès du gestionnaire qui possède le référent.
Un objet partagé est supprimé par le processus gestionnaire quand plus aucun mandataire ne le référence.
16.6.2.9. Bassins de processus¶
On peut créer un bassin de processus qui exécuteront les tâches qui lui seront soumises avec la classe Pool
.
-
class
multiprocessing.
Pool
([processes[, initializer[, initargs[, maxtasksperchild]]]])¶ Un objet process pool qui contrôle un bassin de processus workers auquel sont soumises des tâches. Il supporte les résultats asynchrones avec des timeouts et des callabacks et possède une implémentation parallèle de map.
processes is the number of worker processes to use. If processes is
None
then the number returned bycpu_count()
is used. If initializer is notNone
then each worker process will callinitializer(*initargs)
when it starts.Notez que les méthodes de l’objet pool ne doivent être appelées que par le processus qui l’a créé.
Nouveau dans la version 2.7: maxtasksperchild est le nombre de tâches qu’un processus worker peut accomplir avant de se fermer et d’être remplacé par un worker frais, pour permettre aux ressources inutilisées d’être libérées. Par défaut maxtasksperchild est
None
, ce qui signifie que le worker vit aussi longtemps que le bassin.Note
Les processus workers à l’intérieur d’une
Pool
vivent par défaut aussi longtemps que la queue de travail du bassin. Un modèle fréquent chez d’autres systèmes (tels qu’Apache, mod_wsgi, etc.) pour libérer les ressources détenues par les workers est d’autoriser un worker dans le bassin à accomplir seulement une certaine charge de travail avant de se fermer, se retrouvant nettoyé et remplacé par un nouvelle processus fraîchement lancé. L’argument maxtasksperchild dePool
expose cette fonctionnalité à l’utilisateur final.-
apply
(func[, args[, kwds]])¶ Equivalent of the
apply()
built-in function. It blocks until the result is ready, soapply_async()
is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
-
apply_async
(func[, args[, kwds[, callback]]])¶ Une variante de la méthode
apply()
qui renvoie un objet résultat.If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
-
map
(func, iterable[, chunksize])¶ Un équivalent parallèle à la fonction built-in
map()
(qui ne supporte cependant qu’un itérable en argument). Elle bloque jusqu’à ce que le résultat soit prêt.La méthode découpe l’itérable en un nombre de morceaux qu’elle envoie au bassin de processus comme des tâches séparées. La taille (approximative) de ces morceaux peut être précisée en passant à chunksize un entier positif.
-
map_async
(func, iterable[, chunksize[, callback]])¶ Une variante de la méthode
map()
qui renvoie un objet résultat.If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.
-
imap
(func, iterable[, chunksize])¶ An equivalent of
itertools.imap()
.L’argument chunksize est le même que celui utilisé par la méthode
map()
. Pour de très longs itérables, utiliser une grande valeur pour chunksize peut faire s’exécuter la tâche beaucoup plus rapidement qu’en utilisant la valeur par défaut de1
.Aussi, si chucksize vaut
1
alors la méthodenext()
de l’itérateur renvoyé parimap()
prend un paramètre optionnel timeout :next(timeout)
lève unemultiprocessing.TimeoutError
si le résultat ne peut pas être renvoyé avant timeout secondes.
-
imap_unordered
(func, iterable[, chunksize])¶ Identique à
imap()
si ce n’est que l’ordre des résultats de l’itérateur renvoyé doit être considéré comme arbitraire. (L’ordre n’est garanti que quand il n’y a qu’un worker.)
-
close
()¶ Empêche de nouvelles tâches d’être envoyées à la pool. Les processus workers se terminent une fois que toutes les tâches ont été complétées.
-
terminate
()¶ Stoppe immédiatement les processus workers sans finaliser les travaux courants. Quand l’objet pool est collecté par le ramasse-miettes, sa méthode
terminate()
est appelée immédiatement.
-
join
()¶ Attend que les processus workers se terminent. Il est nécessaire d’appeler
close()
outerminate()
avant d’utiliserjoin()
.
-
-
class
multiprocessing.pool.
AsyncResult
¶ La classe des résultats renvoyés par
Pool.apply_async()
etPool.map_async()
.-
get
([timeout])¶ Renvoie le résultat quand il arrive. Si timeout n’est pas
None
et que le résultat n’arrive pas avant timeout secondes, unemultiprocessing.TimeoutError
est levée. Si l’appel distance lève une exception, alors elle est relayée parget()
.
-
wait
([timeout])¶ Attend que le résultat soit disponible ou que timeout secondes s’écoulent.
-
ready
()¶ Renvoie
True
ouFalse
suivant si la tâche est accomplie.
-
successful
()¶ Renvoie
True
ouFalse
suivant si la tâche est accomplie sans lever d’exception. Lève uneAssertionError
si le résultat n’est pas prêt.
-
Les exemples suivants présentent l’utilisation d’un bassin de workers :
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
16.6.2.10. Auditeurs et Clients¶
Usually message passing between processes is done using queues or by using
Connection
objects returned by Pipe()
.
However, the multiprocessing.connection
module allows some extra
flexibility. It basically gives a high level message oriented API for dealing
with sockets or Windows named pipes, and also has support for digest
authentication using the hmac
module.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Envoie un message généré aléatoirement à l’autre bout de la connexion et attend une réponse.
If the reply matches the digest of the message using authkey as the key then a welcome message is sent to the other end of the connection. Otherwise
AuthenticationError
is raised.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Reçoit un message, calcule le condensat du message en utilisant la clé authkey, et envoie le condensat en réponse.
If a welcome message is not received, then
AuthenticationError
is raised.
-
multiprocessing.connection.
Client
(address[, family[, authenticate[, authkey]]])¶ Attempt to set up a connection to the listener which is using address address, returning a
Connection
.Le type de la connexion est déterminé par l’argument family, mais il peut généralement être omis puisqu’il peut être inféré depuis le format d”address. (Voir Formats d’adresses)
If authenticate is
True
or authkey is a string then digest authentication is used. The key used for authentication will be either authkey orcurrent_process().authkey)
if authkey isNone
. If authentication fails thenAuthenticationError
is raised. See Clés d’authentification.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authenticate[, authkey]]]]])¶ Un wrapper sur une socket liée ou un tube nommé sous Windows qui écoute pour des connexions.
address est l’adresse à utiliser par la socket liée ou le tube nommé de l’objet auditeur.
Note
Si une adresse “0.0.0.0” est utilisée, l’adresse ne sera pas un point d’accès connectable sous Windows. Si vous avez besoin d’un point d’accès connectable, utilisez “127.0.0.1”.
family est le type de socket (ou tube nommé) à utiliser. Cela peut être l’une des chaînes
'AF_INET'
(pour une socket TCP),'AF_UNIX'
(pour une socket Unix) ou'AF_PIPE'
(pour un tube nommé sous Windows). Seulement la première d’entre elles est garantie d’être disponible. Si family estNone
, la famille est inférée depuis le format d”address. Si address est aussiNone
, la famille par défaut est utilisée. La famille par défaut est supposée être la plus rapide disponible. Voir Formats d’adresses. Notez que si la family est'AF_UNIX'
et qu”address estNone
, la socket est créée dans un répertoire temporaire privé créé avectempfile.mkstemp()
.Si l’objet auditeur utilise une socket alors backlog (1 par défaut) es passé à la méthode
listen()
de la socket une fois qu’elle a été liée.If authenticate is
True
(False
by default) or authkey is notNone
then digest authentication is used.If authkey is a string then it will be used as the authentication key; otherwise it must be
None
.If authkey is
None
and authenticate isTrue
thencurrent_process().authkey
is used as the authentication key. If authkey isNone
and authenticate isFalse
then no authentication is done. If authentication fails thenAuthenticationError
is raised. See Clés d’authentification.-
accept
()¶ Accept a connection on the bound socket or named pipe of the listener object and return a
Connection
object. If authentication is attempted and fails, thenAuthenticationError
is raised.
-
close
()¶ Ferme la socket liée ou le tube nommé de l’objet auditeur. La méthode est appelée automatiquement quand l’auditeur est collecté par le ramasse-miettes. Il est cependant conseillé de l’appeler explicitement.
Les objets auditeurs ont aussi les propriétés en lecture seule suivantes :
-
address
¶ L’adresse utilisée par l’objet auditeur.
-
last_accepted
¶ L’adresse depuis laquelle a été établie la dernière connexion.
None
si aucune n’est disponible.
-
The module defines the following exceptions:
-
exception
multiprocessing.connection.
ProcessError
¶ The base class of all
multiprocessing
exceptions.
-
exception
multiprocessing.connection.
BufferTooShort
¶ Exception levée par
Connection.recv_bytes_into()
quand l’objet tampon fourni est trop petit pour le message à lire.
-
exception
multiprocessing.connection.
AuthenticationError
¶ Raised when there is an authentication error.
-
exception
multiprocessing.connection.
TimeoutError
¶ Raised by methods with a timeout when the timeout expires.
Exemples
Le code serveur suivant crée un auditeur qui utilise 'secret password'
comme clé d’authentification. Il attend ensuite une connexion et envoie les données au client :
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
conn.send([2.25, None, 'junk', float])
conn.send_bytes('hello')
conn.send_bytes(array('i', [42, 1729]))
conn.close()
listener.close()
Le code suivant se connecte au serveur et en reçoit des données :
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
conn = Client(address, authkey='secret password')
print conn.recv() # => [2.25, None, 'junk', float]
print conn.recv_bytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])
conn.close()
16.6.2.10.1. Formats d’adresses¶
Une adresse
'AF_INET'
est un tuple de la forme(hostname, port)
où hostname est une chaîne et port un entier.Une adresse
'AF_UNIX'
est une chaîne représentant un nom de fichier sur le système de fichiers.- Une adresse
'AF_PIPE'
est une chaîne de la forme r'\.\pipe{PipeName}'
. Pour utiliser unClient()
pour se connecter à un tube nommé sur une machine distante appelée ServerName, il faut plutôt utiliser une adresse de la former'\ServerName\pipe{PipeName}'
.
- Une adresse
Notez que toute chaîne commençant par deux antislashs est considérée par défaut comme l’adresse d’un 'AF_PIPE'
plutôt qu’une adresse 'AF_UNIX'
.
16.6.2.11. Clés d’authentification¶
When one uses Connection.recv()
, the
data received is automatically
unpickled. Unfortunately unpickling data from an untrusted source is a security
risk. Therefore Listener
and Client()
use the hmac
module
to provide digest authentication.
An authentication key is a string which can be thought of as a password: once a connection is established both ends will demand proof that the other knows the authentication key. (Demonstrating that both ends are using the same key does not involve sending the key over the connection.)
Si l’authentification est requise et qu’aucune clé n’est spécifiée alors la valeur de retour de current_process().authkey
est utilisée (voir Process
). Celle valeur est automatiquement héritée par tout objet Process
créé par le processus courant. Cela signifie que (par défaut) tous les processus d’un programme multi-processus partageront une clé d’authentification unique qui peut être utilisée pour mettre en place des connexions entre-eux.
Des clés d’authentification adaptées peuvent aussi être générées par os.urandom()
.
16.6.2.12. Journalisation¶
Un certain support de la journalisation est disponible. Notez cependant que le le paquet logging
n’utilise pas de verrous partagés entre les processus et il est donc possible (dépendant du type de gestionnaire) que les messages de différents processus soient mélangés.
-
multiprocessing.
get_logger
()¶ Renvoie le journaliseur utilisé par
multiprocessing
. Si nécessaire, un nouveau sera créé.À sa première création le journaliseur a pour niveau
logging.NOTSET
et pas de gestionnaire par défaut. Les messages envoyés à ce journaliseur ne seront pas propagés par défaut au journaliseur principal.Notez que sous Windows les processus fils n’hériteront que du niveau du journaliseur du processus parent – toute autre personnalisation du journaliseur ne sera pas héritée.
-
multiprocessing.
log_to_stderr
()¶ Cette fonction effectue un appel à
get_logger()
mais en plus de renvoyer le journaliseur créé par get_logger, elle ajoute un gestionnaire qui envoie la sortie sursys.stderr
en utilisant le format'[%(levelname)s/%(processName)s] %(message)s'
.
L’exemple ci-dessous présente une session avec la journalisation activée :
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
In addition to having these two logging functions, the multiprocessing also
exposes two additional logging level attributes. These are SUBWARNING
and SUBDEBUG
. The table below illustrates where theses fit in the
normal level hierarchy.
Niveau |
Valeur numérique |
---|---|
|
25 |
|
5 |
Pour un tableau complet des niveaux de journalisation, voir le module logging
.
These additional logging levels are used primarily for certain debug messages
within the multiprocessing module. Below is the same example as above, except
with SUBDEBUG
enabled:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0
16.6.2.13. Le module multiprocessing.dummy
¶
multiprocessing.dummy
réplique toute l’API de multiprocessing
mais n’est rien de plus qu’un wrapper autour du module threading
.
16.6.3. Lignes directrices de programmation¶
Il y a certaines lignes directrices et idiomes auxquels il faut adhérer en utilisant multiprocessing
.
16.6.3.1. All platforms¶
Éviter les états partagés
Autant que possible, vous devriez éviter de déplacer de larges données entre les processus.
It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives from the
threading
module.
Sérialisation
Assurez-vous que les arguments passés aux méthodes des mandataires soient sérialisables (pickables).
Sûreté des mandataires à travers les fils d’exécution
N’utilisez pas d’objet mandataire depuis plus d’un fil d’exécution à moins que vous ne le protégiez avec un verrou.
(Il n’y a jamais de problème avec plusieurs processus utilisant un même mandataire.)
Attendre les processus zombies
Sous Unix quand un processus se termine mais n’est pas attendu, il devient un zombie. Il ne devrait jamais y en avoir beaucoup parce que chaque fois qu’un nouveau processus démarre (ou que
active_children()
est appelée) tous les processus complétés qui n’ont pas été attendus le seront. Aussi appeler la méthodeProcess.is_alive
d’un processus terminé attendra le processus. Toutefois il est probablement une bonne pratique d’attendre explicitement tous les processus que vous démarrez.
Préférez hériter que sérialiser/désérialiser
On Windows many types from
multiprocessing
need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
Éviter de terminer les processus
Utiliser la méthode
Process.terminate
pour stopper un processus risque de casser ou de rendre indisponible aux autres processus des ressources partagées (comme des verrous, sémaphores, tubes et queues) actuellement utilisée par le processus.Il est donc probablement préférable de n’utiliser
Process.terminate
que sur les processus qui n’utilisent jamais de ressources partagées.
Attendre les processus qui utilisent des queues
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the « feeder » thread to the underlying pipe. (The child process can call the
cancel_join_thread()
method of the queue to avoid this behaviour.)Cela signifie que chaque fois que vous utilisez une queue vous devez vous assurer que tous les éléments qui y ont été placés seront effectivement supprimés avant que le processus ne soit attendu. Autrement vous ne pouvez pas être sûr que les processus qui ont placé des éléments dans la queue se termineront. Souvenez-vous aussi que tous les processus non daemons seront attendus automatiquement.
L’exemple suivant provoquera un interblocage :
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Une solution ici serait d’intervertir les deux dernières lignes (ou simplement supprimer la ligne
p.join()
).
Passer explicitement les ressources aux processus fils
On Unix a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.
Apart from making the code (potentially) compatible with Windows this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.
Donc par exemple :
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()devrait être réécrit comme :
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Faire attention à remplacer sys.stdin
par un objet « file-like »
À l’origine,
multiprocessing
appelait inconditionnellement :os.close(sys.stdin.fileno())dans la méthode
multiprocessing.Process._bootstrap()
— cela provoquait des problèmes avec les processus imbriqués. Cela peut être changé en :sys.stdin.close() sys.stdin = open(os.devnull)Qui résout le problème fondamental des collisions entre processus provoquant des erreurs de mauvais descripteurs de fichiers, mais introduit un potentiel danger pour les applications qui remplacent
sys.stdin()
avec un « file-like object » ayant une sortie bufferisée. Ce danger est que si plusieurs processus appellentclose()
sur cet objet file-like, cela peut amener les données à être transmises à l’objet à plusieurs reprises, résultant en une corruption.Si vous écrivez un objet file-like et implémentez votre propre cache, vous pouvez le rendre sûr pour les forks en stockant le pid chaque fois que vous ajoutez des données au cache, et annulez le cache quand le pip change. Par exemple :
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cachePour plus d’informations, voir bpo-5155, bpo-5313 et bpo-5331
16.6.3.2. Windows¶
Since Windows lacks os.fork()
it has a few extra restrictions:
Plus de sérialisation
Ensure that all arguments to
Process.__init__()
are picklable. This means, in particular, that bound or unbound methods cannot be used directly as thetarget
argument on Windows — just define a function and use that instead.Also, if you subclass
Process
then make sure that instances will be picklable when theProcess.start
method is called.
Variables globales
Gardez en tête que si le code exécuté dans un processus fils essaie d’accéder à une variable globale, alors la valeur qu’il voit (s’il y en a une) pourrait ne pas être la même que la valeur du processus parent au moment même où
Process.start
est appelée.Cependant, les variables globales qui sont juste des constantes de modules ne posent pas de problèmes.
Importation sûre du module principal
Assurez-vous que le module principal peut être importé en toute sécurité par un nouvel interpréteur Python sans causer d’effets de bord inattendus (comme le démarrage d’un nouveau processus).
For example, under Windows running the following module would fail with a
RuntimeError
:from multiprocessing import Process def foo(): print 'hello' p = Process(target=foo) p.start()Vous devriez plutôt protéger le « point d’entrée » du programme en utilisant
if __name__ == '__main__':
comme suit :from multiprocessing import Process, freeze_support def foo(): print 'hello' if __name__ == '__main__': freeze_support() p = Process(target=foo) p.start()(La ligne
freeze_support()
peut être omise si le programme est uniquement lancé normalement et pas gelé.)Cela permet aux interpréteurs Python fraîchement instanciés d’importer en toute sécurité le module et d’exécution ensuite la fonction
foo()
.Des restrictions similaires s’appliquent si une pool ou un gestionnaire est créé dans le module principal.
16.6.4. Exemples¶
Démonstration de comment créer et utiliser des gestionnaires et mandataires personnalisés :
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo(object):
def f(self):
print 'you called Foo.f()'
def g(self):
print 'you called Foo.g()'
def _h(self):
print 'you called Foo._h()'
# A simple generator function
def baz():
for i in xrange(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ('next', '__next__')
def __iter__(self):
return self
def next(self):
return self._callmethod('next')
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print '-' * 20
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print '-' * 20
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print '-' * 20
it = manager.baz()
for i in it:
print '<%d>' % i,
print
print '-' * 20
op = manager.operator()
print 'op.add(23, 45) =', op.add(23, 45)
print 'op.pow(2, 94) =', op.pow(2, 94)
print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
print 'op._exposed_ =', op._exposed_
##
if __name__ == '__main__':
freeze_support()
test()
En utilisant Pool
:
#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
#
# Create pool
#
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool
print
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
print
print 'Ordered results using pool.imap():'
for x in imap_it:
print '\t', x
print
print 'Unordered results using pool.imap_unordered():'
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
for x in pool.map(calculatestar, TASKS):
print '\t', x
print
#
# Simple benchmarks
#
N = 100000
print 'def pow3(x): return x**3'
t = time.time()
A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
L = [None] * 1000000
print 'def noop(x): pass'
print 'L = [None] * 1000000'
t = time.time()
A = map(noop, L)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L
#
# Test error handling
#
print 'Testing error handling:'
try:
print pool.apply(f, (5,))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.apply()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print pool.map(f, range(10))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.map()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print list(pool.imap(f, range(10)))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, range(10))
for i in range(10):
try:
x = it.next()
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
print
#
# Testing timeouts
#
print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
#
# Testing callback
#
print 'Testing callback:'
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print 'Testing close():'
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as multiprocessing
else:
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
raise SystemExit(2)
test()
Synchronization types like locks, conditions and queues:
#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, random
from Queue import Empty
import multiprocessing # may get overwritten
#### TEST_VALUE
def value_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
mutex.acquire()
print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
running.value -= 1
mutex.release()
def test_value():
TASKS = 10
running = multiprocessing.Value('i', TASKS)
mutex = multiprocessing.Lock()
for i in range(TASKS):
p = multiprocessing.Process(target=value_func, args=(running, mutex))
p.start()
while running.value > 0:
time.sleep(0.08)
mutex.acquire()
print running.value,
sys.stdout.flush()
mutex.release()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in range(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=queue_func, args=(q,))
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
cond.acquire()
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
print '\t' + str(cond)
cond.notify()
cond.release()
def test_condition():
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=condition_func, args=(cond,))
print cond
cond.acquire()
print cond
cond.acquire()
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
cond.release()
print cond
cond.release()
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
sema.acquire()
mutex.acquire()
running.value += 1
print running.value, 'tasks are running'
mutex.release()
random.seed()
time.sleep(random.random()*2)
mutex.acquire()
running.value -= 1
print '%s has finished' % multiprocessing.current_process()
mutex.release()
sema.release()
def test_semaphore():
sema = multiprocessing.Semaphore(3)
mutex = multiprocessing.RLock()
running = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=semaphore_func,
args=(sema, mutex, running))
for i in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout():
p = multiprocessing.Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.is_alive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
def test_event():
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=event_func, args=(event,))
for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues():
values = [
('i', 10),
('h', -2),
('d', 1.25)
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [multiprocessing.Value(id, v) for id, v in values]
shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
p = multiprocessing.Process(
target=sharedvalues_func,
args=(values, arrays, shared_values, shared_arrays)
)
p.start()
p.join()
assert p.exitcode == 0
####
def test(namespace=multiprocessing):
global multiprocessing
multiprocessing = namespace
for func in [ test_value, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func()
ignore = multiprocessing.active_children() # cleanup any old processes
if hasattr(multiprocessing, '_debug_info'):
info = multiprocessing._debug_info()
if info:
print info
raise ValueError('there should be no positive refcounts left')
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
namespace = multiprocessing
elif sys.argv[1] == 'manager':
print ' Using processes and a manager '.center(79, '-')
namespace = multiprocessing.Manager()
namespace.Process = multiprocessing.Process
namespace.current_process = multiprocessing.current_process
namespace.active_children = multiprocessing.active_children
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as namespace
else:
print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
raise SystemExit(2)
test(namespace)
Un exemple montrant comment utiliser des queues pour alimenter en tâches une collection de processus workers et collecter les résultats :
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
An example of how a pool of worker processes can each run a
SimpleHTTPServer.HttpServer
instance while sharing a single listening
socket.
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import os
import sys
from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
if sys.platform == 'win32':
import multiprocessing.reduction # make sockets pickable/inheritable
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class RequestHandler(SimpleHTTPRequestHandler):
# we override log_message() to show which process is handling the request
def log_message(self, format, *args):
note(format, *args)
def serve_forever(server):
note('starting server')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
def runpool(address, number_of_processes):
# create a single server object -- children will each inherit a copy
server = HTTPServer(address, RequestHandler)
# create child processes to act as workers
for i in range(number_of_processes-1):
Process(target=serve_forever, args=(server,)).start()
# main process also acts as a worker
serve_forever(server)
def test():
DIR = os.path.join(os.path.dirname(__file__), '..')
ADDRESS = ('localhost', 8000)
NUMBER_OF_PROCESSES = 4
print 'Serving at http://%s:%d using %d worker processes' % \
(ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
os.chdir(DIR)
runpool(ADDRESS, NUMBER_OF_PROCESSES)
if __name__ == '__main__':
freeze_support()
test()
Some simple benchmarks comparing multiprocessing
with threading
:
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, multiprocessing, threading, Queue, gc
if sys.platform == 'win32':
_timer = time.clock
else:
_timer = time.time
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
a = '0' * 256
c.acquire()
c.notify()
c.release()
for i in xrange(iterations):
q.put(a)
q.put('STOP')
def test_queuespeed(Process, q, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = Process(target=queuespeed_func, args=(q, c, iterations))
c.acquire()
p.start()
c.wait()
c.release()
result = None
t = _timer()
while result != 'STOP':
result = q.get()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through the queue in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
a = '0' * 256
cond.acquire()
cond.notify()
cond.release()
for i in xrange(iterations):
c.send(a)
c.send('STOP')
def test_pipespeed():
c, d = multiprocessing.Pipe()
cond = multiprocessing.Condition()
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = multiprocessing.Process(target=pipe_func,
args=(d, cond, iterations))
cond.acquire()
p.start()
cond.wait()
cond.release()
result = None
t = _timer()
while result != 'STOP':
result = c.recv()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through connection in',elapsed,'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_SEQSPEED
def test_seqspeed(seq):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
a = seq[5]
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_LOCK
def test_lockspeed(l):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
l.acquire()
l.release()
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_CONDITION
def conditionspeed_func(c, N):
c.acquire()
c.notify()
for i in xrange(N):
c.wait()
c.notify()
c.release()
def test_conditionspeed(Process, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
c.acquire()
p = Process(target=conditionspeed_func, args=(c, iterations))
p.start()
c.wait()
t = _timer()
for i in xrange(iterations):
c.notify()
c.wait()
elapsed = _timer()-t
c.release()
p.join()
print iterations * 2, 'waits in', elapsed, 'seconds'
print 'average number/sec:', iterations * 2 / elapsed
####
def test():
manager = multiprocessing.Manager()
gc.disable()
print '\n\t######## testing Queue.Queue\n'
test_queuespeed(threading.Thread, Queue.Queue(),
threading.Condition())
print '\n\t######## testing multiprocessing.Queue\n'
test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
multiprocessing.Condition())
print '\n\t######## testing Queue managed by server process\n'
test_queuespeed(multiprocessing.Process, manager.Queue(),
manager.Condition())
print '\n\t######## testing multiprocessing.Pipe\n'
test_pipespeed()
print
print '\n\t######## testing list\n'
test_seqspeed(range(10))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
print '\n\t######## testing Array("i", ..., lock=False)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
print '\n\t######## testing Array("i", ..., lock=True)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
print
print '\n\t######## testing threading.Lock\n'
test_lockspeed(threading.Lock())
print '\n\t######## testing threading.RLock\n'
test_lockspeed(threading.RLock())
print '\n\t######## testing multiprocessing.Lock\n'
test_lockspeed(multiprocessing.Lock())
print '\n\t######## testing multiprocessing.RLock\n'
test_lockspeed(multiprocessing.RLock())
print '\n\t######## testing lock managed by server process\n'
test_lockspeed(manager.Lock())
print '\n\t######## testing rlock managed by server process\n'
test_lockspeed(manager.RLock())
print
print '\n\t######## testing threading.Condition\n'
test_conditionspeed(threading.Thread, threading.Condition())
print '\n\t######## testing multiprocessing.Condition\n'
test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
print '\n\t######## testing condition managed by a server process\n'
test_conditionspeed(multiprocessing.Process, manager.Condition())
gc.enable()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()