16.6. multiprocessing — Process-based « threading » interface

Nouveau dans la version 2.6.

16.6.1. Introduction

multiprocessing est un paquet qui permet l’instanciation de processus via la même API que le module threading. Le paquet multiprocessing offre à la fois des possibilités de programmation concurrente locale ou à distance, contournant les problèmes du Global Interpreter Lock en utilisant des processus plutôt que des fils d’exécution. Ainsi, le module multiprocessing permet au développeur de bénéficier entièrement des multiples processeurs sur une machine. Il tourne à la fois sur les systèmes Unix et Windows.

The multiprocessing module also introduces APIs which do not have analogs in the threading module. A prime example of this is the Pool object which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). The following example demonstrates the common practice of defining such functions in a module so that child processes can successfully import that module. This basic example of data parallelism using Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

affiche sur la sortie standard :

[1, 4, 9]

16.6.1.1. La classe Process

Dans le module multiprocessing, les processus sont instanciés en créant un objet Process et en appelant sa méthode start(). La classe Process suit la même API que threading.Thread. Un exemple trivial d’un programme multi-processus est :

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Pour afficher les IDs des processus impliqués, voici un exemple plus étoffé :

from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

For an explanation of why (on Windows) the if __name__ == '__main__' part is necessary, see Lignes directrices de programmation.

16.6.1.2. Échange d’objets entre les processus

multiprocessing gère deux types de canaux de communication entre les processus :

Queues

The Queue class is a near clone of Queue.Queue. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

Les queues peuvent être utilisées par plusieurs fils d’exécution ou processus.

Pipes

La fonction Pipe() renvoie une paire d’objets de connexion connectés à un tube qui est par défaut à double-sens. Par exemple :

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

Les deux objets de connexion renvoyés par Pipe() représentent les deux bouts d’un tube. Chaque objet de connexion possède (entre autres) des méthodes send() et recv(). Notez que les données d’un tube peuvent être corrompues si deux processus (ou fils d’exécution) essaient de lire ou d’écrire sur le même bout du tube en même temps. Évidemment il n’y a pas de risque de corruption si les processus utilisent deux bouts différents en même temps.

16.6.1.3. Synchronisation entre processus

multiprocessing contient des équivalents à toutes les primitives de synchronisation de threading. Par exemple il est possible d’utiliser un verrou pour s’assurer qu’un seul processus à la fois écrit sur la sortie standard :

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Sans le verrou, les sorties des différents processus risquent d’être mélangées.

16.6.1.4. Partager un état entre les processus

Comme mentionné plus haut, il est généralement préférable d’éviter autant que possible d’utiliser des états partagés en programmation concurrente. C’est particulièrement vrai quand plusieurs processus sont utilisés.

Cependant, si vous devez réellement partager des données, multiprocessing permet de le faire de deux manières.

Mémoire partagée

Les données peuvent être stockées dans une mémoire partagée en utilisant des Value ou des Array. Par exemple, le code suivant :

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

affiche :

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Les arguments 'd' et 'i' utilisés à la création des num et arr` sont des codes de types tels qu’utilisés par le module array : 'd' indique un flottant double-précision et 'i' indique un entier signé. Ces objets partagés seront sûr d’utilisation entre processus et fils d’exécution.

Pour plus de flexibilité dans l’utilisation de mémoire partagée, vous pouvez utiliser le module multiprocessing.sharedctypes qui permet la création d’objets arbitraires ctypes alloués depuis la mémoire partagée.

Processus serveur

Un objet gestionnaire renvoyé par Manager() contrôle un processus serveur qui détient les objets Python et autorise les autres processus à les manipuler à l’aide de mandataires.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array. For example,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l

affiche :

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Les processus serveurs de gestionnaires sont plus flexibles que les mémoires partagées parce qu’ils peuvent gérer des types d’objets arbitraires. Aussi, un gestionnaire unique peut être partagé par les processus sur différentes machines à travers le réseau. Cependant, ils sont plus lents que les mémoires partagées.

16.6.1.5. Utiliser un réservoir de workers

La classe Pool représente une pool de processus de travail. Elle possède des méthodes qui permettent aux tâches d’être déchargées vers les processus de travail de différentes manières.

Par exemple :

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and got a multiprocessing.TimeoutError"

Notez que les méthodes d’une pool ne devraient être utilisées que par le processus qui l’a créée.

Note

Functionality within this package requires that the __main__ module be importable by the children. This is covered in Lignes directrices de programmation however it is worth pointing out here. This means that some examples, such as the Pool examples will not work in the interactive interpreter. For example:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Si vous essayez ce code, il affichera trois traces d’appels complètes entrelacées de manière semi-aléatoire, et vous aurez alors à stopper le processus maître.)

16.6.2. Référence

Le paquet multiprocessing reproduit en grande partie l’API du module threading.

16.6.2.1. Process et exceptions

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})

Les objets process représentent une activité exécutée dans un processus séparé. La classe Process a des équivalents à toutes les méthodes de threading.Thread.

The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name. By default, a unique name is constructed of the form “Process-N1:N2:…:Nk” where N1,N2,…,Nk is a sequence of integers whose length is determined by the generation of the process. args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. By default, no arguments are passed to target.

Si une sous-classe redéfinit le constructeur, elle doit s’assurer d’invoquer le constructeur de la classe de base (Process.__init__()) avant de faire autre chose du processus.

run()

Méthode représentant l’activité du processus.

Vous pouvez redéfinir cette méthode dans une sous-classe. La méthode standard run() invoque l’objet appelable passé au constructeur comme argument target, si fourni, avec les arguments séquentiels et nommés respectivement pris depuis les paramètres args et kwargs.

start()

Démarre l’activité du processus.

Elle doit être appelée au plus une fois par objet processus. Elle s’arrange pour que la méthode run() de l’objet soit invoquée dans un processus séparé.

join([timeout])

Block the calling thread until the process whose join() method is called terminates or until the optional timeout occurs.

If timeout is None then there is no timeout.

join peut être appelée plusieurs fois sur un même processus.

Un processus ne peut pas s’attendre lui-même car cela causerait un interblocage. C’est une erreur d’essayer d’attendre un processus avant qu’il ne soit démarré.

name

The process’s name.

The name is a string used for identification purposes only. It has no semantics. Multiple processes may be given the same name. The initial name is set by the constructor.

is_alive()

Renvoie vrai si le processus est en vie, faux sinon.

Grossièrement, un objet processus est en vie depuis le moment où la méthode start() finit de s’exécuter jusqu’à ce que le processus fils se termine.

daemon

L’option daemon du processus, une valeur booléenne. L’option doit être réglée avant que la méthode start() ne soit appelée.

La valeur initiale est héritée par le processus créateur.

Quand un processus se ferme, il tente de terminer tous ses processus enfants daemon.

Notez qu’un processus daemon n’est pas autorisé à créer des processus fils. Sinon un processus daemon laisserait ses enfants orphelins lorsqu’il se termine par la fermeture de son parent. De plus, ce ne sont pas des daemons ou services Unix, ce sont des processus normaux qui seront terminés (et non attendus) si un processus non daemon se ferme.

En plus de l’API threading.Thread, les objets Process supportent aussi les attributs et méthodes suivants :

pid

Renvoie l’ID du processus. Avant que le processus ne soit lancé, la valeur est None.

exitcode

Le code de fermeture de l’enfant. La valeur est None si le processus ne s’est pas encore terminé. Une valeur négative -N indique que le fils a été terminé par un signal N.

authkey

La clé d’authentification du processus (une chaîne d’octets).

Quand multiprocessing est initialisé, une chaîne aléatoire est assignée au processus principal, en utilisant os.urandom().

Quand un objet Process est créé, il hérité de la clé d’authentification de son parent, bien que cela puisse être changé à l’aide du paramètre authkey pour une autre chaîne d’octets.

Voir Clés d’authentification.

terminate()

Termine le processus. Sous Unix cela est réalisé à l’aide d’un signal SIGTERM, sous Windows TerminateProcess() est utilisé. Notez que les gestionnaires de sortie, les clauses finally etc. ne sont pas exécutées.

Notez que les descendants du processus ne seront pas terminés – ils deviendront simplement orphelins.

Avertissement

Si cette méthode est utilisée quand le processus associé utilise un tube ou une queue, alors le tube ou la queue sont susceptibles d’être corrompus et peuvent devenir inutilisables par les autres processus. De façon similaire, si le processus a acquis un verrou, un sémaphore ou autre, alors le terminer est susceptible de provoquer des blocages dans les autres processus.

Notez que les méthodes start(), join(), is_alive(), terminate() et exitcode ne devraient être appelées que par le processus ayant créé l’objet process.

Exemple d’utilisation de quelques méthodes de Process :

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.BufferTooShort

Exception levée par Connection.recv_bytes_into() quand l’objet tampon fourni est trop petit pour le message à lire.

Si e est une instance de BufferTooShort alors e.args[0] donnera un message sous forme d’une chaîne d’octets.

16.6.2.2. Tubes (pipes) et Queues

Quand de multiples processus sont utilisés, de l’échange de messages est souvent mis en place pour la communication entre processus et éviter d’avoir à utiliser des primitives de synchronisation telles que des verrous.

Pour échanger des messages vous pouvez utiliser un Pipe() (pour une connexion entre deux processus) ou une queue (qui autorise de multiples producteurs et consommateurs).

The Queue, multiprocessing.queues.SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the Queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5’s Queue.Queue class.

Si vous utilisez JoinableQueue alors vous devez appeler JoinableQueue.task_done() pour chaque tâche retirée de la queue, sans quoi le sémaphore utilisé pour compter le nombre de tâches non accomplies pourra éventuellement déborder, levant une exception.

Notez que vous pouvez aussi créer une queue partagée en utilisant un objet gestionnaire – voir Gestionnaires.

Note

multiprocessing uses the usual Queue.Empty and Queue.Full exceptions to signal a timeout. They are not available in the multiprocessing namespace so you need to import them from Queue.

Note

Quand un objet est placé dans une queue, l’objet est sérialisé par pickle et un fil d’exécution en arrière-plan transmettra ensuite les données sérialisées sur un tube sous-jacent. Cela a certaines conséquences qui peuvent être un peu surprenantes, mais ne devrait causer aucune difficultés pratiques – si elles vous embêtent vraiment, alors vous pouvez à la place utiliser une queue créée avec un manager.

  1. After putting an object on an empty queue there may be an infinitesimal delay before the queue’s empty() method returns False and get_nowait() can return without raising Queue.Empty.

  2. Si plusieurs processus placent des objets dans la queue, il est possible pour les objets d’être reçus de l’autre côté dans le désordre. Cependant, les objets placés par un même processus seront toujours récupérés dans l’ordre attendu.

Avertissement

If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.

Avertissement

Comme mentionné plus haut, si un processus fils a placé des éléments dans la queue (et qu’il n’a pas utilisé JoinableQueue.cancel_join_thread), alors le processus ne se terminera pas tant que les éléments placés dans le tampon n’auront pas été transmis au tube.

Cela signifie que si vous essayez d’attendre ce processus vous pouvez obtenir un interblocage, à moins que vous ne soyez sûr que tous les éléments placés dans la queue ont été consommés. De même, si le processus fils n’est pas un daemon alors le processus parent pourrait bloquer à la fermeture quand il tentera d’attendre tous ses enfants non daemons.

Notez que la queue créée à l’aide d’un gestionnaire n’a pas ce problème. Voir Lignes directrices de programmation.

Pour un exemple d’utilisation de queues pour de la communication entre les processus, voir Exemples.

multiprocessing.Pipe([duplex])

Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe.

Si duplex vaut True (par défaut), alors le tube est bidirectionnel. Si duplex vaut False il est unidirectionnel : conn1 ne peut être utilisé que pour recevoir des messages et conn2 que pour en envoyer.

class multiprocessing.Queue([maxsize])

Renvoie une queue partagée entre les processus utilisant un tube et quelques verrous/sémaphores. Quand un processus place initialement un élément sur la queue, un fil d’exécution feeder est démarré pour transférer les objets du tampon vers le tube.

The usual Queue.Empty and Queue.Full exceptions from the standard library’s Queue module are raised to signal timeouts.

Queue implements all the methods of Queue.Queue except for task_done() and join().

qsize()

Renvoie la taille approximative de la queue. Ce nombre n’est pas fiable en raison des problématiques de multithreading et multiprocessing.

Notez que cela peut lever une NotImplementedError sous les plateformes Unix telles que Mac OS X où sem_getvalue() n’est pas implémentée.

empty()

Renvoie True si la queue est vide, False sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.

full()

Renvoie True si la queue est pleine, False sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.

put(obj[, block[, timeout]])

Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the Queue.Full exception (timeout is ignored in that case).

put_nowait(obj)

Équivalent à put(obj, False).

get([block[, timeout]])

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the Queue.Empty exception (timeout is ignored in that case).

get_nowait()

Équivalent à get(False).

Queue has a few additional methods not found in Queue.Queue. These methods are usually unnecessary for most code:

close()

Indique que plus aucune donnée ne peut être placée sur la queue par le processus courant. Le fil d’exécution en arrière-plan se terminera quand il aura transféré toutes les données du tampon vers le tube. Elle est appelée automatiquement quand la queue est collectée par le ramasse-miettes.

join_thread()

Attend le fil d’exécution d’arrière-plan. Elle peut seulement être utilisée une fois que close() a été appelée. Elle bloque jusqu’à ce que le fil d’arrière-plan se termine, assurant que toutes les données du tampon ont été transmises au tube.

Par défaut si un processus n’est pas le créateur de la queue alors à la fermeture elle essaiera d’attendre le fil d’exécution d’arrière-plan de la queue. Le processus peut appeler cancel_join_thread() pour que join_thread() ne fasse rien.

cancel_join_thread()

Empêche join_thread() de bloquer. En particulier, cela empêche le fil d’arrière-plan d’être attendu automatiquement quand le processus se ferme – voir join_thread().

Un meilleur nom pour cette méthode pourrait être allow_exit_without_flush(). Cela peut provoquer des pertes de données placées dans la queue, et vous ne devriez certainement pas avoir besoin de l’utiliser. Elle n’est là que si vous souhaitez terminer immédiatement le processus sans transférer les données du tampon, et que vous ne vous inquiétez pas de perdre des données.

Note

Le fonctionnement de cette classe requiert une implémentation de sémaphore partagé sur le système d’exploitation hôte. Sans cela, la fonctionnalité sera désactivée et la tentative d’instancier une Queue lèvera une ImportError. Voir bpo-3770 pour plus d’informations. Cette remarque reste valable pour les autres types de queues spécialisées définies par la suite.

class multiprocessing.queues.SimpleQueue

It is a simplified Queue type, very close to a locked Pipe.

empty()

Renvoie True si la queue est vide, False sinon.

get()

Supprime et donne un élément de la queue.

put(item)

Place item dans la queue.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Lève une exception ValueError si appelée plus de fois qu’il y avait d’éléments dans la file.

join()

Bloque jusqu’à ce que tous les éléments de la queue aient été récupérés et traités.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

16.6.2.3. Divers

multiprocessing.active_children()

Renvoie la liste de tous les enfants vivants du processus courant.

Appeler cette méthode provoque l’effet de bord d’attendre tout processus qui n’a pas encore terminé.

multiprocessing.cpu_count()

Return the number of CPUs in the system. May raise NotImplementedError.

multiprocessing.current_process()

Renvoie l’objet Process correspondant au processus courant.

Un analogue à threading.current_thread().

multiprocessing.freeze_support()

Ajoute le support des programmes utilisant multiprocessing qui ont été gelés pour produire un exécutable Windows. (Testé avec py2exe, PyInstaller et cx_Freeze.)

Cette fonction doit être appelée juste après la ligne if __name__ == '__main__' du module principal. Par exemple :

from multiprocessing import Process, freeze_support

def f():
    print 'hello world!'

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Si la ligne freeze_support() est omise, alors tenter de lancer l’exécutable gelé lèvera une RuntimeError.

Appeler freeze_support() n’a pas d’effet quand elle est invoquée sur un système d’exploitation autre que Windows. De plus, si le module est lancé normalement par l’interpréteur Python sous Windows (le programme n’a pas été gelé), alors freeze_support() n’a pas d’effet.

multiprocessing.set_executable()

Définit le chemin de l’interpréteur Python à utiliser pour démarrer un processus fils. (Par défaut sys.executable est utilisé). Les intégrateurs devront probablement faire quelque chose comme :

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

before they can create child processes. (Windows only)

16.6.2.4. Objets de connexions

Les objets de connexion autorisent l’envoi et la réception d’objets sérialisables ou de chaînes de caractères. Ils peuvent être vus comme des interfaces de connexion (sockets) connectées orientées messages.

Connection objects are usually created using Pipe – see also Auditeurs et Clients.

class Connection
send(obj)

Envoie un objet sur l’autre bout de la connexion, qui devra être lu avec recv().

The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception.

recv()

Return an object sent from the other end of the connection using send(). Blocks until there is something to receive. Raises EOFError if there is nothing left to receive and the other end was closed.

fileno()

Renvoie le descripteur de fichier ou identifiant utilisé par la connexion.

close()

Ferme la connexion.

Elle est appelée automatiquement quand la connexion est collectée par le ramasse-miettes.

poll([timeout])

Renvoie vrai ou faux selon si des données sont disponibles à la lecture.

Si timeout n’est pas spécifié la méthode renverra immédiatement. Si timeout est un nombre alors il spécifie le temps maximum de blocage en secondes. Si timeout est None, un temps d’attente infini est utilisé.

send_bytes(buffer[, offset[, size]])

Send byte data from an object supporting the buffer interface as a complete message.

If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception

recv_bytes([maxlength])

Renvoie un message complet de données binaires envoyées depuis l’autre bout de la connexion comme une chaîne de caractères. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une EOFError s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.

If maxlength is specified and the message is longer than maxlength then IOError is raised and the connection will no longer be readable.

recv_bytes_into(buffer[, offset])

Lit et stocke dans buffer un message complet de données binaires envoyées depuis l’autre bout de la connexion et renvoie le nombre d’octets du message. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une EOFError s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.

buffer must be an object satisfying the writable buffer interface. If offset is given then the message will be written into the buffer from that position. Offset must be a non-negative integer less than the length of buffer (in bytes).

Si le tampon est trop petit une exception BufferTooShort est levée et le message complet est accessible via e.args[0]e est l’instance de l’exception.

Par exemple :

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Avertissement

La méthode Connection.recv() désérialise automatiquement les données qu’elle reçoit, ce qui peut être un risque de sécurité à moins que vous ne fassiez réellement confiance au processus émetteur du message.

Par conséquent, à moins que l’objet de connexion soit instancié par Pipe(), vous ne devriez uniquement utiliser les méthodes recv() et send() après avoir effectué une quelconque forme d’authentification. Voir Clés d’authentification.

Avertissement

Si un processus est tué pendant qu’il essaye de lire ou écrire sur le tube, alors les données du tube ont des chances d’être corrompues, parce qu’il devient impossible d’être sûr d’où se trouvent les bornes du message.

16.6.2.5. Primitives de synchronisation

Généralement les primitives de synchronisation ne sont pas nécessaire dans un programme multi-processus comme elles le sont dans un programme multi-fils d’exécution. Voir la documentation du module threading.

Notez que vous pouvez aussi créer des primitives de synchronisation en utilisant un objet gestionnaire – voir Gestionnaires.

class multiprocessing.BoundedSemaphore([value])

Un objet sémaphore lié : un analogue proche de threading.BoundedSemaphore.

A solitary difference from its close analog exists: its acquire method’s first argument is named block and it supports an optional second argument timeout, as is consistent with Lock.acquire().

Note

Sur Mac OS X, elle n’est pas distinguable de la classe Semaphore parce que sem_getvalue() n’est pas implémentée sur cette plateforme.

class multiprocessing.Condition([lock])

A condition variable: a clone of threading.Condition.

Si lock est spécifié il doit être un objet Lock ou RLock du module multiprocessing.

class multiprocessing.Event

A clone of threading.Event. This method returns the state of the internal semaphore on exit, so it will always return True except if a timeout is given and the operation times out.

Modifié dans la version 2.7: Previously, the method always returned None.

class multiprocessing.Lock

Un verrou non récursif : un analogue proche de threading.Lock. Une fois que le processus ou le fil d’exécution a acquis un verrou, les tentatives suivantes d’acquisition depuis n’importe quel processus ou fil d’exécution bloqueront jusqu’à ce qu’il soit libéré ; n’importe quel processus ou fil peut le libérer. Les concepts et comportements de threading.Lock qui s’appliquent aux fils d’exécution sont répliqués ici dans multiprocessing.Lock et s’appliquent aux processus et aux fils d’exécution, à l’exception de ce qui est indiqué.

Notez que Lock est en fait une fonction factory qui renvoie une instance de multiprocessing.synchronize.Lock initialisée avec un contexte par défaut.

Lock supporte le protocole context manager et peut ainsi être utilisé avec une instruction with.

acquire(block=True, timeout=None)

Acquiert un verrou, bloquant ou non bloquant.

Avec l’argument block à True (par défaut), l’appel de méthode bloquera jusqu’à ce que le verrou soit dans déverrouillé, puis le verrouillera avant de renvoyer True. Notez que le nom de ce premier argument diffère de celui de threading.Lock.acquire().

Avec l’argument block à False, l’appel de méthode ne bloque pas. Si le verrou est actuellement verrouillé, renvoie False ; autrement verrouille le verrou et renvoie True.

When invoked with a positive, floating-point value for timeout, block for at most the number of seconds specified by timeout as long as the lock can not be acquired. Invocations with a negative value for timeout are equivalent to a timeout of zero. Invocations with a timeout value of None (the default) set the timeout period to infinite. The timeout argument has no practical implications if the block argument is set to False and is thus ignored. Returns True if the lock has been acquired or False if the timeout period has elapsed. Note that the timeout argument does not exist in this method’s analog, threading.Lock.acquire().

release()

Libère un verrou. Elle peut être appelée depuis n’importe quel processus ou fil d’exécution, pas uniquement le processus ou le fil qui a acquis le verrou à l’origine.

Le comportement est le même que threading.Lock.release() excepté que lorsque la méthode est appelée sur un verrou déverrouillé, une ValueError est levée.

class multiprocessing.RLock

Un objet verrou récursif : un analogue proche de threading.RLock. Un verrou récursif doit être libéré par le processus ou le fil d’exécution qui l’a acquis. Quand un processus ou un fil acquiert un verrou récursif, le même processus/fil peut l’acquérir à nouveau sans bloquer ; le processus/fil doit le libérer autant de fois qu’il l’acquiert.

Notez que RLock est en fait une fonction factory qui renvoie une instance de multiprocessing.synchronize.RLock initialisée avec un contexte par défaut.

RLock supporte le protocole context manager et peut ainsi être utilisée avec une instruction with.

acquire(block=True, timeout=None)

Acquiert un verrou, bloquant ou non bloquant.

Quand invoqué avec l’argument block à True, bloque jusqu’à ce que le verrou soit déverrouillé (n’appartenant à aucun processus ou fil d’exécution) sauf s’il appartient déjà au processus ou fil d’exécution courant. Le processus ou fil d’exécution courant prend la possession du verrou (s’il ne l’a pas déjà) et incrémente d’un le niveau de récursion du verrou, renvoyant ainsi True. Notez qu’il y a plusieurs différences dans le comportement de ce premier argument comparé à l’implémentation de threading.RLock.acquire(), à commencer par le nom de l’argument lui-même.

Quand invoqué avec l’argument block à False, ne bloque pas. Si le verrou est déjà acquis (et possédé) par un autre processus ou fil d’exécution, le processus/fil courant n’en prend pas la possession et le niveau de récursion n’est pas incrémenté, résultant en une valeur de retour à False. Si le verrou est déverrouillé, le processus/fil courant en prend possession et incrémente son niveau de récursion, renvoyant True.

Use and behaviors of the timeout argument are the same as in Lock.acquire(). Note that the timeout argument does not exist in this method’s analog, threading.RLock.acquire().

release()

Libère un verrou, décrémentant son niveau de récursion. Si après la décrémentation le niveau de récursion est zéro, réinitialise le verrou à un état déverrouillé (n’appartenant à aucun processus ou fil d’exécution) et si des processus/fils attendent que le verrou se déverrouillé, autorise un seul d’entre-eux à continuer. Si après cette décrémentation le niveau de récursion est toujours strictement positif, le verrou reste verrouillé et propriété du processus/fil appelant.

N’appelez cette méthode que si le processus ou fil d’exécution appelant est propriétaire du verrou. Une AssertionError est levée si cette méthode est appelée par un processus/fil autre que le propriétaire ou si le verrou n’est pas verrouillé (possédé). Notez que le type d’exception levé dans cette situation diffère du comportement de threading.RLock.release().

class multiprocessing.Semaphore([value])

Un objet sémaphore, proche analogue de threading.Semaphore.

A solitary difference from its close analog exists: its acquire method’s first argument is named block and it supports an optional second argument timeout, as is consistent with Lock.acquire().

Note

The acquire() method of BoundedSemaphore, Lock, RLock and Semaphore has a timeout parameter not supported by the equivalents in threading. The signature is acquire(block=True, timeout=None) with keyword parameters being acceptable. If block is True and timeout is not None then it specifies a timeout in seconds. If block is False then timeout is ignored.

Sous Mac OS X, sem_timedwait n’est pas supporté, donc appeler acquire() avec un temps d’exécution limité émulera le comportement de cette fonction en utilisant une boucle d’attente.

Note

Si le signal SIGINT généré par un Ctrl-C survient pendant que le fil d’exécution principal est bloqué par un appel à BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() ou Condition.wait(), l’appel sera immédiatement interrompu et une KeyboardInterrupt sera levée.

Cela diffère du comportement de threading où le SIGINT est ignoré tant que les appels bloquants sont en cours.

Note

Certaines des fonctionnalités de ce paquet requièrent une implémentation fonctionnelle de sémaphores partagés sur le système hôte. Sans cela, le module multiprocessing.synchronize sera désactivé, et les tentatives de l’importer lèveront une ImportError. Voir bpo-3770 pour plus d’informations.

16.6.2.6. Objets ctypes partagés

Il est possible de créer des objets partagés utilisant une mémoire partagée pouvant être héritée par les processus enfants.

multiprocessing.Value(typecode_or_type, *args[, lock])

Return a ctypes object allocated from shared memory. By default the return value is actually a synchronized wrapper for the object.

typecode_or_type détermine le type de l’objet renvoyé : il s’agit soit d’un type ctype soit d’un caractère typecode tel qu’utilisé par le module array. *args est passé au constructeur de ce type.

Si lock vaut True (par défaut), alors un nouveau verrou récursif est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe »

Les opérations telles que += qui impliquent une lecture et une écriture ne sont pas atomique. Ainsi si vous souhaitez par exemple réaliser une incrémentation atomique sur une valeur partagée, vous ne pouvez pas simplement faire :

counter.value += 1

En supposant que le verrou associé est récursif (ce qui est le cas par défaut), vous pouvez à la place faire :

with counter.get_lock():
    counter.value += 1

Notez que lock est un argument keyword-only.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Renvoie un tableau ctypes alloué depuis la mémoire partagée. Par défaut la valeur de retour est en fait un wrapper synchronisé autour du tableau.

typecode_or_type détermine le type des éléments du tableau renvoyé : il s’agit soit d’un type ctype soit d’un caractère typecode tel qu’utilisé par le module array. Si size_or_initialize est un entier, alors il détermine la taille du tableau, et le tableau sera initialisé avec des zéros. Autrement, size*or_initializer est une séquence qui sera utilisée pour initialiser le tableau et dont la taille détermine celle du tableau.

Si lock vaut True (par défaut), alors un nouveau verrou est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe »

Notez que lock est un argument keyword-only.

Notez qu’un tableau de ctypes.c_char a ses attributs value et raw qui permettent de l’utiliser pour stocker et récupérer des chaînes de caractères.

16.6.2.6.1. Le module multiprocessing.sharedctypes

Le module multiprocessing.sharedctypes fournit des fonctions pour allouer des objets ctypes depuis la mémoire partagée, qui peuvent être hérités par les processus fils.

Note

Bien qu’il soit possible de stocker un pointeur dans une mémoire partagée, souvenez-vous qu’il référencera un emplacement dans l’espace d’adressage d’un processus particulier. Ainsi, le pointeur risque d’être invalide dans le contexte d’un second processus et déréférencer le pointeur depuis ce second processus pourrait causer un crash.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Renvoie un tableau ctypes alloué depuis la mémoire partagée.

typecode_or_type détermine le type des éléments du tableau renvoyé : il s’agit soit d’un type ctype soit d’un caractère codant le type des éléments du tableau (typecode) tel qu’utilisé par le module array. Si size_or_initialize est un entier, alors il détermine la taille du tableau, et le tableau sera initialisé avec des zéros. Autrement, size*or_initializer est une séquence qui sera utilisée pour initialiser le tableau et dont la taille détermine celle du tableau.

Notez que définir ou récupérer un élément est potentiellement non atomique – utilisez plutôt Array() pour vous assurer de synchroniser automatiquement avec un verrou.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Renvoie un objet ctypes alloué depuis la mémoire partagée.

typecode_or_type détermine le type de l’objet renvoyé : il s’agit soit d’un type ctype soit d’un caractère typecode tel qu’utilisé par le module array. *args est passé au constructeur de ce type.

Notez que définir ou récupérer un élément est potentiellement non atomique – utilisez plutôt Value() pour vous assurer de synchroniser automatiquement avec un verrou.

Notez qu’un tableau de ctypes.c_char a ses attributs value et raw qui permettent de l’utiliser pour stocker et récupérer des chaînes de caractères – voir la documentation de ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *args[, lock])

Identique à RawArray() à l’exception que suivant la valeur de lock un wrapper de synchronisation process-safe pourra être renvoyé à la place d’un tableau ctypes brut.

Si lock vaut True (par défaut), alors un nouveau verrou est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe »

Notez que lock est un argument keyword-only.

multiprocessing.sharedctypes.Value(typecode_or_type, *args[, lock])

Identique à RawValue() à l’exception que suivant la valeur de lock un wrapper de synchronisation process-safe pourra être renvoyé à la place d’un objet ctypes brut.

Si lock vaut True (par défaut), alors un nouveau verrou est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe »

Notez que lock est un argument keyword-only.

multiprocessing.sharedctypes.copy(obj)

Renvoie un objet ctypes alloué depuis la mémoire partagée, qui est une copie de l’objet ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Renvoie un wrapper process-safe autour de l’objet ctypes qui utilise lock pour synchroniser l’accès. Si lock est None (par défaut), un objet multiprocessing.RLock est créé automatiquement.

Un wrapper synchronisé aura deux méthodes en plus de celles de l’objet qu’il enveloppe : get_obj() renvoie l’objet wrappé et get_lock() renvoie le verrou utilisé pour la synchronisation.

Notez qu’accéder à l’objet ctypes à travers le wrapper peut s’avérer beaucoup plus lent qu’accéder directement à l’objet ctypes brut.

Le tableau ci-dessous compare la syntaxe de création des objets ctypes partagés depuis une mémoire partagée avec la syntaxe normale ctypes. (Dans le tableau, MyStruct est une sous-classe quelconque de ctypes.Structure.)

ctypes

sharedctypes utilisant un type

sharedctypes utilisant un typecode

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(“d”, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(“h”, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(“i”, (9, 2, 8))

Ci-dessous un exemple où des objets ctypes sont modifiés par un processus fils :

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', 'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print n.value
    print x.value
    print s.value
    print [(a.x, a.y) for a in A]

Les résultats affichés sont :

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

16.6.2.7. Gestionnaires

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

multiprocessing.Manager()

Renvoie un objet SyncManager démarré qui peut être utilisé pour partager des objets entre les processus. L’objet gestionnaire renvoyé correspond à un processus enfant instancié et possède des méthodes pour créer des objets partagés et renvoyer les mandataires correspondants.

Les processus gestionnaires seront arrêtés dès qu’ils seront collectés par le ramasse-miettes ou que leur processus parent se terminera. Les classes gestionnaires sont définies dans le module multiprocessing.managers :

class multiprocessing.managers.BaseManager([address[, authkey]])

Crée un objet BaseManager.

Une fois créé il faut appeler start() ou get_server().serve_forever() pour assurer que l’objet gestionnaire référence un processus gestionnaire démarré.

address est l’adresse sur laquelle le processus gestionnaire écoute pour de nouvelles connexions. Si address est None, une adresse arbitraire est choisie.

authkey is the authentication key which will be used to check the validity of incoming connections to the server process. If authkey is None then current_process().authkey. Otherwise authkey is used and it must be a string.

start([initializer[, initargs]])

Démarre un sous-processus pour démarrer le gestionnaire. Si initializer n’est pas None alors le sous-processus appellera initializer(*initargs) quand il démarrera.

get_server()

Renvoie un objet Server qui représente le serveur sous le contrôle du gestionnaire. L’objet Server supporte la méthode serve_forever() :

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server possède en plus un attribut address.

connect()

Connecte un objet gestionnaire local au processus gestionnaire distant :

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()
shutdown()

Stoppe le processus utilisé par le gestionnaire. Cela est disponible uniquement si start() a été utilisée pour démarrer le processus serveur.

Cette méthode peut être appelée plusieurs fois.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Une méthode de classe qui peut être utilisée pour enregistrer un type ou un appelable avec la classe gestionnaire.

typeif est un « type identifier » qui est utilisé pour identifier un type particulier d’objet partagé. Cela doit être une chaîne de caractères.

callable is a callable used for creating objects for this type identifier. If a manager instance will be created using the from_address() classmethod or if the create_method argument is False then this can be left as None.

proxytype est une sous-classe de BaseProxy utilisée pour créer des mandataires autour des objets partagés avec ce typeid. S’il est None, une classe mandataire sera créée automatiquement.

exposed est utilisé pour préciser une séquence de noms de méthodes dont les mandataires pour ce typeid doivent être autorisés à accéder via BaseProxy._callmethod(). (Si exposed est None alors proxytype._exposed_ est utilisé à la place s’il existe.) Dans le cas où aucune liste exposed n’est précisée, toutes les « méthodes publiques » de l’objet partagé seront accessibles. (Ici une « méthode publique » signifie n’importe quel attribut qui possède une méthode __call__() et dont le nom ne commence pas par un '_'.)

method_to_typeid est un tableau associatif utilisé pour préciser le type de retour de ces méthodes exposées qui doivent renvoyer un mandataire. Il associé un nom de méthode à une chaîne de caractères typeid. (Si method_to_typeid est None, proxytype._method_to_typeid_ est utilisé à la place s’il existe). Si le nom d’une méthode n’est pas une clé de ce tableau associatif ou si la valeur associée est None, l’objet renvoyé par la méthode sera une copie de la valeur.

create_method détermine si une méthode devrait être créée avec le nom typeid, qui peut être utilisée pour indiquer au processus serveur de créer un nouvel objet partagé et d’en renvoyer un mandataire. a valeur par défaut est True.

Les instances de BaseManager ont aussi une propriété en lecture seule :

address

L’adresse utilisée par le gestionnaire.

class multiprocessing.managers.SyncManager

Une sous-classe de BaseManager qui peut être utilisée pour la synchronisation entre processus. Des objets de ce type sont renvoyés par multiprocessing.Manager().

It also supports creation of shared lists and dictionaries.

BoundedSemaphore([value])

Crée un objet threading.BoundedSemaphore partagé et renvoie un mandataire pour cet objet.

Condition([lock])

Crée un objet threading.Condition partagé et renvoie un mandataire pour cet objet.

Si lock est fourni alors il doit être un mandataire pour un objet threading.Lock ou threading.RLock.

Event()

Crée un objet threading.Event partagé et renvoie un mandataire pour cet objet.

Lock()

Crée un objet threading.Lock partagé et renvoie un mandataire pour cet objet.

Namespace()

Crée un objet Namespace partagé et renvoie un mandataire pour cet objet.

Queue([maxsize])

Create a shared Queue.Queue object and return a proxy for it.

RLock()

Crée un objet threading.RLock partagé et renvoie un mandataire pour cet objet.

Semaphore([value])

Crée un objet threading.Semaphore partagé et renvoie un mandataire pour cet objet.

Array(typecode, sequence)

Crée un tableau et renvoie un mandataire pour cet objet.

Value(typecode, value)

Crée un objet avec un attribut value accessible en écriture et renvoie un mandataire pour cet objet.

dict()
dict(mapping)
dict(sequence)

Create a shared dict object and return a proxy for it.

list()
list(sequence)

Create a shared list object and return a proxy for it.

Note

Modifications to mutable values or items in dict and list proxies will not be propagated through the manager, because the proxy has no way of knowing when its values or items are modified. To modify such an item, you can re-assign the modified object to the container proxy:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
class multiprocessing.managers.Namespace

Un type qui peut être enregistré avec SyncManager.

Un espace de nommage n’a pas de méthodes publiques, mais possède des attributs accessibles en écriture. Sa représentation montre les valeurs de ses attributs.

Cependant, en utilisant un mandataire pour un espace de nommage, un attribut débutant par '_' est un attribut du mandataire et non de l’objet cible :

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')

16.6.2.7.1. Gestionnaires personnalisés

Pour créer son propre gestionnaire, il faut créer une sous-classe de BaseManager et utiliser la méthode de classe register() pour enregistrer de nouveaux types ou callables au gestionnaire. Par exemple :

from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56

16.6.2.7.2. Utiliser un gestionnaire distant

Il est possible de lancer un serveur gestionnaire sur une machine et d’avoir des clients l’utilisant sur d’autres machines (en supposant que les pare-feus impliqués l’autorisent).

Exécuter les commandes suivantes crée un serveur pour une simple queue partagée à laquelle des clients distants peuvent accéder :

>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Un client peut accéder au serveur comme suit :

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Un autre client peut aussi l’utiliser :

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Les processus locaux peuvent aussi accéder à cette queue, utilisant le code précédent sur le client pour y accéder à distance :

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

16.6.2.8. Objets mandataires

Un mandataire est un objet qui référence un objet partagé appartenant (supposément) à un processus différent. L’objet partagé est appelé le référent du mandataire. Plusieurs mandataires peuvent avoir un même référent.

A proxy object has methods which invoke corresponding methods of its referent (although not every method of the referent will necessarily be available through the proxy). A proxy can usually be used in most of the same ways that its referent can:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Notez qu’appliquer str() à un mandataire renvoie la représentation du référent, alors que repr() renvoie celle du mandataire.

An important feature of proxy objects is that they are picklable so they can be passed between processes. Note, however, that if a proxy is sent to the corresponding manager’s process then unpickling it will produce the referent itself. This means, for example, that one shared object can contain a second:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']

Note

Les types de mandataires de multiprocessing n’implémentent rien pour la comparaison par valeurs. Par exemple, on a :

>>> manager.list([1,2,3]) == [1,2,3]
False

Il faut à la place simplement utiliser une copie du référent pour faire les comparaisons.

class multiprocessing.managers.BaseProxy

Les objets mandataires sont des instances de sous-classes de BaseProxy.

_callmethod(methodname[, args[, kwds]])

Appelle et renvoie le résultat d’une méthode du référent du mandataire.

Si proxy est un mandataire sont le référent est obj, alors l’expression :

proxy._callmethod(methodname, args, kwds)

s’évalue comme :

getattr(obj, methodname)(*args, **kwds)

dans le processus du gestionnaire.

La valeur renvoyée sera une copie du résultat de l’appel ou un mandataire sur un nouvel objet partagé – voir l’a documentation de l’argument method_to_typeid de BaseManager.register().

Si une exception est levée par l’appel, elle est relayée par _callmethod(). Si une autre exception est levée par le processus du gestionnaire, elle est convertie en une RemoteError et est levée par _callmethod().

Notez en particulier qu’une exception est levée si methodname n’est pas exposée.

Un exemple d’utilisation de _callmethod() :

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7))   # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))     # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Renvoie une copie du référent.

Si le référent n’est pas sérialisable, une exception est levée.

__repr__()

Renvoie la représentation de l’objet mandataire.

__str__()

Renvoie la représentation du référent.

16.6.2.8.1. Nettoyage

Un mandataire utilise un callback sous une référence faible de façon à ce que quand il est collecté par le ramasse-miettes, il se désenregistre auprès du gestionnaire qui possède le référent.

Un objet partagé est supprimé par le processus gestionnaire quand plus aucun mandataire ne le référence.

16.6.2.9. Bassins de processus

On peut créer un bassin de processus qui exécuteront les tâches qui lui seront soumises avec la classe Pool.

class multiprocessing.Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])

Un objet process pool qui contrôle un bassin de processus workers auquel sont soumises des tâches. Il supporte les résultats asynchrones avec des timeouts et des callabacks et possède une implémentation parallèle de map.

processes is the number of worker processes to use. If processes is None then the number returned by cpu_count() is used. If initializer is not None then each worker process will call initializer(*initargs) when it starts.

Notez que les méthodes de l’objet pool ne doivent être appelées que par le processus qui l’a créé.

Nouveau dans la version 2.7: maxtasksperchild est le nombre de tâches qu’un processus worker peut accomplir avant de se fermer et d’être remplacé par un worker frais, pour permettre aux ressources inutilisées d’être libérées. Par défaut maxtasksperchild est None, ce qui signifie que le worker vit aussi longtemps que le bassin.

Note

Les processus workers à l’intérieur d’une Pool vivent par défaut aussi longtemps que la queue de travail du bassin. Un modèle fréquent chez d’autres systèmes (tels qu’Apache, mod_wsgi, etc.) pour libérer les ressources détenues par les workers est d’autoriser un worker dans le bassin à accomplir seulement une certaine charge de travail avant de se fermer, se retrouvant nettoyé et remplacé par un nouvelle processus fraîchement lancé. L’argument maxtasksperchild de Pool expose cette fonctionnalité à l’utilisateur final.

apply(func[, args[, kwds]])

Equivalent of the apply() built-in function. It blocks until the result is ready, so apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

apply_async(func[, args[, kwds[, callback]]])

Une variante de la méthode apply() qui renvoie un objet résultat.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

map(func, iterable[, chunksize])

Un équivalent parallèle à la fonction built-in map() (qui ne supporte cependant qu’un itérable en argument). Elle bloque jusqu’à ce que le résultat soit prêt.

La méthode découpe l’itérable en un nombre de morceaux qu’elle envoie au bassin de processus comme des tâches séparées. La taille (approximative) de ces morceaux peut être précisée en passant à chunksize un entier positif.

map_async(func, iterable[, chunksize[, callback]])

Une variante de la méthode map() qui renvoie un objet résultat.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

imap(func, iterable[, chunksize])

An equivalent of itertools.imap().

L’argument chunksize est le même que celui utilisé par la méthode map(). Pour de très longs itérables, utiliser une grande valeur pour chunksize peut faire s’exécuter la tâche beaucoup plus rapidement qu’en utilisant la valeur par défaut de 1.

Aussi, si chucksize vaut 1 alors la méthode next() de l’itérateur renvoyé par imap() prend un paramètre optionnel timeout : next(timeout) lève une multiprocessing.TimeoutError si le résultat ne peut pas être renvoyé avant timeout secondes.

imap_unordered(func, iterable[, chunksize])

Identique à imap() si ce n’est que l’ordre des résultats de l’itérateur renvoyé doit être considéré comme arbitraire. (L’ordre n’est garanti que quand il n’y a qu’un worker.)

close()

Empêche de nouvelles tâches d’être envoyées à la pool. Les processus workers se terminent une fois que toutes les tâches ont été complétées.

terminate()

Stoppe immédiatement les processus workers sans finaliser les travaux courants. Quand l’objet pool est collecté par le ramasse-miettes, sa méthode terminate() est appelée immédiatement.

join()

Attend que les processus workers se terminent. Il est nécessaire d’appeler close() ou terminate() avant d’utiliser join().

class multiprocessing.pool.AsyncResult

La classe des résultats renvoyés par Pool.apply_async() et Pool.map_async().

get([timeout])

Renvoie le résultat quand il arrive. Si timeout n’est pas None et que le résultat n’arrive pas avant timeout secondes, une multiprocessing.TimeoutError est levée. Si l’appel distance lève une exception, alors elle est relayée par get().

wait([timeout])

Attend que le résultat soit disponible ou que timeout secondes s’écoulent.

ready()

Renvoie True ou False suivant si la tâche est accomplie.

successful()

Renvoie True ou False suivant si la tâche est accomplie sans lever d’exception. Lève une AssertionError si le résultat n’est pas prêt.

Les exemples suivants présentent l’utilisation d’un bassin de workers :

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises multiprocessing.TimeoutError

16.6.2.10. Auditeurs et Clients

Usually message passing between processes is done using queues or by using Connection objects returned by Pipe().

However, the multiprocessing.connection module allows some extra flexibility. It basically gives a high level message oriented API for dealing with sockets or Windows named pipes, and also has support for digest authentication using the hmac module.

multiprocessing.connection.deliver_challenge(connection, authkey)

Envoie un message généré aléatoirement à l’autre bout de la connexion et attend une réponse.

If the reply matches the digest of the message using authkey as the key then a welcome message is sent to the other end of the connection. Otherwise AuthenticationError is raised.

multiprocessing.connection.answer_challenge(connection, authkey)

Reçoit un message, calcule le condensat du message en utilisant la clé authkey, et envoie le condensat en réponse.

If a welcome message is not received, then AuthenticationError is raised.

multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])

Attempt to set up a connection to the listener which is using address address, returning a Connection.

Le type de la connexion est déterminé par l’argument family, mais il peut généralement être omis puisqu’il peut être inféré depuis le format d”address. (Voir Formats d’adresses)

If authenticate is True or authkey is a string then digest authentication is used. The key used for authentication will be either authkey or current_process().authkey) if authkey is None. If authentication fails then AuthenticationError is raised. See Clés d’authentification.

class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

Un wrapper sur une socket liée ou un tube nommé sous Windows qui écoute pour des connexions.

address est l’adresse à utiliser par la socket liée ou le tube nommé de l’objet auditeur.

Note

Si une adresse “0.0.0.0” est utilisée, l’adresse ne sera pas un point d’accès connectable sous Windows. Si vous avez besoin d’un point d’accès connectable, utilisez “127.0.0.1”.

family est le type de socket (ou tube nommé) à utiliser. Cela peut être l’une des chaînes 'AF_INET' (pour une socket TCP), 'AF_UNIX' (pour une socket Unix) ou 'AF_PIPE' (pour un tube nommé sous Windows). Seulement la première d’entre elles est garantie d’être disponible. Si family est None, la famille est inférée depuis le format d”address. Si address est aussi None, la famille par défaut est utilisée. La famille par défaut est supposée être la plus rapide disponible. Voir Formats d’adresses. Notez que si la family est 'AF_UNIX' et qu”address est None, la socket est créée dans un répertoire temporaire privé créé avec tempfile.mkstemp().

Si l’objet auditeur utilise une socket alors backlog (1 par défaut) es passé à la méthode listen() de la socket une fois qu’elle a été liée.

If authenticate is True (False by default) or authkey is not None then digest authentication is used.

If authkey is a string then it will be used as the authentication key; otherwise it must be None.

If authkey is None and authenticate is True then current_process().authkey is used as the authentication key. If authkey is None and authenticate is False then no authentication is done. If authentication fails then AuthenticationError is raised. See Clés d’authentification.

accept()

Accept a connection on the bound socket or named pipe of the listener object and return a Connection object. If authentication is attempted and fails, then AuthenticationError is raised.

close()

Ferme la socket liée ou le tube nommé de l’objet auditeur. La méthode est appelée automatiquement quand l’auditeur est collecté par le ramasse-miettes. Il est cependant conseillé de l’appeler explicitement.

Les objets auditeurs ont aussi les propriétés en lecture seule suivantes :

address

L’adresse utilisée par l’objet auditeur.

last_accepted

L’adresse depuis laquelle a été établie la dernière connexion. None si aucune n’est disponible.

The module defines the following exceptions:

exception multiprocessing.connection.ProcessError

The base class of all multiprocessing exceptions.

exception multiprocessing.connection.BufferTooShort

Exception levée par Connection.recv_bytes_into() quand l’objet tampon fourni est trop petit pour le message à lire.

exception multiprocessing.connection.AuthenticationError

Raised when there is an authentication error.

exception multiprocessing.connection.TimeoutError

Raised by methods with a timeout when the timeout expires.

Exemples

Le code serveur suivant crée un auditeur qui utilise 'secret password' comme clé d’authentification. Il attend ensuite une connexion et envoie les données au client :

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')

conn = listener.accept()
print 'connection accepted from', listener.last_accepted

conn.send([2.25, None, 'junk', float])

conn.send_bytes('hello')

conn.send_bytes(array('i', [42, 1729]))

conn.close()
listener.close()

Le code suivant se connecte au serveur et en reçoit des données :

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)
conn = Client(address, authkey='secret password')

print conn.recv()                 # => [2.25, None, 'junk', float]

print conn.recv_bytes()            # => 'hello'

arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr)     # => 8
print arr                         # => array('i', [42, 1729, 0, 0, 0])

conn.close()

16.6.2.10.1. Formats d’adresses

  • Une adresse 'AF_INET' est un tuple de la forme (hostname, port)hostname est une chaîne et port un entier.

  • Une adresse 'AF_UNIX' est une chaîne représentant un nom de fichier sur le système de fichiers.

  • Une adresse 'AF_PIPE' est une chaîne de la forme

    r'\.\pipe{PipeName}'. Pour utiliser un Client() pour se connecter à un tube nommé sur une machine distante appelée ServerName, il faut plutôt utiliser une adresse de la forme r'\ServerName\pipe{PipeName}'.

Notez que toute chaîne commençant par deux antislashs est considérée par défaut comme l’adresse d’un 'AF_PIPE' plutôt qu’une adresse 'AF_UNIX'.

16.6.2.11. Clés d’authentification

When one uses Connection.recv(), the data received is automatically unpickled. Unfortunately unpickling data from an untrusted source is a security risk. Therefore Listener and Client() use the hmac module to provide digest authentication.

An authentication key is a string which can be thought of as a password: once a connection is established both ends will demand proof that the other knows the authentication key. (Demonstrating that both ends are using the same key does not involve sending the key over the connection.)

Si l’authentification est requise et qu’aucune clé n’est spécifiée alors la valeur de retour de current_process().authkey est utilisée (voir Process). Celle valeur est automatiquement héritée par tout objet Process créé par le processus courant. Cela signifie que (par défaut) tous les processus d’un programme multi-processus partageront une clé d’authentification unique qui peut être utilisée pour mettre en place des connexions entre-eux.

Des clés d’authentification adaptées peuvent aussi être générées par os.urandom().

16.6.2.12. Journalisation

Un certain support de la journalisation est disponible. Notez cependant que le le paquet logging n’utilise pas de verrous partagés entre les processus et il est donc possible (dépendant du type de gestionnaire) que les messages de différents processus soient mélangés.

multiprocessing.get_logger()

Renvoie le journaliseur utilisé par multiprocessing. Si nécessaire, un nouveau sera créé.

À sa première création le journaliseur a pour niveau logging.NOTSET et pas de gestionnaire par défaut. Les messages envoyés à ce journaliseur ne seront pas propagés par défaut au journaliseur principal.

Notez que sous Windows les processus fils n’hériteront que du niveau du journaliseur du processus parent – toute autre personnalisation du journaliseur ne sera pas héritée.

multiprocessing.log_to_stderr()

Cette fonction effectue un appel à get_logger() mais en plus de renvoyer le journaliseur créé par get_logger, elle ajoute un gestionnaire qui envoie la sortie sur sys.stderr en utilisant le format '[%(levelname)s/%(processName)s] %(message)s'.

L’exemple ci-dessous présente une session avec la journalisation activée :

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

In addition to having these two logging functions, the multiprocessing also exposes two additional logging level attributes. These are SUBWARNING and SUBDEBUG. The table below illustrates where theses fit in the normal level hierarchy.

Niveau

Valeur numérique

SUBWARNING

25

SUBDEBUG

5

Pour un tableau complet des niveaux de journalisation, voir le module logging.

These additional logging levels are used primarily for certain debug messages within the multiprocessing module. Below is the same example as above, except with SUBDEBUG enabled:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0

16.6.2.13. Le module multiprocessing.dummy

multiprocessing.dummy réplique toute l’API de multiprocessing mais n’est rien de plus qu’un wrapper autour du module threading.

16.6.3. Lignes directrices de programmation

Il y a certaines lignes directrices et idiomes auxquels il faut adhérer en utilisant multiprocessing.

16.6.3.1. All platforms

Éviter les états partagés

Autant que possible, vous devriez éviter de déplacer de larges données entre les processus.

It is probably best to stick to using queues or pipes for communication between processes rather than using the lower level synchronization primitives from the threading module.

Sérialisation

Assurez-vous que les arguments passés aux méthodes des mandataires soient sérialisables (pickables).

Sûreté des mandataires à travers les fils d’exécution

N’utilisez pas d’objet mandataire depuis plus d’un fil d’exécution à moins que vous ne le protégiez avec un verrou.

(Il n’y a jamais de problème avec plusieurs processus utilisant un même mandataire.)

Attendre les processus zombies

Sous Unix quand un processus se termine mais n’est pas attendu, il devient un zombie. Il ne devrait jamais y en avoir beaucoup parce que chaque fois qu’un nouveau processus démarre (ou que active_children() est appelée) tous les processus complétés qui n’ont pas été attendus le seront. Aussi appeler la méthode Process.is_alive d’un processus terminé attendra le processus. Toutefois il est probablement une bonne pratique d’attendre explicitement tous les processus que vous démarrez.

Préférez hériter que sérialiser/désérialiser

On Windows many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

Éviter de terminer les processus

Utiliser la méthode Process.terminate pour stopper un processus risque de casser ou de rendre indisponible aux autres processus des ressources partagées (comme des verrous, sémaphores, tubes et queues) actuellement utilisée par le processus.

Il est donc probablement préférable de n’utiliser Process.terminate que sur les processus qui n’utilisent jamais de ressources partagées.

Attendre les processus qui utilisent des queues

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the « feeder » thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.)

Cela signifie que chaque fois que vous utilisez une queue vous devez vous assurer que tous les éléments qui y ont été placés seront effectivement supprimés avant que le processus ne soit attendu. Autrement vous ne pouvez pas être sûr que les processus qui ont placé des éléments dans la queue se termineront. Souvenez-vous aussi que tous les processus non daemons seront attendus automatiquement.

L’exemple suivant provoquera un interblocage :

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Une solution ici serait d’intervertir les deux dernières lignes (ou simplement supprimer la ligne p.join()).

Passer explicitement les ressources aux processus fils

On Unix a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.

Donc par exemple :

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

devrait être réécrit comme :

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Faire attention à remplacer sys.stdin par un objet « file-like »

À l’origine, multiprocessing appelait inconditionnellement :

os.close(sys.stdin.fileno())

dans la méthode multiprocessing.Process._bootstrap() — cela provoquait des problèmes avec les processus imbriqués. Cela peut être changé en :

sys.stdin.close()
sys.stdin = open(os.devnull)

Qui résout le problème fondamental des collisions entre processus provoquant des erreurs de mauvais descripteurs de fichiers, mais introduit un potentiel danger pour les applications qui remplacent sys.stdin() avec un « file-like object » ayant une sortie bufferisée. Ce danger est que si plusieurs processus appellent close() sur cet objet file-like, cela peut amener les données à être transmises à l’objet à plusieurs reprises, résultant en une corruption.

Si vous écrivez un objet file-like et implémentez votre propre cache, vous pouvez le rendre sûr pour les forks en stockant le pid chaque fois que vous ajoutez des données au cache, et annulez le cache quand le pip change. Par exemple :

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Pour plus d’informations, voir bpo-5155, bpo-5313 et bpo-5331

16.6.3.2. Windows

Since Windows lacks os.fork() it has a few extra restrictions:

Plus de sérialisation

Ensure that all arguments to Process.__init__() are picklable. This means, in particular, that bound or unbound methods cannot be used directly as the target argument on Windows — just define a function and use that instead.

Also, if you subclass Process then make sure that instances will be picklable when the Process.start method is called.

Variables globales

Gardez en tête que si le code exécuté dans un processus fils essaie d’accéder à une variable globale, alors la valeur qu’il voit (s’il y en a une) pourrait ne pas être la même que la valeur du processus parent au moment même où Process.start est appelée.

Cependant, les variables globales qui sont juste des constantes de modules ne posent pas de problèmes.

Importation sûre du module principal

Assurez-vous que le module principal peut être importé en toute sécurité par un nouvel interpréteur Python sans causer d’effets de bord inattendus (comme le démarrage d’un nouveau processus).

For example, under Windows running the following module would fail with a RuntimeError:

from multiprocessing import Process

def foo():
    print 'hello'

p = Process(target=foo)
p.start()

Vous devriez plutôt protéger le « point d’entrée » du programme en utilisant if __name__ == '__main__': comme suit :

from multiprocessing import Process, freeze_support

def foo():
    print 'hello'

if __name__ == '__main__':
    freeze_support()
    p = Process(target=foo)
    p.start()

(La ligne freeze_support() peut être omise si le programme est uniquement lancé normalement et pas gelé.)

Cela permet aux interpréteurs Python fraîchement instanciés d’importer en toute sécurité le module et d’exécution ensuite la fonction foo().

Des restrictions similaires s’appliquent si une pool ou un gestionnaire est créé dans le module principal.

16.6.4. Exemples

Démonstration de comment créer et utiliser des gestionnaires et mandataires personnalisés :

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()

En utilisant Pool :

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()

Synchronization types like locks, conditions and queues:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten


#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'


#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print


#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0


####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')


if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)

    test(namespace)

Un exemple montrant comment utiliser des queues pour alimenter en tâches une collection de processus workers et collecter les résultats :

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

An example of how a pool of worker processes can each run a SimpleHTTPServer.HttpServer instance while sharing a single listening socket.

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets pickable/inheritable


def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))


class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass


def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)


def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)


if __name__ == '__main__':
    freeze_support()
    test()

Some simple benchmarks comparing multiprocessing with threading:

#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1


#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in',elapsed,'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed


#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed

####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()