17.2. multiprocessing — Parallélisme par processus

Code source : Lib/multiprocessing/


17.2.1. Introduction

multiprocessing est un paquet qui permet l’instanciation de processus via la même API que le module threading. Le paquet multiprocessing offre à la fois des possibilités de programmation concurrente locale ou à distance, contournant les problèmes du Global Interpreter Lock en utilisant des processus plutôt que des fils d’exécution. Ainsi, le module multiprocessing permet au développeur de bénéficier entièrement des multiples processeurs sur une machine. Il tourne à la fois sur les systèmes Unix et Windows.

Le module multiprocessing introduit aussi des API sans analogues dans le module threading. Un exemple est l’objet Pool qui offre une manière pratique de paralléliser l’exécution d’une fonction sur de multiples valeurs d’entrée, distribuant ces valeurs entre les processus (parallélisme de données). L’exemple suivant présente la manière classique de définir une telle fonction dans un module afin que les processus fils puissent importer ce module avec succès. L’exemple basique de parallélisme de données utilise Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

affiche sur la sortie standard

[1, 4, 9]

17.2.1.1. La classe Process

Dans le module multiprocessing, les processus sont instanciés en créant un objet Process et en appelant sa méthode start(). La classe Process suit la même API que threading.Thread. Un exemple trivial d’un programme multi-processus est

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Pour afficher les IDs des processus impliqués, voici un exemple plus étoffé :

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

La nécessité de la ligne if __name__ == '__main__' est expliquée par Lignes directrices de programmation.

17.2.1.2. Contextes et méthodes de démarrage

Suivant la plateforme, multiprocessing gère trois manières de démarrer un processus. Ces méthodes de démarrage sont

spawn

Le processus parent démarre un processus neuf avec un interpréteur Python. Le processus fils hérite uniquement des ressources nécessaires pour exécuter la méthode run() de l’objet associé au processus. En particulier, les descripteurs de fichiers superflus et gérés par le processus parent ne sont pas hérités. Démarrer un processus en utilisant cette méthode est plutôt lent par rapport à fork ou forkserver.

Disponible sur Unix et Windows. Par défaut sur Windows.

fork

Le processus parent utilise os.fork() pour forker l’interpréteur Python. Le processus fils, quand il démarre, est effectivement identique au processus parent. Toutes les ressources du parent sont héritées par le fils. Notez qu’il est problématique de forker sans danger un processus multi-threadé.

Disponible uniquement sous Unix. Par défaut sous Unix.

forkserver

Quand le programme démarre et choisit la méthode de démarrage forkserver, un processus serveur est lancé. Dès lors, chaque fois qu’un nouveau processus est nécessaire, le processus parent se connecte au serveur et lui demande de forker un nouveau processus. Le processus serveur de fork n’utilisant qu’un seul fil d’exécution, il peut utiliser os.fork() sans danger. Les ressources superflues ne sont pas héritées.

Disponible sur les plateformes Unix qui acceptent le passage de descripteurs de fichiers à travers des tubes (pipes) Unix.

Modifié dans la version 3.4: spawn ajouté à toutes les plateformes Unix, et forkserver ajouté à certaines plateformes Unix. Les processus fils n’héritent plus de tous les descripteurs héritables du parent sous Windows.

Sous Unix, utiliser les méthodes de démarrage spawn ou forkserver démarre aussi un processus semaphore tracker qui traque les sémaphores nommés non libérés créés par les processus du programme. Quand tous les processus sont terminés, le traqueur de sémaphores libère les sémaphores restants. Généralement il ne devrait pas y en avoir, mais si un processus a été tué par un signal, certains sémaphores ont pu « fuiter ». (Libérer les sémaphores nommés est une affaire sérieuse puisque le système n’en autorise qu’un certain nombre, et qu’ils ne seront pas automatiquement libérés avant le prochain redémarrage.)

Pour sélectionner une méthode de démarrage, utilisez la fonction set_start_method() dans la clause if __name__ == '__main__' du module principal. Par exemple :

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() ne doit pas être utilisée plus d’une fois dans le programme.

Alternativement, vous pouvez utiliser get_context() pour obtenir un contexte. Les contextes ont la même API que le module multiprocessing, et permettent l’utilisation de plusieurs méthodes de démarrage dans un même programme.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Notez que les objets relatifs à un contexte ne sont pas forcément compatibles avec les processus d’un contexte différent. En particulier, les verrous créés avec le contexte fork ne peuvent pas être passés aux processus lancés avec les méthodes spawn ou forkserver.

Une bibliothèque qui veut utiliser une méthode de démarrage particulière devrait probablement faire appel à get_context() pour éviter d’interférer avec le choix de l’utilisateur de la bibliothèque.

17.2.1.3. Échange d’objets entre les processus

multiprocessing gère deux types de canaux de communication entre les processus :

Queues

La classe Queue est un clone assez proche de queue.Queue. Par exemple :

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Les queues peuvent être utilisées par plusieurs fils d’exécution ou processus.

Tubes (pipes)

La fonction Pipe() renvoie une paire d’objets de connexion connectés à un tube qui est par défaut à double-sens. Par exemple :

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Les deux objets de connexion renvoyés par Pipe() représentent les deux bouts d’un tube. Chaque objet de connexion possède (entre autres) des méthodes send() et recv(). Notez que les données d’un tube peuvent être corrompues si deux processus (ou fils d’exécution) essaient de lire ou d’écrire sur le même bout du tube en même temps. Évidemment il n’y a pas de risque de corruption si les processus utilisent deux bouts différents en même temps.

17.2.1.4. Synchronisation entre processus

multiprocessing contient des équivalents à toutes les primitives de synchronisation de threading. Par exemple il est possible d’utiliser un verrou pour s’assurer qu’un seul processus à la fois écrit sur la sortie standard :

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Sans le verrou, les sorties des différents processus risquent d’être mélangées.

17.2.1.5. Partager un état entre les processus

Comme mentionné plus haut, il est généralement préférable d’éviter autant que possible d’utiliser des états partagés en programmation concurrente. C’est particulièrement vrai quand plusieurs processus sont utilisés.

Cependant, si vous devez réellement partager des données, multiprocessing permet de le faire de deux manières.

Mémoire partagée

Les données peuvent être stockées dans une mémoire partagée en utilisant des Value ou des Array. Par exemple, le code suivant

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

affiche

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Les arguments 'd' et 'i' utilisés à la création des num et arr` sont des codes de types tels qu’utilisés par le module array : 'd' indique un flottant double-précision et 'i' indique un entier signé. Ces objets partagés seront sûr d’utilisation entre processus et fils d’exécution.

Pour plus de flexibilité dans l’utilisation de mémoire partagée, vous pouvez utiliser le module multiprocessing.sharedctypes qui permet la création d’objets arbitraires ctypes alloués depuis la mémoire partagée.

Processus serveur

Un objet gestionnaire renvoyé par Manager() contrôle un processus serveur qui détient les objets Python et autorise les autres processus à les manipuler à l’aide de mandataires.

Un gestionnaire renvoyé par Manager() supportera les types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value et Array. Par exemple,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

affiche

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Les processus serveurs de gestionnaires sont plus flexibles que les mémoires partagées parce qu’ils peuvent gérer des types d’objets arbitraires. Aussi, un gestionnaire unique peut être partagé par les processus sur différentes machines à travers le réseau. Cependant, ils sont plus lents que les mémoires partagées.

17.2.1.6. Utiliser un réservoir de workers

La classe Pool représente une pool de processus de travail. Elle possède des méthodes qui permettent aux tâches d’être déchargées vers les processus de travail de différentes manières.

Par exemple :

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Notez que les méthodes d’une pool ne devraient être utilisées que par le processus qui l’a créée.

Note

Fonctionnellement ce paquet exige que que le module __main__ soit importable par les fils. Cela est expliqué sur la page Lignes directrices de programmation, il est cependant utile de le rappeler ici. Cela signifie que certains exemples, comme les exemples utilisant multiprocessing.pool.Pool, ne fonctionnent pas dans l’interpréteur interactif. Par exemple :

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Si vous essayez ce code, il affichera trois traces d’appels complètes entrelacées de manière semi-aléatoire, et vous aurez alors à stopper le processus maître.)

17.2.2. Référence

Le paquet multiprocessing reproduit en grande partie l’API du module threading.

17.2.2.1. Process et exceptions

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Les objets process représentent une activité exécutée dans un processus séparé. La classe Process a des équivalents à toutes les méthodes de threading.Thread.

Le constructeur doit toujours être appelé avec des arguments nommés. group devrait toujours être None ; il existe uniquement pour la compatibilité avec threading.Thread. target est l’objet appelable qui est invoqué par la méthode run(). Il vaut ``None`() par défaut, signifiant que rien n’est appelé. name est le nom du processus (voir name pour plus de détails). args est le tuple d’arguments pour l’invocation de la cible. kwargs est le dictionnaire des arguments nommés pour l’invocation de la cible. S’il est fourni, l’argument nommé daemon met l’option daemon du processus à True ou False. S’il est None (par défaut), l’option est héritée par le processus créateur.

Par défaut, aucun argument n’est passé à target.

Si une sous-classe redéfinit le constructeur, elle doit s’assurer d’invoquer le constructeur de la classe de base (Process.__init__()) avant de faire autre chose du processus.

Modifié dans la version 3.3: Ajout de l’argument daemon.

run()

Méthode représentant l’activité du processus.

Vous pouvez redéfinir cette méthode dans une sous-classe. La méthode standard run() invoque l’objet appelable passé au constructeur comme argument target, si fourni, avec les arguments séquentiels et nommés respectivement pris depuis les paramètres args et kwargs.

start()

Démarre l’activité du processus.

Elle doit être appelée au plus une fois par objet processus. Elle s’arrange pour que la méthode run() de l’objet soit invoquée dans un processus séparé.

join([timeout])

Si l’argument optionnel timeout est None (par défaut), la méthode bloque jusqu’à ce que le processus dont la méthode join() a été appelée se termine. Si timeout est un nombre positif, elle bloque au maximum pendant timeout secondes. Notez que la méthode renvoie None si le processus se termine ou si le temps d’exécution expire. Vérifiez l’attribut exitcode du processus pour déterminer s’il s’est terminé.

join peut être appelée plusieurs fois sur un même processus.

Un processus ne peut pas s’attendre lui-même car cela causerait un interblocage. C’est une erreur d’essayer d’attendre un processus avant qu’il ne soit démarré.

name

Le nom du processus. Le nom est une chaîne de caractères utilisée uniquement pour l’identification du processus. Il n’a pas de sémantique. Plusieurs processus peuvent avoir le même nom.

Le nom initial est déterminé par le constructeur. Si aucun nom explicite n’est fourni au constructeur, un nom de la forme « Process-N1:N2:…:Nk » est construit, où chaque Nk est le N-ième enfant de son parent.

is_alive()

Renvoie vrai si le processus est en vie, faux sinon.

Grossièrement, un objet processus est en vie depuis le moment où la méthode start() finit de s’exécuter jusqu’à ce que le processus fils se termine.

daemon

L’option daemon du processus, une valeur booléenne. L’option doit être réglée avant que la méthode start() ne soit appelée.

La valeur initiale est héritée par le processus créateur.

Quand un processus se ferme, il tente de terminer tous ses processus enfants daemon.

Notez qu’un processus daemon n’est pas autorisé à créer des processus fils. Sinon un processus daemon laisserait ses enfants orphelins lorsqu’il se termine par la fermeture de son parent. De plus, ce ne sont pas des daemons ou services Unix, ce sont des processus normaux qui seront terminés (et non attendus) si un processus non daemon se ferme.

En plus de l’API threading.Thread, les objets Process supportent aussi les attributs et méthodes suivants :

pid

Renvoie l’ID du processus. Avant que le processus ne soit lancé, la valeur est None.

exitcode

Le code de fermeture de l’enfant. La valeur est None si le processus ne s’est pas encore terminé. Une valeur négative -N indique que le fils a été terminé par un signal N.

authkey

La clé d’authentification du processus (une chaîne d’octets).

Quand multiprocessing est initialisé, une chaîne aléatoire est assignée au processus principal, en utilisant os.urandom().

Quand un objet Process est créé, il hérité de la clé d’authentification de son parent, bien que cela puisse être changé à l’aide du paramètre authkey pour une autre chaîne d’octets.

Voir Clés d’authentification.

sentinel

Un identifiant numérique de l’objet système qui devient « prêt » quand le processus se termine.

Vous pouvez utiliser cette valeur si vous voulez attendre plusieurs événements à la fois en utilisant multiprocessing.connection.wait(). Autrement appeler join() est plus simple.

Sous Windows, c’est un mécanisme de l’OS utilisable avec les familles d’appels API WaitForSingleObject et WaitForMultipleObjects. Sous Unix, c’est un descripteur de fichier utilisable avec les primitives sur module select.

Nouveau dans la version 3.3.

terminate()

Termine le processus. Sous Unix cela est réalisé à l’aide d’un signal SIGTERM, sous Windows TerminateProcess() est utilisé. Notez que les gestionnaires de sortie, les clauses finally etc. ne sont pas exécutées.

Notez que les descendants du processus ne seront pas terminés – ils deviendront simplement orphelins.

Avertissement

Si cette méthode est utilisée quand le processus associé utilise un tube ou une queue, alors le tube ou la queue sont susceptibles d’être corrompus et peuvent devenir inutilisables par les autres processus. De façon similaire, si le processus a acquis un verrou, un sémaphore ou autre, alors le terminer est susceptible de provoquer des blocages dans les autres processus.

Notez que les méthodes start(), join(), is_alive(), terminate() et exitcode ne devraient être appelées que par le processus ayant créé l’objet process.

Exemple d’utilisation de quelques méthodes de Process :

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

La classe de base de toutes les exceptions de multiprocessing.

exception multiprocessing.BufferTooShort

Exception levée par Connection.recv_bytes_into() quand l’objet tampon fourni est trop petit pour le message à lire.

Si e est une instance de BufferTooShort alors e.args[0] donnera un message sous forme d’une chaîne d’octets.

exception multiprocessing.AuthenticationError

Levée quand il y a une erreur d’authentification.

exception multiprocessing.TimeoutError

Levée par les méthodes avec temps d’exécution limité, quand ce temps expire.

17.2.2.2. Tubes (pipes) et Queues

Quand de multiples processus sont utilisés, de l’échange de messages est souvent mis en place pour la communication entre processus et éviter d’avoir à utiliser des primitives de synchronisation telles que des verrous.

Pour échanger des messages vous pouvez utiliser un Pipe() (pour une connexion entre deux processus) ou une queue (qui autorise de multiples producteurs et consommateurs).

Les types Queue, SimpleQueue et JoinableQueue sont des queues FIFO multi-producteurs et multi-consommateurs modelées sur la classe queue.Queue de la bibliothèque standard. Elles diffèrent par l’absence dans Queue des méthodes task_done() et join() introduites dans la classe queue.Queue par Python 2.5.

Si vous utilisez JoinableQueue alors vous devez appeler JoinableQueue.task_done() pour chaque tâche retirée de la queue, sans quoi le sémaphore utilisé pour compter le nombre de tâches non accomplies pourra éventuellement déborder, levant une exception.

Notez que vous pouvez aussi créer une queue partagée en utilisant un objet gestionnaire – voir Gestionnaires.

Note

multiprocessing utilise les exceptions habituelles queue.Empty et queue.Full pour signaler un dépassement du temps maximal autorisé. Elles ne sont pas disponibles dans l’espace de nommage multiprocessing donc vous devez les importer depuis le module queue.

Note

Quand un objet est placé dans une queue, l’objet est sérialisé par pickle et un fil d’exécution en arrière-plan transmettra ensuite les données sérialisées sur un tube sous-jacent. Cela a certaines conséquences qui peuvent être un peu surprenantes, mais ne devrait causer aucune difficultés pratiques – si elles vous embêtent vraiment, alors vous pouvez à la place utiliser une queue créée avec un manager.

  1. Après avoir placé un objet dans une queue vide il peut y avoir un délai infinitésimal avant que la méthode empty() de la queue renvoie False et que get_nowait() renvoie une valeur sans lever de queue.Empty.

  2. Si plusieurs processus placent des objets dans la queue, il est possible pour les objets d’être reçus de l’autre côté dans le désordre. Cependant, les objets placés par un même processus seront toujours récupérés dans l’ordre attendu.

Avertissement

Si un processus est tué à l’aide de Process.terminate() ou os.kill() pendant qu’il tente d’utiliser une Queue, alors les données de la queue peuvent être corrompues. Cela peut par la suite causer des levées d’exceptions dans les autres processus quand ils tenteront d’utiliser la queue.

Avertissement

Comme mentionné plus haut, si un processus fils a placé des éléments dans la queue (et qu’il n’a pas utilisé JoinableQueue.cancel_join_thread), alors le processus ne se terminera pas tant que les éléments placés dans le tampon n’auront pas été transmis au tube.

Cela signifie que si vous essayez d’attendre ce processus vous pouvez obtenir un interblocage, à moins que vous ne soyez sûr que tous les éléments placés dans la queue ont été consommés. De même, si le processus fils n’est pas un daemon alors le processus parent pourrait bloquer à la fermeture quand il tentera d’attendre tous ses enfants non daemons.

Notez que la queue créée à l’aide d’un gestionnaire n’a pas ce problème. Voir Lignes directrices de programmation.

Pour un exemple d’utilisation de queues pour de la communication entre les processus, voir Exemples.

multiprocessing.Pipe([duplex])

Renvoie une paire (conn1, conn2) d’objets Connection représentant les bouts d’un tube.

Si duplex vaut True (par défaut), alors le tube est bidirectionnel. Si duplex vaut False il est unidirectionnel : conn1 ne peut être utilisé que pour recevoir des messages et conn2 que pour en envoyer.

class multiprocessing.Queue([maxsize])

Renvoie une queue partagée entre les processus utilisant un tube et quelques verrous/sémaphores. Quand un processus place initialement un élément sur la queue, un fil d’exécution feeder est démarré pour transférer les objets du tampon vers le tube.

Les exceptions habituelles queue.Empty et queue.Full du module queue de la bibliothèque standard sont levées pour signaler les timeouts.

Queue implémente toutes les méthodes de queue.Queue à l’exception de task_done() et join().

qsize()

Renvoie la taille approximative de la queue. Ce nombre n’est pas fiable en raison des problématiques de multithreading et multiprocessing.

Notez que cela peut lever une NotImplementedError sous les plateformes Unix telles que Mac OS X où sem_getvalue() n’est pas implémentée.

empty()

Renvoie True si la queue est vide, False sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.

full()

Renvoie True si la queue est pleine, False sinon. Cette valeur n’est pas fiable en raison des problématiques de multithreading et multiprocessing.

put(obj[, block[, timeout]])

Place obj dans la queue. Si l’argument optionnel block vaut True (par défaut) est que timeout est None (par défaut), bloque jusqu’à ce qu’un slot libre soit disponible. Si timeout est un nombre positif, la méthode bloquera au maximum timeout secondes et lèvera une exception queue.Full si aucun slot libre n’a été trouvé dans le temps imparti. Autrement (block vaut False), place un élément dans la queue si un slot libre est immédiatement disponible, ou lève une exception queue.Full dans le cas contraire (timeout est ignoré dans ce cas).

put_nowait(obj)

Équivalent à put(obj, False).

get([block[, timeout]])

Retire et renvoie un élément de la queue. Si l’argument optionnel block vaut True (par défaut) et que timeout est None (par défaut), bloque jusqu’à ce qu’un élément soit disponible. Si timeout (le délai maximal autorisé) est un nombre positif, la méthode bloquera au maximum timeout secondes et lèvera une exception queue.Empty si aucun élément n’est disponible dans le temps imparti. Autrement (block vaut False), renvoie un élément s’il est immédiatement disponible, ou lève une exception queue.Empty dans le cas contraire (timeout est ignoré dans ce cas).

get_nowait()

Équivalent à get(False).

multiprocessing.Queue possède quelques méthodes additionnelles non présentes dans queue.Queue. Ces méthodes ne sont habituellement pas nécessaires pour la plupart des codes :

close()

Indique que plus aucune donnée ne peut être placée sur la queue par le processus courant. Le fil d’exécution en arrière-plan se terminera quand il aura transféré toutes les données du tampon vers le tube. Elle est appelée automatiquement quand la queue est collectée par le ramasse-miettes.

join_thread()

Attend le fil d’exécution d’arrière-plan. Elle peut seulement être utilisée une fois que close() a été appelée. Elle bloque jusqu’à ce que le fil d’arrière-plan se termine, assurant que toutes les données du tampon ont été transmises au tube.

Par défaut si un processus n’est pas le créateur de la queue alors à la fermeture elle essaiera d’attendre le fil d’exécution d’arrière-plan de la queue. Le processus peut appeler cancel_join_thread() pour que join_thread() ne fasse rien.

cancel_join_thread()

Empêche join_thread() de bloquer. En particulier, cela empêche le fil d’arrière-plan d’être attendu automatiquement quand le processus se ferme – voir join_thread().

Un meilleur nom pour cette méthode pourrait être allow_exit_without_flush(). Cela peut provoquer des pertes de données placées dans la queue, et vous ne devriez certainement pas avoir besoin de l’utiliser. Elle n’est là que si vous souhaitez terminer immédiatement le processus sans transférer les données du tampon, et que vous ne vous inquiétez pas de perdre des données.

Note

Le fonctionnement de cette classe requiert une implémentation de sémaphore partagé sur le système d’exploitation hôte. Sans cela, la fonctionnalité sera désactivée et la tentative d’instancier une Queue lèvera une ImportError. Voir bpo-3770 pour plus d’informations. Cette remarque reste valable pour les autres types de queues spécialisées définies par la suite.

class multiprocessing.SimpleQueue

Un type de Queue simplifié, très proche d’un Pipe avec verrou.

empty()

Renvoie True si la queue est vide, False sinon.

get()

Supprime et renvoie un élément de la queue.

put(item)

Place item dans la queue.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, une sous-classe de Queue, est une queue qui ajoute des méthodes task_done() et join().

task_done()

Indique qu’une tâche précédemment placée dans la queue est complétée. Utilisé par les consommateurs de la queue. Pour chaque get() utilisé pour récupérer une tâche, un appel ultérieur à task_done() indique à la queue que le traitement de la tâche est terminé.

Si un join() est actuellement bloquant, il se débloquera quand tous les éléments auront été traités (signifiant qu’un appel à task_done() a été reçu pour chaque élément ayant été placé via put() dans la queue).

Lève une exception ValueError si appelée plus de fois qu’il y avait d’éléments dans la file.

join()

Bloque jusqu’à ce que tous les éléments de la queue aient été récupérés et traités.

Le compteur des tâches non accomplies augmente chaque fois qu’un élément est ajouté à la queue. Le compteur redescend chaque fois qu’un consommateur appelle task_done() pour indiquer qu’un élément a été récupéré et que tout le travail qui le concerne est complété. Quand le compteur des tâches non accomplies atteint zéro, join() est débloquée.

17.2.2.3. Divers

multiprocessing.active_children()

Renvoie la liste de tous les enfants vivants du processus courant.

Appeler cette méthode provoque l’effet de bord d’attendre tout processus qui n’a pas encore terminé.

multiprocessing.cpu_count()

Renvoie le nombre de CPU sur le système.

Ce nombre n’est pas équivalent au nombre de CPUs que le processus courant peut utiliser. Le nombre de CPUs utilisables peut être obtenu avec len(os.sched_getaffinity(0))

Peut lever une NotImplementedError.

Voir aussi

os.cpu_count()

multiprocessing.current_process()

Renvoie l’objet Process correspondant au processus courant.

Un analogue à threading.current_thread().

multiprocessing.freeze_support()

Ajoute le support des programmes utilisant multiprocessing qui ont été gelés pour produire un exécutable Windows. (Testé avec py2exe, PyInstaller et cx_Freeze.)

Cette fonction doit être appelée juste après la ligne if __name__ == '__main__' du module principal. Par exemple :

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Si la ligne freeze_support() est omise, alors tenter de lancer l’exécutable gelé lèvera une RuntimeError.

Appeler freeze_support() n’a pas d’effet quand elle est invoquée sur un système d’exploitation autre que Windows. De plus, si le module est lancé normalement par l’interpréteur Python sous Windows (le programme n’a pas été gelé), alors freeze_support() n’a pas d’effet.

multiprocessing.get_all_start_methods()

Renvoie la liste des méthodes de démarrage supportées, la première étant celle par défaut. Les méthodes de démarrage possibles sont 'fork', 'spawn' et 'forkserver'. Sous Windows seule 'spawn' est disponible. Sous Unix 'fork' et 'spawn' sont disponibles, 'fork' étant celle par défaut.

Nouveau dans la version 3.4.

multiprocessing.get_context(method=None)

Renvoie un contexte ayant les mêmes attributs que le module multiprocessing.

Si method est None le contexte par défaut est renvoyé. Sinon method doit valoir 'fork', 'spawn' ou 'forkserver'. Une ValueError est levée si la méthode de démarrage spécifiée n’est pas disponible.

Nouveau dans la version 3.4.

multiprocessing.get_start_method(allow_none=False)

Renvoie le nom de la méthode de démarrage utilisée pour démarrer le processus.

Si le nom de la méthode n’a pas été fixé et que allow_none est faux, alors la méthode de démarrage est réglée à celle par défaut et son nom est renvoyé. Si la méthode n’a pas été fixée et que allow_none est vrai, None est renvoyé.

La valeur de retour peut être 'fork', 'spawn', 'forkserver' ou None. 'fork' est la valeur par défaut sous Unix, 'spawn' est celle sous Windows.

Nouveau dans la version 3.4.

multiprocessing.set_executable()

Définit le chemin de l’interpréteur Python à utiliser pour démarrer un processus fils. (Par défaut sys.executable est utilisé). Les intégrateurs devront probablement faire quelque chose comme

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

avant de pouvoir créer des processus fils.

Modifié dans la version 3.4: Maintenant supporté sous Unix quand la méthode de démarrage 'spawn' est utilisée.

multiprocessing.set_start_method(method)

Règle la méthode qui doit être utilisée pour démarrer un processus fils. method peut être 'fork', 'spawn' ou 'forkserver'.

Notez que cette fonction ne devrait être appelée qu’une fois au plus, et l’appel devrait être protégé à l’intérieur d’une clause if __name__ == '__main__' dans le module principal.

Nouveau dans la version 3.4.

17.2.2.4. Objets de connexions

Les objets de connexion autorisent l’envoi et la réception d’objets sérialisables ou de chaînes de caractères. Ils peuvent être vus comme des interfaces de connexion (sockets) connectées orientées messages.

Les objets de connexion sont habituellement créés via Pipe – voir aussi Auditeurs et Clients.

class multiprocessing.connection.Connection
send(obj)

Envoie un objet sur l’autre bout de la connexion, qui devra être lu avec recv().

The object must be picklable. Very large pickles (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception.

recv()

Renvoie un objet envoyé depuis l’autre bout de la connexion en utilisant send(). Bloque jusqu’à ce que quelque chose soit reçu. Lève une EOFError s’il n’y a plus rien à recevoir et que l’autre bout a été fermé.

fileno()

Renvoie le descripteur de fichier ou identifiant utilisé par la connexion.

close()

Ferme la connexion.

Elle est appelée automatiquement quand la connexion est collectée par le ramasse-miettes.

poll([timeout])

Renvoie vrai ou faux selon si des données sont disponibles à la lecture.

Si timeout n’est pas spécifié la méthode renverra immédiatement. Si timeout est un nombre alors il spécifie le temps maximum de blocage en secondes. Si timeout est None, un temps d’attente infini est utilisé.

Notez que plusieurs objets de connexions peuvent être attendus en même temps à l’aide de multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])

Envoie des données binaires depuis un bytes-like object comme un message complet.

If offset is given then data is read from that position in buffer. If size is given then that many bytes will be read from buffer. Very large buffers (approximately 32 MB+, though it depends on the OS) may raise a ValueError exception

recv_bytes([maxlength])

Renvoie un message complet de données binaires envoyées depuis l’autre bout de la connexion comme une chaîne de caractères. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une EOFError s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.

Si maxlength est précisé que que le message est plus long que maxlength alors une OSError est levée et la connexion n’est plus lisible.

Modifié dans la version 3.3: Cette fonction levait auparavant une IOError, qui est maintenant un alias pour OSError.

recv_bytes_into(buffer[, offset])

Lit et stocke dans buffer un message complet de données binaires envoyées depuis l’autre bout de la connexion et renvoie le nombre d’octets du message. Bloque jusqu’à ce qu’il y ait quelque chose à recevoir. Lève une EOFError s’il ne reste rien à recevoir et que l’autre côté de la connexion a été fermé.

buffer doit être un bytes-like object accessible en écriture. Si offset est donné, le message sera écrit dans le tampon à partir de cette position. offset doit être un entier positif, inférieur à la taille de buffer (en octets).

Si le tampon est trop petit une exception BufferTooShort est levée et le message complet est accessible via e.args[0]e est l’instance de l’exception.

Modifié dans la version 3.3: Les objets de connexions eux-mêmes peuvent maintenant être transférés entre les processus en utilisant Connection.send() et Connection.recv().

Nouveau dans la version 3.3: Les objets de connexions supportent maintenant le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte. __enter__() renvoie l’objet de connexion, et __exit__() appelle close().

Par exemple :

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Avertissement

La méthode Connection.recv() désérialise automatiquement les données qu’elle reçoit, ce qui peut être un risque de sécurité à moins que vous ne fassiez réellement confiance au processus émetteur du message.

Par conséquent, à moins que l’objet de connexion soit instancié par Pipe(), vous ne devriez uniquement utiliser les méthodes recv() et send() après avoir effectué une quelconque forme d’authentification. Voir Clés d’authentification.

Avertissement

Si un processus est tué pendant qu’il essaye de lire ou écrire sur le tube, alors les données du tube ont des chances d’être corrompues, parce qu’il devient impossible d’être sûr d’où se trouvent les bornes du message.

17.2.2.5. Primitives de synchronisation

Généralement les primitives de synchronisation ne sont pas nécessaire dans un programme multi-processus comme elles le sont dans un programme multi-fils d’exécution. Voir la documentation du module threading.

Notez que vous pouvez aussi créer des primitives de synchronisation en utilisant un objet gestionnaire – voir Gestionnaires.

class multiprocessing.Barrier(parties[, action[, timeout]])

Un objet barrière : un clone de threading.Barrier.

Nouveau dans la version 3.3.

class multiprocessing.BoundedSemaphore([value])

Un objet sémaphore lié : un analogue proche de threading.BoundedSemaphore.

Une seule différence existe avec son proche analogue : le premier argument de sa méthode acquire est appelé block, pour la cohérence avec Lock.acquire().

Note

Sur Mac OS X, elle n’est pas distinguable de la classe Semaphore parce que sem_getvalue() n’est pas implémentée sur cette plateforme.

class multiprocessing.Condition([lock])

Une variable conditionnelle : un alias pour threading.Condition.

Si lock est spécifié il doit être un objet Lock ou RLock du module multiprocessing.

Modifié dans la version 3.3: La méthode wait_for() a été ajoutée.

class multiprocessing.Event

Un clone de threading.Event.

class multiprocessing.Lock

Un verrou non récursif : un analogue proche de threading.Lock. Une fois que le processus ou le fil d’exécution a acquis un verrou, les tentatives suivantes d’acquisition depuis n’importe quel processus ou fil d’exécution bloqueront jusqu’à ce qu’il soit libéré ; n’importe quel processus ou fil peut le libérer. Les concepts et comportements de threading.Lock qui s’appliquent aux fils d’exécution sont répliqués ici dans multiprocessing.Lock et s’appliquent aux processus et aux fils d’exécution, à l’exception de ce qui est indiqué.

Notez que Lock est en fait une fonction factory qui renvoie une instance de multiprocessing.synchronize.Lock initialisée avec un contexte par défaut.

Lock supporte le protocole context manager et peut ainsi être utilisé avec une instruction with.

acquire(block=True, timeout=None)

Acquiert un verrou, bloquant ou non bloquant.

Avec l’argument block à True (par défaut), l’appel de méthode bloquera jusqu’à ce que le verrou soit dans déverrouillé, puis le verrouillera avant de renvoyer True. Notez que le nom de ce premier argument diffère de celui de threading.Lock.acquire().

Avec l’argument block à False, l’appel de méthode ne bloque pas. Si le verrou est actuellement verrouillé, renvoie False ; autrement verrouille le verrou et renvoie True.

Quand invoqué avec un nombre flottant positif comme timeout, bloque au maximum pendant ce nombre spécifié de secondes, tant que le verrou ne peut être acquis. Les invocations avec une valeur de timeout négatives sont équivalents à zéro. Les invocations avec un timeout à None (par défaut) correspondent à un délai d’attente infini. Notez que le traitement des valeurs de timeout négatives et None diffère du comportement implémenté dans threading.Lock.acquire(). L’argument timeout n’a pas d’implication pratique si l’argument block est mis ) False et est alors ignoré. Renvoie True si le verrou a été acquis et False si le temps de timeout a expiré.

release()

Libère un verrou. Elle peut être appelée depuis n’importe quel processus ou fil d’exécution, pas uniquement le processus ou le fil qui a acquis le verrou à l’origine.

Le comportement est le même que threading.Lock.release() excepté que lorsque la méthode est appelée sur un verrou déverrouillé, une ValueError est levée.

class multiprocessing.RLock

Un objet verrou récursif : un analogue proche de threading.RLock. Un verrou récursif doit être libéré par le processus ou le fil d’exécution qui l’a acquis. Quand un processus ou un fil acquiert un verrou récursif, le même processus/fil peut l’acquérir à nouveau sans bloquer ; le processus/fil doit le libérer autant de fois qu’il l’acquiert.

Notez que RLock est en fait une fonction factory qui renvoie une instance de multiprocessing.synchronize.RLock initialisée avec un contexte par défaut.

RLock supporte le protocole context manager et peut ainsi être utilisée avec une instruction with.

acquire(block=True, timeout=None)

Acquiert un verrou, bloquant ou non bloquant.

Quand invoqué avec l’argument block à True, bloque jusqu’à ce que le verrou soit déverrouillé (n’appartenant à aucun processus ou fil d’exécution) sauf s’il appartient déjà au processus ou fil d’exécution courant. Le processus ou fil d’exécution courant prend la possession du verrou (s’il ne l’a pas déjà) et incrémente d’un le niveau de récursion du verrou, renvoyant ainsi True. Notez qu’il y a plusieurs différences dans le comportement de ce premier argument comparé à l’implémentation de threading.RLock.acquire(), à commencer par le nom de l’argument lui-même.

Quand invoqué avec l’argument block à False, ne bloque pas. Si le verrou est déjà acquis (et possédé) par un autre processus ou fil d’exécution, le processus/fil courant n’en prend pas la possession et le niveau de récursion n’est pas incrémenté, résultant en une valeur de retour à False. Si le verrou est déverrouillé, le processus/fil courant en prend possession et incrémente son niveau de récursion, renvoyant True.

L’usage et les comportements de l’argument timeout sont les mêmes que pour Lock.acquire(). Notez que certains de ces comportements diffèrent par rapport à ceux implémentés par threading.RLock.acquire().

release()

Libère un verrou, décrémentant son niveau de récursion. Si après la décrémentation le niveau de récursion est zéro, réinitialise le verrou à un état déverrouillé (n’appartenant à aucun processus ou fil d’exécution) et si des processus/fils attendent que le verrou se déverrouillé, autorise un seul d’entre-eux à continuer. Si après cette décrémentation le niveau de récursion est toujours strictement positif, le verrou reste verrouillé et propriété du processus/fil appelant.

N’appelez cette méthode que si le processus ou fil d’exécution appelant est propriétaire du verrou. Une AssertionError est levée si cette méthode est appelée par un processus/fil autre que le propriétaire ou si le verrou n’est pas verrouillé (possédé). Notez que le type d’exception levé dans cette situation diffère du comportement de threading.RLock.release().

class multiprocessing.Semaphore([value])

Un objet sémaphore, proche analogue de threading.Semaphore.

Une seule différence existe avec son proche analogue : le premier argument de sa méthode acquire est appelé block, pour la cohérence avec Lock.acquire().

Note

Sous Mac OS X, sem_timedwait n’est pas supporté, donc appeler acquire() avec un temps d’exécution limité émulera le comportement de cette fonction en utilisant une boucle d’attente.

Note

Si le signal SIGINT généré par un Ctrl-C survient pendant que le fil d’exécution principal est bloqué par un appel à BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() ou Condition.wait(), l’appel sera immédiatement interrompu et une KeyboardInterrupt sera levée.

Cela diffère du comportement de threading où le SIGINT est ignoré tant que les appels bloquants sont en cours.

Note

Certaines des fonctionnalités de ce paquet requièrent une implémentation fonctionnelle de sémaphores partagés sur le système hôte. Sans cela, le module multiprocessing.synchronize sera désactivé, et les tentatives de l’importer lèveront une ImportError. Voir bpo-3770 pour plus d’informations.

17.2.2.6. Objets ctypes partagés

Il est possible de créer des objets partagés utilisant une mémoire partagée pouvant être héritée par les processus enfants.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Renvoie un objet ctypes alloué depuis la mémoire partagée. Par défaut la valeur de retour est en fait un wrapper synchronisé autour de l’objet. L’objet en lui-même est accessible par l’attribut value de l’une Value.

typecode_or_type détermine le type de l’objet renvoyé : il s’agit soit d’un type ctype soit d’un caractère typecode tel qu’utilisé par le module array. *args est passé au constructeur de ce type.

Si lock vaut True (par défaut), alors un nouveau verrou récursif est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe ».

Les opérations telles que += qui impliquent une lecture et une écriture ne sont pas atomique. Ainsi si vous souhaitez par exemple réaliser une incrémentation atomique sur une valeur partagée, vous ne pouvez pas simplement faire

counter.value += 1

En supposant que le verrou associé est récursif (ce qui est le cas par défaut), vous pouvez à la place faire

with counter.get_lock():
    counter.value += 1

Notez que lock est un argument keyword-only.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Renvoie un tableau ctypes alloué depuis la mémoire partagée. Par défaut la valeur de retour est en fait un wrapper synchronisé autour du tableau.

typecode_or_type détermine le type des éléments du tableau renvoyé : il s’agit soit d’un type ctype soit d’un caractère typecode tel qu’utilisé par le module array. Si size_or_initialize est un entier, alors il détermine la taille du tableau, et le tableau sera initialisé avec des zéros. Autrement, size*or_initializer est une séquence qui sera utilisée pour initialiser le tableau et dont la taille détermine celle du tableau.

Si lock vaut True (par défaut), alors un nouveau verrou est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe ».

Notez que lock est un argument keyword-only.

Notez qu’un tableau de ctypes.c_char a ses attributs value et raw qui permettent de l’utiliser pour stocker et récupérer des chaînes de caractères.

17.2.2.6.1. Le module multiprocessing.sharedctypes

Le module multiprocessing.sharedctypes fournit des fonctions pour allouer des objets ctypes depuis la mémoire partagée, qui peuvent être hérités par les processus fils.

Note

Bien qu’il soit possible de stocker un pointeur en mémoire partagée, rappelez-vous qu’un pointer référence un emplacement dans l’espace d’adressage d’un processus particulier. Ainsi, ce pointeur a de fortes chances d’être invalide dans le contexte d’un autre processus et déréférencer le pointeur depuis ce second processus peut causer un plantage.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Renvoie un tableau ctypes alloué depuis la mémoire partagée.

typecode_or_type détermine le type des éléments du tableau renvoyé : il s’agit soit d’un type ctype soit d’un caractère codant le type des éléments du tableau (typecode) tel qu’utilisé par le module array. Si size_or_initialize est un entier, alors il détermine la taille du tableau, et le tableau sera initialisé avec des zéros. Autrement, size_or_initializer est une séquence qui sera utilisée pour initialiser le tableau et dont la taille détermine celle du tableau.

Notez que définir ou récupérer un élément est potentiellement non atomique – utilisez plutôt Array() pour vous assurer de synchroniser automatiquement avec un verrou.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Renvoie un objet ctypes alloué depuis la mémoire partagée.

typecode_or_type détermine le type de l’objet renvoyé : il s’agit soit d’un type ctype soit d’un caractère typecode tel qu’utilisé par le module array. *args est passé au constructeur de ce type.

Notez que définir ou récupérer un élément est potentiellement non atomique – utilisez plutôt Value() pour vous assurer de synchroniser automatiquement avec un verrou.

Notez qu’un tableau de ctypes.c_char a ses attributs value et raw qui permettent de l’utiliser pour stocker et récupérer des chaînes de caractères – voir la documentation de ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

Identique à RawArray() à l’exception que suivant la valeur de lock un wrapper de synchronisation process-safe pourra être renvoyé à la place d’un tableau ctypes brut.

Si lock vaut True (par défaut), alors un nouveau verrou est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe ».

Notez que lock est un argument keyword-only.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

Identique à RawValue() à l’exception que suivant la valeur de lock un wrapper de synchronisation process-safe pourra être renvoyé à la place d’un objet ctypes brut.

Si lock vaut True (par défaut), alors un nouveau verrou est créé pour synchroniser l’accès à la valeur. Si lock est un objet Lock ou RLock alors il sera utilisé pour synchroniser l’accès à la valeur. Si lock vaut False, l’accès à l’objet renvoyé ne sera pas automatiquement protégé par un verrou, donc il ne sera pas forcément « process-safe ».

Notez que lock est un argument keyword-only.

multiprocessing.sharedctypes.copy(obj)

Renvoie un objet ctypes alloué depuis la mémoire partagée, qui est une copie de l’objet ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Renvoie un wrapper process-safe autour de l’objet ctypes qui utilise lock pour synchroniser l’accès. Si lock est None (par défaut), un objet multiprocessing.RLock est créé automatiquement.

Un wrapper synchronisé aura deux méthodes en plus de celles de l’objet qu’il enveloppe : get_obj() renvoie l’objet wrappé et get_lock() renvoie le verrou utilisé pour la synchronisation.

Notez qu’accéder à l’objet ctypes à travers le wrapper peut s’avérer beaucoup plus lent qu’accéder directement à l’objet ctypes brut.

Modifié dans la version 3.5: Les objets synchronisés supportent le protocole context manager.

Le tableau ci-dessous compare la syntaxe de création des objets ctypes partagés depuis une mémoire partagée avec la syntaxe normale ctypes. (Dans le tableau, MyStruct est une sous-classe quelconque de ctypes.Structure.)

ctypes

sharedctypes utilisant un type

sharedctypes utilisant un typecode

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(“d”, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(“h”, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(“i”, (9, 2, 8))

Ci-dessous un exemple où des objets ctypes sont modifiés par un processus fils :

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Les résultats affichés sont

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

17.2.2.7. Gestionnaires

Les gestionnaires fournissent un moyen de créer des données qui peuvent être partagées entre les différents processus, incluant le partage à travers le réseau entre des processus tournant sur des machines différentes. Un objet gestionnaire contrôle un processus serveur qui gère les shared objects. Les autres processus peuvent accéder aux objets partagés à l’aide de mandataires.

multiprocessing.Manager()

Renvoie un objet SyncManager démarré qui peut être utilisé pour partager des objets entre les processus. L’objet gestionnaire renvoyé correspond à un processus enfant instancié et possède des méthodes pour créer des objets partagés et renvoyer les mandataires correspondants.

Les processus gestionnaires seront arrêtés dès qu’ils seront collectés par le ramasse-miettes ou que leur processus parent se terminera. Les classes gestionnaires sont définies dans le module multiprocessing.managers :

class multiprocessing.managers.BaseManager([address[, authkey]])

Crée un objet BaseManager.

Une fois créé il faut appeler start() ou get_server().serve_forever() pour assurer que l’objet gestionnaire référence un processus gestionnaire démarré.

address est l’adresse sur laquelle le processus gestionnaire écoute pour de nouvelles connexions. Si address est None, une adresse arbitraire est choisie.

authkey est la clé d’authentification qui sera utilisée pour vérifier la validité des connexions entrantes sur le processus serveur. Si authkey est None alors current_process().authkey est utilisée. Autrement authkey est utilisée et doit être une chaîne d’octets.

start([initializer[, initargs]])

Démarre un sous-processus pour démarrer le gestionnaire. Si initializer n’est pas None alors le sous-processus appellera initializer(*initargs) quand il démarrera.

get_server()

Renvoie un objet Server qui représente le serveur sous le contrôle du gestionnaire. L’objet Server supporte la méthode serve_forever() :

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server possède en plus un attribut address.

connect()

Connecte un objet gestionnaire local au processus gestionnaire distant :

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc')
>>> m.connect()
shutdown()

Stoppe le processus utilisé par le gestionnaire. Cela est disponible uniquement si start() a été utilisée pour démarrer le processus serveur.

Cette méthode peut être appelée plusieurs fois.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Une méthode de classe qui peut être utilisée pour enregistrer un type ou un appelable avec la classe gestionnaire.

typeif est un « type identifier » qui est utilisé pour identifier un type particulier d’objet partagé. Cela doit être une chaîne de caractères.

callable est un objet appelable utilisé pour créer les objets avec cet identifiant de type. Si une instance de gestionnaire prévoit de se connecter au serveur en utilisant sa méthode connect() ou si l’argument create_method vaut False alors cet argument peut être laissé à None.

proxytype est une sous-classe de BaseProxy utilisée pour créer des mandataires autour des objets partagés avec ce typeid. S’il est None, une classe mandataire sera créée automatiquement.

exposed est utilisé pour préciser une séquence de noms de méthodes dont les mandataires pour ce typeid doivent être autorisés à accéder via BaseProxy._callmethod(). (Si exposed est None alors proxytype._exposed_ est utilisé à la place s’il existe.) Dans le cas où aucune liste exposed n’est précisée, toutes les « méthodes publiques » de l’objet partagé seront accessibles. (Ici une « méthode publique » signifie n’importe quel attribut qui possède une méthode __call__() et dont le nom ne commence pas par un '_'.)

method_to_typeid est un tableau associatif utilisé pour préciser le type de retour de ces méthodes exposées qui doivent renvoyer un mandataire. Il associé un nom de méthode à une chaîne de caractères typeid. (Si method_to_typeid est None, proxytype._method_to_typeid_ est utilisé à la place s’il existe). Si le nom d’une méthode n’est pas une clé de ce tableau associatif ou si la valeur associée est None, l’objet renvoyé par la méthode sera une copie de la valeur.

create_method détermine si une méthode devrait être créée avec le nom typeid, qui peut être utilisée pour indiquer au processus serveur de créer un nouvel objet partagé et d’en renvoyer un mandataire. a valeur par défaut est True.

Les instances de BaseManager ont aussi une propriété en lecture seule :

address

L’adresse utilisée par le gestionnaire.

Modifié dans la version 3.3: Les objets gestionnaires supportent le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte. __enter__() démarre le processus serveur (s’il n’a pas déjà été démarré) et renvoie l’objet gestionnaire. __exit__() appelle shutdown().

Dans les versions précédentes __enter__() ne démarrait pas le processus serveur du gestionnaire s’il n’était pas déjà démarré.

class multiprocessing.managers.SyncManager

Une sous-classe de BaseManager qui peut être utilisée pour la synchronisation entre processus. Des objets de ce type sont renvoyés par multiprocessing.Manager().

Ces méthodes créent et renvoient des Objets mandataires pour un certain nombre de types de données communément utilisés pour être synchronisés entre les processus. Elles incluent notamment des listes et dictionnaires partagés.

Barrier(parties[, action[, timeout]])

Crée un objet threading.Barrier partagé et renvoie un mandataire pour cet objet.

Nouveau dans la version 3.3.

BoundedSemaphore([value])

Crée un objet threading.BoundedSemaphore partagé et renvoie un mandataire pour cet objet.

Condition([lock])

Crée un objet threading.Condition partagé et renvoie un mandataire pour cet objet.

Si lock est fourni alors il doit être un mandataire pour un objet threading.Lock ou threading.RLock.

Modifié dans la version 3.3: La méthode wait_for() a été ajoutée.

Event()

Crée un objet threading.Event partagé et renvoie un mandataire pour cet objet.

Lock()

Crée un objet threading.Lock partagé et renvoie un mandataire pour cet objet.

Namespace()

Crée un objet Namespace partagé et renvoie un mandataire pour cet objet.

Queue([maxsize])

Crée un objet queue.Queue partagé et renvoie un mandataire pour cet objet.

RLock()

Crée un objet threading.RLock partagé et renvoie un mandataire pour cet objet.

Semaphore([value])

Crée un objet threading.Semaphore partagé et renvoie un mandataire pour cet objet.

Array(typecode, sequence)

Crée un tableau et renvoie un mandataire pour cet objet.

Value(typecode, value)

Crée un objet avec un attribut value accessible en écriture et renvoie un mandataire pour cet objet.

dict()
dict(mapping)
dict(sequence)

Crée un objet dict partagé et renvoie un mandataire pour cet objet.

list()
list(sequence)

Crée un objet list partagé et renvoie un mandataire pour cet objet.

Modifié dans la version 3.6: Les objets partagés peuvent être imbriqués. Par exemple, un conteneur partagé tel qu’une liste partagée peu contenir d’autres objets partagés qui seront aussi gérés et synchronisés par le SyncManager.

class multiprocessing.managers.Namespace

Un type qui peut être enregistré avec SyncManager.

Un espace de nommage n’a pas de méthodes publiques, mais possède des attributs accessibles en écriture. Sa représentation montre les valeurs de ses attributs.

Cependant, en utilisant un mandataire pour un espace de nommage, un attribut débutant par '_' est un attribut du mandataire et non de l’objet cible :

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

17.2.2.7.1. Gestionnaires personnalisés

Pour créer son propre gestionnaire, il faut créer une sous-classe de BaseManager et utiliser la méthode de classe register() pour enregistrer de nouveaux types ou callables au gestionnaire. Par exemple :

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

17.2.2.7.2. Utiliser un gestionnaire distant

Il est possible de lancer un serveur gestionnaire sur une machine et d’avoir des clients l’utilisant sur d’autres machines (en supposant que les pare-feus impliqués l’autorisent).

Exécuter les commandes suivantes crée un serveur pour une simple queue partagée à laquelle des clients distants peuvent accéder :

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Un client peut accéder au serveur comme suit :

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Un autre client peut aussi l’utiliser :

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Les processus locaux peuvent aussi accéder à cette queue, utilisant le code précédent sur le client pour y accéder à distance :

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

17.2.2.8. Objets mandataires

Un mandataire est un objet qui référence un objet partagé appartenant (supposément) à un processus différent. L’objet partagé est appelé le référent du mandataire. Plusieurs mandataires peuvent avoir un même référent.

Un mandataire possède des méthodes qui appellent les méthodes correspondantes du référent (bien que toutes les méthodes du référent ne soient pas nécessairement accessibles à travers le mandataire). De cette manière, un mandataire peut être utilisé comme le serait sont référent :

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Notez qu’appliquer str() à un mandataire renvoie la représentation du référent, alors que repr() renvoie celle du mandataire.

Une fonctionnalité importantes des objets mandataires est qu’ils sont sérialisables et peuvent donc être échangés entre les processus. Ainsi, un référent peut contenir des Objets mandataires. Cela permet d’imbriquer des listes et dictionnaires gérés ainsi que d’autres Objets mandataires :

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

De même, les mandataires de listes et dictionnaires peuvent être imbriqués dans d’autres :

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Si des objets standards (non proxyfiés) list ou dict sont contenus dans un référent, les modifications sur ces valeurs mutables ne seront pas propagées à travers le gestionnaire parce que le mandataire n’a aucun moyen de savoir quand les valeurs contenues sont modifiées. Cependant, stocker une valeur dans un conteneur mandataire (qui déclenche un appel à __setitem__ sur le mandataire) propage bien la modification à travers le gestionnaire et modifie effectivement l’élément, il est ainsi possible de réassigner la valeur modifiée au conteneur mandataire :

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Cette approche est peut-être moins pratique que d’utiliser des Objets mandataires imbriqués pour la majorité des cas d’utilisation, mais démontre aussi un certain niveau de contrôle sur la synchronisation.

Note

Les types de mandataires de multiprocessing n’implémentent rien pour la comparaison par valeurs. Par exemple, on a :

>>> manager.list([1,2,3]) == [1,2,3]
False

Il faut à la place simplement utiliser une copie du référent pour faire les comparaisons.

class multiprocessing.managers.BaseProxy

Les objets mandataires sont des instances de sous-classes de BaseProxy.

_callmethod(methodname[, args[, kwds]])

Appelle et renvoie le résultat d’une méthode du référent du mandataire.

Si proxy est un mandataire sont le référent est obj, alors l’expression

proxy._callmethod(methodname, args, kwds)

s’évalue comme

getattr(obj, methodname)(*args, **kwds)

dans le processus du gestionnaire.

La valeur renvoyée sera une copie du résultat de l’appel ou un mandataire sur un nouvel objet partagé – voir l’a documentation de l’argument method_to_typeid de BaseManager.register().

Si une exception est levée par l’appel, elle est relayée par _callmethod(). Si une autre exception est levée par le processus du gestionnaire, elle est convertie en une RemoteError et est levée par _callmethod().

Notez en particulier qu’une exception est levée si methodname n’est pas exposée.

Un exemple d’utilisation de _callmethod() :

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Renvoie une copie du référent.

Si le référent n’est pas sérialisable, une exception est levée.

__repr__()

Renvoie la représentation de l’objet mandataire.

__str__()

Renvoie la représentation du référent.

17.2.2.8.1. Nettoyage

Un mandataire utilise un callback sous une référence faible de façon à ce que quand il est collecté par le ramasse-miettes, il se désenregistre auprès du gestionnaire qui possède le référent.

Un objet partagé est supprimé par le processus gestionnaire quand plus aucun mandataire ne le référence.

17.2.2.9. Bassins de processus

On peut créer un bassin de processus qui exécuteront les tâches qui lui seront soumises avec la classe Pool.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Un objet process pool qui contrôle un bassin de processus workers auquel sont soumises des tâches. Il supporte les résultats asynchrones avec des timeouts et des callabacks et possède une implémentation parallèle de map.

processes est le nombre de processus workers à utiliser. Si processes est None, le nombre renvoyé par os.cpu_count() est utilisé.

Si initializer n’est pas None, chaque processus worker appellera initializer(*initargs) en démarrant.

maxtasksperchild est le nombre de tâches qu’un processus worker peut accomplir avant de se fermer et d’être remplacé par un worker frais, pour permettre aux ressources inutilisées d’être libérées. Par défaut maxtasksperchild est None, ce qui signifie que le worker vit aussi longtemps que le bassin.

context peut être utilisé pour préciser le contexte utilisé pour démarrer les processus workers. Habituellement un bassin est créé à l’aide de la fonction multiprocessing.Pool() ou de la méthode Pool() d’un objet de contexte. Dans les deux cas context est réglé de façon appropriée.

Notez que les méthodes de l’objet pool ne doivent être appelées que par le processus qui l’a créé.

Nouveau dans la version 3.2: maxtasksperchild

Nouveau dans la version 3.4: context

Note

Les processus workers à l’intérieur d’une Pool vivent par défaut aussi longtemps que la queue de travail du bassin. Un modèle fréquent chez d’autres systèmes (tels qu’Apache, mod_wsgi, etc.) pour libérer les ressources détenues par les workers est d’autoriser un worker dans le bassin à accomplir seulement une certaine charge de travail avant de se fermer, se retrouvant nettoyé et remplacé par un nouvelle processus fraîchement lancé. L’argument maxtasksperchild de Pool expose cette fonctionnalité à l’utilisateur final.

apply(func[, args[, kwds]])

Appelle func avec les arguments args et les arguments nommés kwds. Bloque jusqu’à ce que que le résultat soit prêt. En raison de ce blocage, apply_async() est préférable pour exécuter du travail en parallèle. De plus, func est exécutée sur un seul des workers du bassin.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Une variante de la méthode apply() qui renvoie un objet résultat.

Si callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Quand le résultat est prêt, callback est appelé avec ce résultat, si l’appel n’échoue pas auquel cas error_callback est appelé à la place.

Si error_callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Si la fonction cible échoue, alors error_callback est appelé avec l’instance de l’exception.

Les callbacks doivent se terminer immédiatement, autrement le fil d’exécution qui gère les résultats se retrouverait bloqué.

map(func, iterable[, chunksize])

Un équivalent parallèle à la fonction built-in map() (qui ne supporte cependant qu’un itérable en argument). Elle bloque jusqu’à ce que le résultat soit prêt.

La méthode découpe l’itérable en un nombre de morceaux qu’elle envoie au bassin de processus comme des tâches séparées. La taille (approximative) de ces morceaux peut être précisée en passant à chunksize un entier positif.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Une variante de la méthode map() qui renvoie un objet résultat.

Si callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Quand le résultat est prêt, callback est appelé avec ce résultat, si l’appel n’échoue pas auquel cas error_callback est appelé à la place.

Si error_callback est précisé alors ce doit être un objet appelable qui accepte un seul argument. Si la fonction cible échoue, alors error_callback est appelé avec l’instance de l’exception.

Les callbacks doivent se terminer immédiatement, autrement le fil d’exécution qui gère les résultats se retrouverait bloqué.

imap(func, iterable[, chunksize])

A lazier version of map().

L’argument chunksize est le même que celui utilisé par la méthode map(). Pour de très longs itérables, utiliser une grande valeur pour chunksize peut faire s’exécuter la tâche beaucoup plus rapidement qu’en utilisant la valeur par défaut de 1.

Aussi, si chucksize vaut 1 alors la méthode next() de l’itérateur renvoyé par imap() prend un paramètre optionnel timeout : next(timeout) lève une multiprocessing.TimeoutError si le résultat ne peut pas être renvoyé avant timeout secondes.

imap_unordered(func, iterable[, chunksize])

Identique à imap() si ce n’est que l’ordre des résultats de l’itérateur renvoyé doit être considéré comme arbitraire. (L’ordre n’est garanti que quand il n’y a qu’un worker.)

starmap(func, iterable[, chunksize])

Semblable à map() à l’exception que les éléments d”iterable doivent être des itérables qui seront dépaquetés comme arguments pour la fonction.

Par conséquent un iterable [(1,2), (3, 4)] donnera pour résultat [func(1,2), func(3,4)].

Nouveau dans la version 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Une combinaison de starmap() et map_async() qui itère sur iterable (composé d’itérables) et appelle func pour chaque itérable dépaqueté. Renvoie l’objet résultat.

Nouveau dans la version 3.3.

close()

Empêche de nouvelles tâches d’être envoyées à la pool. Les processus workers se terminent une fois que toutes les tâches ont été complétées.

terminate()

Stoppe immédiatement les processus workers sans finaliser les travaux courants. Quand l’objet pool est collecté par le ramasse-miettes, sa méthode terminate() est appelée immédiatement.

join()

Attend que les processus workers se terminent. Il est nécessaire d’appeler close() ou terminate() avant d’utiliser join().

Nouveau dans la version 3.3: Les bassins de workers supportent maintenant le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte. __enter__() renvoie l’objet pool et __exit__() appelle terminate().

class multiprocessing.pool.AsyncResult

La classe des résultats renvoyés par Pool.apply_async() et Pool.map_async().

get([timeout])

Renvoie le résultat quand il arrive. Si timeout n’est pas None et que le résultat n’arrive pas avant timeout secondes, une multiprocessing.TimeoutError est levée. Si l’appel distance lève une exception, alors elle est relayée par get().

wait([timeout])

Attend que le résultat soit disponible ou que timeout secondes s’écoulent.

ready()

Renvoie True ou False suivant si la tâche est accomplie.

successful()

Renvoie True ou False suivant si la tâche est accomplie sans lever d’exception. Lève une AssertionError si le résultat n’est pas prêt.

Les exemples suivants présentent l’utilisation d’un bassin de workers :

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

17.2.2.10. Auditeurs et Clients

Habituellement l’échange de messages entre processus est réalisé en utilisant des queues ou des objets Connection renvoyés par Pipe().

Cependant, le module multiprocessing.connection permet un peu plus de flexibilité. Il fournit un message de plus haut-niveau orienté API pour gérer des connecteurs ou des tubes nommés sous Windows. Il gère aussi l’authentification par condensat (digest authentication en anglais) en utilisant le module hmac, et pour interroger de multiples connexions en même temps.

multiprocessing.connection.deliver_challenge(connection, authkey)

Envoie un message généré aléatoirement à l’autre bout de la connexion et attend une réponse.

Si la réponse correspond au condensat du message avec la clé authkey, alors un message de bienvenue est envoyé à l’autre bout de la connexion. Autrement, une AuthenticationError est levée.

multiprocessing.connection.answer_challenge(connection, authkey)

Reçoit un message, calcule le condensat du message en utilisant la clé authkey, et envoie le condensat en réponse.

Si un message de bienvenue n’est pas reçu, une AuthenticationError est levée.

multiprocessing.connection.Client(address[, family[, authkey]])

Essaie d’établir une connexion avec l’auditeur qui utilise l’adresse address, renvoie une Connection.

Le type de la connexion est déterminé par l’argument family, mais il peut généralement être omis puisqu’il peut être inféré depuis le format d”address. (Voir Formats d’adresses)

Si authkey est passée et n’est pas None, elle doit être une chaîne d’octets et sera utilisée comme clé secrète pour le défi d’authentification basé sur HMAC. Aucune authentification n’est réalisée si authkey est None. Une AuthenticationError est levée si l’authentification échoue. Voir Clés d’authentification.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Une enveloppe autour d’un connecteur lié ou un tube nommé sous Windows qui écoute pour des connexions.

address est l’adresse à utiliser par le connecteur lié ou le tube nommé de l’objet auditeur.

Note

Si une adresse “0.0.0.0” est utilisée, l’adresse ne sera pas un point d’accès connectable sous Windows. Si vous avez besoin d’un point d’accès connectable, utilisez “127.0.0.1”.

family est le type de connecteur (ou tube nommé) à utiliser. Cela peut être l’une des chaînes 'AF_INET' (pour un connecteur TCP), 'AF_UNIX' (pour un connecteur Unix) ou 'AF_PIPE' (pour un tube nommé sous Windows). Seulement le premier d’entre eux est garanti d’être disponible. Si family est None, la famille est inférée depuis le format d”address. Si address est aussi None, la famille par défaut est utilisée. La famille par défaut est supposée être la plus rapide disponible. Voir Formats d’adresses. Notez que si la family est 'AF_UNIX' et qu”address est None, le connecteur est créé dans un répertoire temporaire privé créé avec tempfile.mkstemp().

Si l’objet auditeur utilise un connecteur alors backlog (1 par défaut) est passé à la méthode listen() du connecteur une fois qu’il a été lié.

Si authkey est passée et n’est pas None, elle doit être une chaîne d’octets et sera utilisée comme clé secrète pour le défi d’authentification basé sur HMAC. Aucune authentification n’est réalisée si authkey est None. Une AuthenticationError est levée si l’authentification échoue. Voir Clés d’authentification.

accept()

Accepte une connexion sur le connecteur lié ou le tube nommé de l’objet auditeur et renvoie un objet Connection. Si la tentative d’authentification échoue, une AuthenticationError est levée.

close()

Ferme le connecteur lié ou le tube nommé de l’objet auditeur. La méthode est appelée automatiquement quand l’auditeur est collecté par le ramasse-miettes. Il est cependant conseillé de l’appeler explicitement.

Les objets auditeurs ont aussi les propriétés en lecture seule suivantes :

address

L’adresse utilisée par l’objet auditeur.

last_accepted

L’adresse depuis laquelle a été établie la dernière connexion. None si aucune n’est disponible.

Nouveau dans la version 3.3: Les objets auditeurs supportent maintenant le protocole des gestionnaires de contexte – voir Le type gestionnaire de contexte. __enter__() renvoie l’objet auditeur, et __exit__() appelle close().

multiprocessing.connection.wait(object_list, timeout=None)

Attend qu’un objet d”object_list soit prêt. Renvoie la liste de ces objets d”object_list qui sont prêts. Si timeout est un nombre flottant, l’appel bloquera au maximum ce nombre de secondes. Si timeout est None, l’appelle bloquera pour une durée non limitée. Un timeout négatif est équivalent à un timeout nul.

Pour Unix et Windows, un objet peut apparaître dans object_list s’il est

Une connexion (socket en anglais) est prête quand il y a des données disponibles en lecture dessus, ou que l’autre bout a été fermé.

Unix: wait(object_list, timeout) est en grande partie équivalente à select.select(object_list, [], [], timeout). La différence est que, si select.select() est interrompue par un signal, elle peut lever une OSError avec un numéro d’erreur EINTR, alors que wait() ne le fera pas.

Windows : un élément d”object_list doit être soit un identifiant waitable (en accord avec la définition utilisée par la documentation de la fonction Win32 WaitForMultipleObjects()), soit un objet avec une méthode fileno() qui renvoie un identifiant de connecteur ou de tube (notez que les identifiants de tubes et de connecteurs ne sont pas des identifiants waitables).

Nouveau dans la version 3.3.

Exemples

Le code serveur suivant crée un auditeur qui utilise 'secret password' comme clé d’authentification. Il attend ensuite une connexion et envoie les données au client :

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Le code suivant se connecte au serveur et en reçoit des données :

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Le code suivant utilise wait() pour attendre des messages depuis plusieurs processus à la fois :

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

17.2.2.10.1. Formats d’adresses

  • Une adresse 'AF_INET' est un tuple de la forme (hostname, port)hostname est une chaîne et port un entier.

  • Une adresse 'AF_UNIX' est une chaîne représentant un nom de fichier sur le système de fichiers.

  • Une adresse 'AF_PIPE' est une chaîne de la forme

    r'\.\pipe{PipeName}'. Pour utiliser un Client() pour se connecter à un tube nommé sur une machine distante appelée ServerName, il faut plutôt utiliser une adresse de la forme r'\ServerName\pipe{PipeName}'.

Notez que toute chaîne commençant par deux antislashs est considérée par défaut comme l’adresse d’un 'AF_PIPE' plutôt qu’une adresse 'AF_UNIX'.

17.2.2.11. Clés d’authentification

Quand Connection.recv est utilisée, les données reçues sont automatiquement désérialisées par pickle. Malheureusement désérialiser des données depuis une source non sûre constitue un risque de sécurité. Par conséquent Listener et Client() utilisent le module hmac pour fournir une authentification par condensat.

Une clé d’authentification est une chaîne d’octets qui peut être vue comme un mot de passe : quand une connexion est établie, les deux interlocuteurs vont demander à l’autre une preuve qu’il connaît la clé d’authentification. (Démontrer que les deux utilisent la même clé n’implique pas d’échanger la clé sur la connexion.)

Si l’authentification est requise et qu’aucune clé n’est spécifiée alors la valeur de retour de current_process().authkey est utilisée (voir Process). Celle valeur est automatiquement héritée par tout objet Process créé par le processus courant. Cela signifie que (par défaut) tous les processus d’un programme multi-processus partageront une clé d’authentification unique qui peut être utilisée pour mettre en place des connexions entre-eux.

Des clés d’authentification adaptées peuvent aussi être générées par os.urandom().

17.2.2.12. Journalisation

Un certain support de la journalisation est disponible. Notez cependant que le le paquet logging n’utilise pas de verrous partagés entre les processus et il est donc possible (dépendant du type de gestionnaire) que les messages de différents processus soient mélangés.

multiprocessing.get_logger()

Renvoie le journaliseur utilisé par multiprocessing. Si nécessaire, un nouveau sera créé.

À sa première création le journaliseur a pour niveau logging.NOTSET et pas de gestionnaire par défaut. Les messages envoyés à ce journaliseur ne seront pas propagés par défaut au journaliseur principal.

Notez que sous Windows les processus fils n’hériteront que du niveau du journaliseur du processus parent – toute autre personnalisation du journaliseur ne sera pas héritée.

multiprocessing.log_to_stderr()

Cette fonction effectue un appel à get_logger() mais en plus de renvoyer le journaliseur créé par get_logger, elle ajoute un gestionnaire qui envoie la sortie sur sys.stderr en utilisant le format '[%(levelname)s/%(processName)s] %(message)s'.

L’exemple ci-dessous présente une session avec la journalisation activée :

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Pour un tableau complet des niveaux de journalisation, voir le module logging.

17.2.2.13. Le module multiprocessing.dummy

multiprocessing.dummy réplique toute l’API de multiprocessing mais n’est rien de plus qu’un wrapper autour du module threading.

17.2.3. Lignes directrices de programmation

Il y a certaines lignes directrices et idiomes auxquels il faut adhérer en utilisant multiprocessing.

17.2.3.1. Toutes les méthodes de démarrage

Les règles suivantes s’appliquent aux méthodes de démarrage.

Éviter les états partagés

Autant que possible, vous devriez éviter de déplacer de larges données entre les processus.

Il est probablement meilleur de s’en tenir à l’utilisation de queues et tubes pour la communication entre processus plutôt que d’utiliser des primitives de synchronisation plus bas-niveau.

Sérialisation

Assurez-vous que les arguments passés aux méthodes des mandataires soient sérialisables (pickables).

Sûreté des mandataires à travers les fils d’exécution

N’utilisez pas d’objet mandataire depuis plus d’un fil d’exécution à moins que vous ne le protégiez avec un verrou.

(Il n’y a jamais de problème avec plusieurs processus utilisant un même mandataire.)

Attendre les processus zombies

Sous Unix quand un processus se termine mais n’est pas attendu, il devient un zombie. Il ne devrait jamais y en avoir beaucoup parce que chaque fois qu’un nouveau processus démarre (ou que active_children() est appelée) tous les processus complétés qui n’ont pas été attendus le seront. Aussi appeler la méthode Process.is_alive d’un processus terminé attendra le processus. Toutefois il est probablement une bonne pratique d’attendre explicitement tous les processus que vous démarrez.

Préférez hériter que sérialiser/désérialiser

Quand vous utilisez les méthodes de démarrage spawn ou forkserver, de nombreux types de multiprocessing nécessitent d’être sérialisées pour que les processus enfants puissent les utiliser. Cependant, il faut généralement éviter d’envoyer des objets partagés aux autres processus en utilisant des tubes ou des queues. Vous devriez plutôt vous arranger pour qu’un processus qui nécessite l’accès à une ressource partagée créée autre part qu’il en hérite depuis un de ses processus ancêtres.

Éviter de terminer les processus

Utiliser la méthode Process.terminate pour stopper un processus risque de casser ou de rendre indisponible aux autres processus des ressources partagées (comme des verrous, sémaphores, tubes et queues) actuellement utilisée par le processus.

Il est donc probablement préférable de n’utiliser Process.terminate que sur les processus qui n’utilisent jamais de ressources partagées.

Attendre les processus qui utilisent des queues

Gardez à l’esprit qu’un processus qui a placé des éléments dans une queue attendra que tous les éléments mis en tampon soient consommés par le fil d’exécution « consommateur » du tube sous-jacent avant de se terminer. (Le processus enfant peut appeler la méthode Queue.cancel_join_thread de la queue pour éviter ce comportement.)

Cela signifie que chaque fois que vous utilisez une queue vous devez vous assurer que tous les éléments qui y ont été placés seront effectivement supprimés avant que le processus ne soit attendu. Autrement vous ne pouvez pas être sûr que les processus qui ont placé des éléments dans la queue se termineront. Souvenez-vous aussi que tous les processus non daemons seront attendus automatiquement.

L’exemple suivant provoquera un interblocage :

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Une solution ici serait d’intervertir les deux dernières lignes (ou simplement supprimer la ligne p.join()).

Passer explicitement les ressources aux processus fils

Sous Unix en utilisant la méthode de démarrage fork, un processus fils peut utiliser une ressource partagée créée par un processus parent en utilisant une ressource globale. Cependant, il est préférable de passer l’objet en argument au constructeur du processus fils.

En plus de rendre le code (potentiellement) compatible avec Windows et les autres méthodes de démarrage, cela assure aussi que tant que le processus fils est en vie, l’objet ne sera pas collecté par le ramasse-miettes du processus parent. Cela peut être important si certaines ressources sont libérées quand l’objet est collecté par le ramasse-miettes du processus parent.

Donc par exemple

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

devrait être réécrit comme

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Faire attention à remplacer sys.stdin par un objet « file-like »

À l’origine, multiprocessing appelait inconditionnellement :

os.close(sys.stdin.fileno())

dans la méthode multiprocessing.Process._bootstrap() — cela provoquait des problèmes avec les processus imbriqués. Cela peut être changé en :

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Qui résout le problème fondamental des collisions entre processus provoquant des erreurs de mauvais descripteurs de fichiers, mais introduit un potentiel danger pour les applications qui remplacent sys.stdin() avec un « file-like object » ayant une sortie bufferisée. Ce danger est que si plusieurs processus appellent close() sur cet objet file-like, cela peut amener les données à être transmises à l’objet à plusieurs reprises, résultant en une corruption.

Si vous écrivez un objet file-like et implémentez votre propre cache, vous pouvez le rendre sûr pour les forks en stockant le pid chaque fois que vous ajoutez des données au cache, et annulez le cache quand le pip change. Par exemple :

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Pour plus d’informations, voir bpo-5155, bpo-5313 et bpo-5331

17.2.3.2. Les méthodes de démarrage spawn et forkserver

Certaines restrictions ne s’appliquent pas à la méthode de démarrage fork.

Plus de sérialisation

Assurez-vous que tous les argument de Process.__init__() sont sérialisables avec pickle. Aussi, si vous héritez de Process, assurez-vous que toutes les instances sont sérialisables quand la méthode Process.start est appelée.

Variables globales

Gardez en tête que si le code exécuté dans un processus fils essaie d’accéder à une variable globale, alors la valeur qu’il voit (s’il y en a une) pourrait ne pas être la même que la valeur du processus parent au moment même où Process.start est appelée.

Cependant, les variables globales qui sont juste des constantes de modules ne posent pas de problèmes.

Importation sûre du module principal

Assurez-vous que le module principal peut être importé en toute sécurité par un nouvel interpréteur Python sans causer d’effets de bord inattendus (comme le démarrage d’un nouveau processus).

Par exemple, utiliser la méthode de démarrage spawn ou forkserver pour lancer le module suivant échouerait avec une RuntimeError :

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Vous devriez plutôt protéger le « point d’entrée » du programme en utilisant if __name__ == '__main__': comme suit :

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(La ligne freeze_support() peut être omise si le programme est uniquement lancé normalement et pas gelé.)

Cela permet aux interpréteurs Python fraîchement instanciés d’importer en toute sécurité le module et d’exécution ensuite la fonction foo().

Des restrictions similaires s’appliquent si une pool ou un gestionnaire est créé dans le module principal.

17.2.4. Exemples

Démonstration de comment créer et utiliser des gestionnaires et mandataires personnalisés :

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

En utilisant Pool :

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Un exemple montrant comment utiliser des queues pour alimenter en tâches une collection de processus workers et collecter les résultats :

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()