multiprocessing
— Paralelismo basado en procesos¶
Código fuente: Lib/multiprocessing/
Introducción¶
multiprocessing
is a package that supports spawning processes using an
API similar to the threading
module. The multiprocessing
package
offers both local and remote concurrency, effectively side-stepping the
Global Interpreter Lock by using
subprocesses instead of threads. Due
to this, the multiprocessing
module allows the programmer to fully
leverage multiple processors on a given machine. It runs on both Unix and
Windows.
El módulo multiprocessing
también introduce API que no tienen análogos en el módulo threading
. Un buen ejemplo de esto es el objeto Pool
que ofrece un medio conveniente de paralelizar la ejecución de una función a través de múltiples valores de entrada, distribuyendo los datos de entrada a través de procesos (paralelismo de datos). El siguiente ejemplo demuestra la práctica común de definir tales funciones en un módulo para que los procesos secundarios puedan importar con éxito ese módulo. Este ejemplo básico de paralelismo de datos usando Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
imprimirá la salida estándar
[1, 4, 9]
La clase Process
¶
En multiprocessing
, los procesos se generan creando un objeto Process
y luego llamando a su método start()
. Process
sigue la API de threading.Thread
. Un ejemplo trivial de un programa multiproceso es
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Para mostrar las IDs individuales involucradas en el proceso, aquí hay un ejemplo ampliado:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Para obtener una explicación de por qué es necesaria la parte if __name__ == '__main__'
, consulte Pautas de programación.
Contextos y métodos de inicio¶
Dependiendo de la plataforma, multiprocessing
admite tres formas de iniciar un proceso. Estos métodos de inicio start methods son
- Generación (spawn)
The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process object’s
run()
method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.Disponible en Unix y Windows. Por defecto en Windows y macOS.
- fork
El proceso parental usa
os.fork()
para bifurcar el intérprete de Python. El proceso hijo, cuando comienza, es efectivamente idéntico al proceso parental. Todos los recursos del proceso parental son heredados por el proceso hijo. Tenga en cuenta que bifurcar (forking) de forma segura un proceso multihilo es problemático.Disponible solo en Unix. Por defecto en Unix.
- forkserver
Cuando el programa se inicia y selecciona el método de inicio forkserver, se inicia un proceso de servidor. A partir de ese momento, cada vez que se necesite un nuevo proceso, el proceso parental se conecta al servidor y solicita que bifurque un nuevo proceso. El proceso del servidor fork es de un solo hilo, por lo que es seguro usarlo
os.fork()
. No se heredan recursos innecesarios.Disponible en plataformas Unix que admiten pasar descriptores de archivo a través de tuberías (pipes) Unix.
Distinto en la versión 3.8: En macOS, el método de inicio spawn ahora es el predeterminado. El método de inicio fork debe considerarse inseguro ya que puede provocar bloqueos del subproceso. Consulte bpo-33725.
Distinto en la versión 3.4: spawn fue añadido en todas las plataformas Unix, y forkserver fue agregado para algunas plataformas Unix. Los procesos hijos (child processes) ya no heredan todos los identificadores heredables de los procesos parentales en Windows.
En Unix, los métodos de inicio spawn o forkserver también iniciarán un proceso resource tracker que rastrea los recursos del sistema con nombre no vinculados (como los nombrados semáforos o objetos SharedMemory
) creado por procesos del programa. Cuando todos los procesos han salido, el rastreador de recursos desvincula cualquier objeto rastreado restante. Por lo general, no debería haber ninguno, pero si un proceso fue eliminado por una señal, puede haber algunos recursos «filtrados». (Ni los semáforos filtrados ni los segmentos de memoria compartida se desvincularán automáticamente hasta el próximo reinicio. Esto es problemático para ambos objetos porque el sistema solo permite un número limitado de semáforos con nombre, y los segmentos de memoria compartida ocupan algo de espacio en la memoria principal).
Para seleccionar un método de inicio, utilice la función set_start_method()
en la cláusula if __name__ == '__main__'
del módulo principal. Por ejemplo:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
no debería ser usada más de una vez en el programa.
Alternativamente, se puede usar get_context()
para obtener un objeto de contexto. Los objetos de contexto tienen la misma API que el módulo de multiprocesamiento, y permiten utilizar múltiples métodos de inicio en el mismo programa.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Tenga en cuenta que los objetos relacionados con un contexto pueden no ser compatibles con procesos para un contexto diferente. En particular, los locks creados con el contexto fork no se pueden pasar a los procesos iniciados con los métodos de inicio spawn o forkserver .
Una biblioteca que quiera usar un método de inicio particular probablemente debería usar get_context()
para evitar interferir con la elección del usuario de la biblioteca.
Advertencia
Los métodos de inicio 'spawn'
y 'forkserver'
actualmente no pueden ser usados con ejecutables «congelados» (frozen)(es decir, binarios producidos por paquetes como PyInstaller y cx_Freeze) en Unix. El método de inicio 'fork'
funciona.
Intercambiando objetos entre procesos¶
multiprocessing
admite dos tipos de canales de comunicación entre procesos:
Colas (*queues*)
La clase
Queue
es prácticamente un clon dequeue.Queue
. Por ejemplo:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Las colas (queues) son hilos y procesos seguro.
Tuberías (*Pipes*)
La función
Pipe()
retorna un par de objetos de conexión conectados por una tubería (pipe) que, por defecto, es un dúplex (bidireccional). Por ejemplo:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Los dos objetos de conexión retornados por
Pipe()
representan los dos extremos de la tubería (pipe). Cada objeto de conexión tiene los métodossend()
yrecv()
(entre otros). Tenga en cuenta que los datos en una tubería pueden corromperse si dos procesos (o hilos) intentan leer o escribir en el mismo (same) extremo de la tubería al mismo tiempo. Por supuesto, no hay riesgo de corrupción por procesos que utilizan diferentes extremos de la tubería (pipe) al mismo tiempo.
Sincronización entre procesos¶
multiprocessing
contiene equivalentes de todas las sincronizaciones primitivas de threading
. Por ejemplo, se puede usar un candado (lock) para garantizar que solo un proceso se imprima a la salida estándar a la vez:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Sin usar el candado (lock) de salida de los diferentes procesos, es probable que todo se mezcle.
Usando una piscina de trabajadores (pool of workers)¶
La clase Pool
representa procesos de piscina de trabajadores (pool of workers). Tiene métodos que permiten que las tareas se descarguen a los procesos de trabajo de diferentes maneras.
Por ejemplo:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Tenga en cuenta que los métodos de una piscina (pool) solo deben ser utilizados por el proceso que lo creó.
Nota
La funcionalidad en este paquete requiere que los procesos hijos (children) puedan importar el módulo __main__
. Esto está cubierto en Pautas de programación sin embargo, vale la pena señalarlo aquí. Esto significa que algunos ejemplos, como multiprocessing.pool.Pool
no funcionarán en el intérprete interactivo. Por ejemplo:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Si intenta esto, en realidad generará tres trazas completas intercaladas de forma semialeatoria, y luego tendrá que detener el proceso principal de alguna manera)
Referencia¶
El paquete multiprocessing
mayoritariamente replica la API del módulo threading
.
Process
y excepciones¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Los objetos de proceso representan la actividad que se ejecuta en un proceso separado. La clase
Process
tiene equivalentes para todos los métodosthreading.Thread
.El constructor siempre debe llamarse con argumentos de palabras clave. group siempre debe ser
None
; existe únicamente por compatibilidad conthreading.Thread
. target es el objeto invocable a ser llamado por el métodorun()
. El valor predeterminado esNone
, lo que significa que nada es llamado. name es el nombre del proceso (consultename
para más detalles). args es la tupla de argumento para la invocación de destino. kwargs es un diccionario de argumentos de palabras clave para la invocación de destino. Si se proporciona, el argumento daemon solo de palabra clave establece el procesodaemon
enTrue
oFalse
. SiNone
(el valor predeterminado), este indicador se heredará del proceso de creación.Por defecto, ningún argumento es pasado a target.
Si una subclase anula al constructor, debe asegurarse de que invoca al constructor de la clase base (
Process.__init__()
) antes de hacer cualquier otra cosa al proceso.Distinto en la versión 3.3: Añadido el argumento daemon.
-
run
()¶ Método que representa la actividad del proceso.
Puede anular este método en una subclase. El método estándar
run()
invoca el objeto invocable pasado al constructor del objeto como argumento objetivo, si lo hay, con argumentos posicionares y de palabras clave tomados de los argumentos args y kwargs, respectivamente.
-
start
()¶ Comienza la actividad del proceso.
Esto debe llamarse como máximo una vez por objeto de proceso. Organiza la invocación del método
run()
del objeto en un proceso separado.
-
join
([timeout])¶ Si el argumento opcional timeout es
None
(el valor predeterminado), el método se bloquea hasta que el proceso cuyo métodojoin()
se llama termina. Si timeout es un número positivo, bloquea como máximo timeout segundos. Tenga en cuenta que el método retornaNone
si su proceso finaliza o si el método agota el tiempo de espera. Verifique el procesoexitcode
para determinar si terminó.Un proceso puede unirse muchas veces.
Un proceso no puede unirse a sí mismo porque esto provocaría un punto muerto. Es un error intentar unirse a un proceso antes de que se haya iniciado.
-
name
¶ El nombre del proceso. El nombre es una cadena utilizada solo con fines de identificación. No tiene semántica. Múltiples procesos pueden tener el mismo nombre.
El nombre inicial es establecido por el constructor. Si no se proporciona un nombre explícito al constructor, se forma un nombre usando “Process-N:sub:`1`:N:sub:`2`:…:N:sub:`k`” se construye, donde cada N:sub:`k` es el N-enésimo hijo del parental.
-
is_alive
()¶ Retorna si el proceso está vivo.
Aproximadamente, un objeto de proceso está vivo desde el momento en que el método
start()
retorna hasta que finaliza el proceso hijo.
-
daemon
¶ La bandera del proceso daemon , es un valor booleano. Esto debe establecerse antes de que
start()
sea llamado.El valor inicial se hereda del proceso de creación.
Cuando un proceso sale, intenta terminar todos sus procesos demoníacos (daemonic) hijos.
Tenga en cuenta que un proceso demoníaco (daemonic) no puede crear procesos hijos. De lo contrario, un proceso demoníaco (daemonic) dejaría a sus hijos huérfanos si se termina cuando finaliza su proceso parental. Además, estos no son daemons o servicios de Unix, son procesos normales qué finalizarán (y no se unirán) si los procesos no demoníacos han salido.
Además de API
threading.Thread
, los objetosProcess
también admiten los siguientes atributos y métodos:-
pid
¶ Retorna el ID del proceso. Antes de que se genere el proceso, esto será
None
.
-
exitcode
¶ El código de salida del hijo. Esto será
None
si el proceso aún no ha terminado. Un valor negativo -N indica que el hijo fue terminado por la señal N.
-
authkey
¶ La clave de autenticación del proceso (una cadena de bytes).
Cuando el
multiprocessing
se inicializa el proceso principal se le asigna una cadena aleatoria usandoos.urandom()
.Cuando se crea un objeto
Process
, este heredará la clave de autenticación de su proceso parental, aunque esto puede cambiarse configurandoauthkey
a otra cadena de bytes.Consulte Llaves de autentificación.
-
sentinel
¶ Un identificador numérico de un objeto del sistema que estará «listo» cuando finalice el proceso.
Se puede usar este valor si desea esperar varios eventos a la vez usando
multiprocessing.connection.wait()
. De lo contrario, llamar ajoin()
es más simple.En Windows, este es un sistema operativo manejable con la familia de llamadas API
WaitForSingleObject
yWaitForMultipleObjects
. En Unix, este es un descriptor de archivo utilizable con primitivas del móduloselect
.Nuevo en la versión 3.3.
-
terminate
()¶ Terminar el proceso. En Unix, esto se hace usando la señal
SIGTERM
; en Windows se utilizaTerminateProcess()
. Tenga en cuenta que los manejadores de salida y finalmente las cláusulas, etc., no se ejecutarán.Tenga en cuenta que los procesos descendientes del proceso no finalizarán – simplemente quedarán huérfanos.
Advertencia
Si este método se usa cuando el proceso asociado está usando una tubería (pipe) o una cola(queue), entonces la tubería o la cola pueden corromperse y pueden quedar inutilizables por otro proceso. Del mismo modo, si el proceso ha adquirido un lock o un semáforo, etc., su finalización puede provocar el bloqueo de otros procesos.
-
kill
()¶ Siempre como
terminate()
pero utilizando la señalSIGKILL
en Unix.Nuevo en la versión 3.7.
-
close
()¶ El cierre del objeto
Process
, libera todos los recursos asociados a él.ValueError
se lanza si el proceso subyacente aún se está ejecutando. Una vezclose()
se retorna con éxito, la mayoría de los otros métodos y atributos del objetoProcess
lanzaráValueError
.Nuevo en la versión 3.7.
Tenga en cuenta que los métodos
start()
,join()
,is_alive()
,terminate()
yexitcode
deberían solo ser llamados por el proceso que creó el objeto del proceso.Ejemplo de uso de algunos métodos de la
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ La clase base de todas las excepciones
multiprocessing
.
-
exception
multiprocessing.
BufferTooShort
¶ La excepción
Connection.recv_bytes_into()
es lanzada cuando el objeto de búfer suministrado es demasiado pequeño para el mensaje leído.Si
e
es una instancia deBufferTooShort
entoncese.args[0]
dará el mensaje como una cadena de bytes.
-
exception
multiprocessing.
AuthenticationError
¶ Lanzada cuando hay un error de autenticación.
-
exception
multiprocessing.
TimeoutError
¶ Lanzada por métodos con un tiempo de espera (timeout) cuando este expira.
Tuberías (Pipes) y Colas (Queues)¶
Cuando se usan múltiples procesos, uno generalmente usa el paso de mensajes para la comunicación entre procesos y evita tener que usar primitivas de sincronización como locks.
Para pasar mensajes se puede usar Pipe()
(para una conexión entre dos procesos) o una cola (queue)(que permite múltiples productores y consumidores).
Los tipos Queue
, SimpleQueue
y JoinableQueue
son colas multi-productor, multi-consumidor FIFO (primero en entrar, primero en salir) modeladas en queue.Queue
en la biblioteca estándar. Se diferencian en que Queue
carece de task_done()
y join()
métodos introducidos en Python 2.5 queue.Queue
class.
Si usa JoinableQueue
, entonces debe llamar a JoinableQueue.task_done()
para cada tarea eliminada de la cola (queue) o de lo contrario el semáforo utilizado para contar el número de tareas sin terminar puede eventualmente desbordarse, lanzando un excepción.
Tenga en cuenta que también se puede crear una cola compartida mediante el uso de un objeto de administrador – consulte Administradores (Managers).
Nota
multiprocessing
utiliza las excepciones habituales queue.Empty
y queue.Full
para indicar un tiempo de espera. No están disponibles en el espacio de nombres multiprocessing
, por lo que debe importarlos desde queue
.
Nota
Cuando un objeto se coloca en una cola (queue), el objeto se serializa (pickled) y luego un subproceso de fondo vacía los datos serializados a una tubería (pipe) subyacente. Esto tiene algunas consecuencias que son un poco sorprendentes, pero no deberían causar dificultades prácticas: si realmente causan molestias, se puede usar una cola creada con un manager.
Después de poner un objeto en una cola vacía, puede haber un retraso infinitesimal antes de que el método de la cola
empty()
retorneFalse
yget_nowait()
puede retornar sin lanzarqueue.Empty
.Si varios procesos están poniendo en cola objetos, es posible que los objetos se reciban en el otro extremo fuera de orden. Sin embargo, los objetos en cola por el mismo proceso siempre estarán en el orden esperado entre sí.
Advertencia
Si se termina un proceso usando Process.terminate()
o os.kill()
mientras intenta usar una clase Queue
, es probable que los datos de la cola(queue) se corrompan. Esto puede hacer que cualquier otro proceso obtenga una excepción cuando intente usar la cola más adelante.
Advertencia
Como se mencionó anteriormente, si un proceso hijo ha puesto elementos en una cola (queue) (y no ha utilizado JoinableQueue.cancel_join_thread
), entonces ese proceso no terminará hasta que todos los elementos almacenados en búfer hayan sido vaciados a la tubería (pipe).
Esto significa que si intenta unirse a ese proceso, puede obtener un punto muerto a menos que esté seguro de que todos los elementos que se han puesto en la cola (queue) se han consumido. Del mismo modo, si el proceso hijo no es daemonic, el proceso parental puede bloquearse en la salida cuando intenta unir todos sus elementos hijos no daemonic.
Tenga en cuenta que una cola (queue) creada con un administrador no tiene este problema. Consulte Pautas de programación.
Para ver un ejemplo del uso de colas para la comunicación entre procesos, consulte Ejemplos.
-
multiprocessing.
Pipe
([duplex])¶ Retorna un par de objetos ``(conn1, conn2)`` de la
Connection
que representan los extremos de una tubería (pipe).Si duplex es
True
(el valor predeterminado), entonces la tubería (pipe) es bidireccional. Si duplex esFalse
, entonces la tubería es unidireccional:conn1
solo se puede usar para recibir mensajes yconn2
solo se puede usar para enviar mensajes.
-
class
multiprocessing.
Queue
([maxsize])¶ Retorna un proceso de cola (queue) compartida implementado utilizando una tubería (pipe) y algunos candados/semáforos (locks/semaphores). Cuando un proceso pone por primera vez un elemento en la cola, se inicia un hilo alimentador que transfiere objetos desde un búfer a la tubería.
Las excepciones habituales
queue.Empty
yqueue.Full
del módulo de la biblioteca estándarqueue
se generan para indicar tiempos de espera.La
Queue
implementa todos los métodos de laqueue.Queue
excepto portask_done()
yjoin()
.-
qsize
()¶ Retorna el tamaño aproximado de la cola (queue). Debido a la semántica multiproceso/multiprocesamiento, este número no es confiable.
Tenga en cuenta que esto puede lanzar
NotImplementedError
en plataformas Unix como Mac OS X dondesem_getvalue()
no está implementado.
-
empty
()¶ Retorna
True
si la cola (queue) está vacía, de lo contrario retornaFalse
. Debido a la semántica multiproceso/multiprocesamiento, esto no es confiable.
-
full
()¶ Retorna
True
si la cola (queue) está llena, de lo contrario retornaFalse
. Debido a la semántica multiproceso/multiprocesamiento, esto no es confiable.
-
put
(obj[, block[, timeout]])¶ Pone obj en la cola. Si el argumento opcional block es
True
(el valor predeterminado) y timeout esNone
(el valor predeterminado), se bloquea si es necesario hasta que haya un espacio disponible. Si timeout es un número positivo, bloquea a lo sumo timeout segundos y genera la excepciónqueue.Full
si no hay espacio libre disponible en ese tiempo. De lo contrario (block esFalse
), y coloca un elemento en la cola si hay un espacio libre disponible de inmediato, de lo contrario, genera la excepciónqueue.Full
(timeout se ignora en ese caso).Distinto en la versión 3.8: Si la cola (queue) está cerrada,
ValueError
se lanza en lugar deAssertionError
.
-
put_nowait
(obj)¶ Equivalente a
put(obj, False)
.
-
get
([block[, timeout]])¶ Elimina y retorna un artículo de la cola (queue). Si un argumento opcional block es
True
(el valor predeterminado) y el timeout esNone
(el valor predeterminado), es bloqueado si es necesario hasta que un elemento esté disponible. Si el timeout es un número positivo, bloquea a lo sumo segundos y genera la excepciónqueue.Empty
si no había ningún elemento disponible dentro de ese tiempo. De lo contrario (el bloque esFalse
), retorna un elemento si hay uno disponible de inmediato, de lo contrario, levante la excepciónqueue.Empty
(timeout se ignora en ese caso).Distinto en la versión 3.8: Si la cola está cerrada, se lanza
ValueError
en lugar deOSError
.
-
get_nowait
()¶ Equivalente a
get(False)
.
La
multiprocessing.Queue
tiene algunos métodos adicionales que no se encuentran enqueue.Queue
. Estos métodos suelen ser innecesarios para la mayoría de los códigos:-
close
()¶ Indica que el proceso actual no colocará más datos en esta cola (queue). El subproceso en segundo plano se cerrará una vez que haya vaciado todos los datos almacenados en la tubería (pipe). Esto se llama automáticamente cuando la cola es recolectada por el recolector de basura.
-
join_thread
()¶ Unifica al hilo de fondo. Esto solo se puede usar después de que se ha llamado a
close()
. Esto se bloquea hasta que salga el hilo de fondo, asegurando que todos los datos en el búfer se hayan vaciado a la tubería (pipe).Por defecto, si un proceso no es el creador de la cola (queue), al salir intentará unirse al hilo de fondo de la cola. El proceso puede llamar a
cancel_join_thread()
para hacer que eljoin_thread()
no haga nada.
-
cancel_join_thread
()¶ Evita que
join_thread()
bloquee. En particular, esto evita que el subproceso en segundo plano se una automáticamente cuando finaliza el proceso; consultejoin_thread()
.Un mejor nombre para este método podría ser
allow_exit_without_flush()
. Es probable que provoque la pérdida de datos en cola (queue), y es casi seguro que no necesitará usarlos. Realmente solo está allí si necesita que el proceso actual salga inmediatamente sin esperar a vaciar los datos en cola en la tubería (pipe) subyacente, y no le importan los datos perdidos.
Nota
La funcionalidad de esta clase requiere una implementación de semáforo compartido en funcionamiento en el sistema operativo que es huésped (host). Sin uno, la funcionalidad en esta clase se deshabilitará, y los intentos de instanciar a
Queue
resultarán enImportError
. Consulte bpo-3770 para información adicional. Lo mismo es válido para cualquiera de los tipos de cola especializados que se enumeran a continuación.-
-
class
multiprocessing.
SimpleQueue
¶ Es un tipo simplificado
Queue
, muy similar a un lock dePipe
.-
empty
()¶ Retorna
True
si la cola (queue) está vacía, de otra manera retornaFalse
.
-
get
()¶ Eliminar y retornar un artículo de la cola (queue).
-
put
(item)¶ Pone item en la cola.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
, una subclaseQueue
, es una cola (queue) que además tiene los métodostask_done()
yjoin()
.-
task_done
()¶ Indica que una tarea anteriormente en cola (queue) está completa. Usado por los consumidores de la cola. Por cada
get()
utilizado para recuperar una tarea, una llamada posterior atask_done()
le dice a la cola que el procesamiento de la tarea se ha completado.Si un
join()
se está bloqueando actualmente, se reanudará cuando se hayan procesado todos los elementos (lo que significa quetask_done()
es llamado para cada elemento que había sido puesto en cola (queue) porput()
).Lanza un
ValueError
si es llamado más veces que elementos hay en una cola.
-
join
()¶ Se bloquea hasta que todos los elementos en una cola han sido recibidos y procesados.
El recuento de tareas no finalizadas aumenta cada vez que se agrega un elemento a la cola. El recuento disminuye cada vez que un consumidor llama a
task_done()
para indicar que el artículo se recuperó y todo el trabajo en él está completo. Cuando el recuento de tareas inacabadas cae a cero,join()
se desbloquea.
-
Miscelánea¶
-
multiprocessing.
active_children
()¶ Retorna una lista con todos los hijos del proceso actual.
Llamar a esto tiene el efecto secundario de «unir» (joining) cualquier proceso que ya haya finalizado.
-
multiprocessing.
cpu_count
()¶ Retorna el número de CPU en el sistema.
Este número no es equivalente al número de CPU que puede utilizar el proceso actual. El número de CPU utilizables se puede obtener con
len(os.sched_getaffinity(0))
Puedo lanzar
NotImplementedError
.Ver también
-
multiprocessing.
current_process
()¶ Retorna el objeto de la
Process
correspondiente al proceso actual.Un análogo de la
threading.current_thread()
.
-
multiprocessing.
parent_process
()¶ Retorna el objeto de la
Process
correspondiente al proceso parental decurrent_process()
. Para el proceso principal,parent_process` será ``None
.Nuevo en la versión 3.8.
-
multiprocessing.
freeze_support
()¶ Agrega soporte para cuando un programa que utiliza
multiprocessing
se haya congelado para producir un ejecutable de Windows. (Ha sido probado con py2exe, PyInstaller y cx_Freeze.)Es necesario llamar a esta función inmediatamente después de la línea principal del módulo if __name__ == “__main__”` . Por ejemplo:
from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Si se omite la línea
freeze_support()
entonces se intenta comenzar el ejecutable congelado que lanzaráRuntimeError
.La llamada de
freeze_support()
no tiene efecto cuando es invocada por cualquier sistema operativo que no sea Windows. Además, si el módulo ha sido ejecutado en un intérprete de Python en Windows (y el programa no se ha congelado) entoncesfreeze_support()
no tiene efecto.
-
multiprocessing.
get_all_start_methods
()¶ Retorna una lista de los métodos de inicio admitidos, el primero de los cuales es el predeterminado. Los posibles métodos de inicio son
'fork'
,'spawn'
y'forkserver'
. En Windows solo está disponible'spawn'
. En Unix,'fork'
y'spawn'
siempre son compatibles, siendo'fork'
el valor predeterminado.Nuevo en la versión 3.4.
-
multiprocessing.
get_context
(method=None)¶ Retorna un objeto de contexto que tiene los mismos atributos que el módulo
multiprocessing
.Si el método method es
None
entonces el contexto predeterminado es retornado. Por lo contrario, method debería ser'fork'
,'spawn'
,'forkserver'
. Se lanzaValueError
if el método de inicio no esta disponible.Nuevo en la versión 3.4.
-
multiprocessing.
get_start_method
(allow_none=False)¶ Retorna el nombre del método de inicio que es utilizado para iniciar procesos.
Si el método de inicio no se ha solucionado y allow_none es falso, entonces el método de inicio se fija al predeterminado y se retorna el nombre. Si el método de inicio no se ha solucionado y allow_none es verdadero, se retorna
None
.The return value can be
'fork'
,'spawn'
,'forkserver'
orNone
.'fork'
is the default on Unix, while'spawn'
is the default on Windows and macOS.
Distinto en la versión 3.8: En macOS, el método de inicio spawn ahora es el predeterminado. El método de inicio fork debe considerarse inseguro ya que puede provocar bloqueos del subproceso. Consulte bpo-33725.
Nuevo en la versión 3.4.
-
multiprocessing.
set_executable
()¶ Establece la ruta del intérprete de Python para usar cuando se inicia un proceso secundario. (Por defecto se utiliza
sys.executable
). Los integradores probablemente necesiten hacer algo comoset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
antes ellos pueden crear procesos hijos.
Distinto en la versión 3.4: Ahora es compatible con Unix cuando se usa el método de inicio
'spawn'
.
-
multiprocessing.
set_start_method
(method)¶ Se establece el método que se debe usar para iniciar procesos secundarios. method puede ser
'fork'
,'spawn'
o'forkserver'
.Tenga en cuenta que esto debería llamarse como máximo una vez, y debería protegerse dentro de la cláusula
if __name__ == '__main__'
del módulo principal.Nuevo en la versión 3.4.
Nota
multiprocessing
no contiene análogos de threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, o threading.local
.
Objetos de conexión Connection Objects¶
Los objetos de conexión permiten el envío y la recepción de objetos serializables (pickable) o cadenas de caracteres seleccionables. Pueden considerarse como sockets conectados orientados a mensajes.
Los objetos de conexión usualmente son creados usando Pipe
– ver también Oyentes y Clientes (Listeners and Clients).
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ Envía un objeto al otro extremo de la conexión que debe leerse usando
recv()
.El objeto debe ser serializable (pickable). Los serializados (pickable) muy grandes (aproximadamente 32 MiB+ , aunque depende del sistema operativo) pueden generar una excepción
ValueError
.
-
recv
()¶ Retorna un objeto enviado desde el otro extremo de la conexión usando
send()
. Se bloquea hasta que haya algo para recibir. Se lanzaEOFError
si no queda nada por recibir y el otro extremo está cerrado.
-
fileno
()¶ Retorna el descriptor de archivo o identificador utilizado por la conexión.
-
close
()¶ Cierra la conexión.
Esto se llama automáticamente cuando la conexión es basura recolectada.
-
poll
([timeout])¶ Retorna si hay datos disponibles para leer.
Si no se especifica timeout, se retornará de inmediato. Si timeout es un número, esto especifica el tiempo máximo en segundos para bloquear. Si timeout es
None
, se usa un tiempo de espera infinito.Tenga en cuenta que se pueden sondear varios objetos de conexión a la vez utilizando
multiprocessing.connection.wait()
.
-
send_bytes
(buffer[, offset[, size]])¶ Envía datos de bytes desde a bytes-like object como un mensaje completo.
Si se da offset, los datos se leen desde esa posición en buffer. Si se da size entonces se leerán muchos bytes del búfer. Los buffers muy grandes (aproximadamente 32 MiB+, aunque depende del sistema operativo) pueden generar una excepción
ValueError
-
recv_bytes
([maxlength])¶ Retorna un mensaje completo de datos de bytes enviados desde el otro extremo de la conexión como una cadena de caracteres. Se bloquea hasta que haya algo para recibir. Aumenta
EOFError
si no queda nada por recibir y el otro extremo se ha cerrado.Si se especifica maxlength y el mensaje es más largo que maxlength, entonces se lanza un
OSError
y la conexión ya no será legible.
-
recv_bytes_into
(buffer[, offset])¶ Lee en buffer un mensaje completo de datos de bytes enviados desde el otro extremo de la conexión y retorne el número de bytes en el mensaje. Se bloquea hasta que haya algo para recibir. Si no queda nada por recibir y el otro extremo está cerrándose se lanza
EOFError
.buffer debe ser un escribible bytes-like object. Si se proporciona offset el mensaje se escribirá en el búfer desde esa posición. La compensación debe ser un número entero no negativo menor que la longitud de buffer (en bytes).
Si el búfer es demasiado corto, se genera una excepción
BufferTooShort
y el mensaje completo está disponible comoe.args[0]
dondee
es la instancia de excepción.
Distinto en la versión 3.3: Los objetos de conexión ahora pueden transferirse entre procesos usando
Connection.send()
yConnection.recv()
.Nuevo en la versión 3.3: Los objetos de conexión ahora admiten el protocolo de administración de contexto – consulte Tipos Gestores de Contexto.
__enter__()
retorna el objeto de conexión, y__exit__()
llama aclose()
.-
Por ejemplo:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Advertencia
El método Connection.recv()
desempaqueta automáticamente los datos que recibe, lo que puede ser un riesgo de seguridad a menos que pueda confiar en el proceso que envió el mensaje.
Por lo tanto, a menos que el objeto de conexión se haya producido usando Pipe()
solo debe usar los métodos recv()
y send()
después de realizar algún tipo de autenticación. Consulte Llaves de autentificación.
Advertencia
Si se mata un proceso mientras intenta leer o escribir en una tubería (pipe), entonces es probable que los datos en la tubería se corrompan, porque puede ser imposible estar seguro de dónde se encuentran los límites del mensaje.
Primitivas de sincronización (Synchronization primitives)¶
En general, las primitivas de sincronización no son tan necesarias en un programa multiproceso como en un programa multihilos (multithreaded). Consulte la documentación para threading
.
Tenga en cuenta que también se pueden crear primitivas de sincronización utilizando un objeto administrador – consulte Administradores (Managers).
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ Un objeto de barrera: un clon de
threading.Barrier
.Nuevo en la versión 3.3.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Un objeto semáforo (semaphore object) acotado: un análogo cercano de la
threading.BoundedSemaphore
.Existe una diferencia solitaria de su análogo cercano: el primer argumento de su método
acquire
es nombrado block, es consistente conLock.acquire()
.Nota
En Mac OS X, esto no se puede distinguir de
Semaphore
porquesem_getvalue()
no está implementado en esa plataforma.
-
class
multiprocessing.
Condition
([lock])¶ Una variable de condición: un alias para la
threading.Condition
.Si se especifica lock, entonces debería ser una
Lock
oRLock
objeto demultiprocessing
.Distinto en la versión 3.3: El método
wait_for()
fue a añadido.
-
class
multiprocessing.
Event
¶ Un clon de
threading.Event
.
-
class
multiprocessing.
Lock
¶ Un objeto candado no recursivo: un análogo cercano de
threading.Lock
. Una vez que un proceso o subproceso ha adquirido un bloqueo, los intentos posteriores de adquirirlo de cualquier proceso o subproceso se bloquearán hasta que se libere; cualquier proceso o hilo puede liberarlo. Los conceptos y comportamientos dethreading.Lock
como se aplica a los subprocesos se replican aquí enmultiprocessing.Lock
como se aplica a los procesos o subprocesos, excepto como se indica.Tenga en cuenta que
Lock
es en realidad una función de fábrica que retorna una instancia demultiprocessing.synchronize.Lock
inicializada con un contexto predeterminado.La
Lock
soporta el protocolo context manager y, por lo tanto, se puede usar en la declaraciónwith
.-
acquire
(block=True, timeout=None)¶ Adquiriendo un candado (lock), bloqueante o no bloqueante.
Con el argumento block establecido en
True
(el valor predeterminado), la llamada al método se bloqueará hasta que el bloqueo esté en un estado desbloqueado, luego configúrelo como bloqueado y retorneTrue
. Tenga en cuenta que el nombre de este primer argumento difiere del que aparece enthreading.Lock.acquire()
.Con el argumento block establecido en
False
, la llamada al método no se bloquea. Si el bloqueo está actualmente en un estado bloqueado, retornaFalse
; de lo contrario, configure el bloqueo en un estado bloqueado y retorneTrue
.Cuando se invoca con un valor positivo de punto flotante para timeout, bloquea como máximo el número de segundos especificado por timeout siempre que no se pueda obtener el bloqueo. Las invocaciones con un valor negativo para timeout de cero. Las invocaciones con un valor timeout de
None
(el valor predeterminado) establecen el período de tiempo de espera en infinito. Tenga en cuenta que el tratamiento de valores negativos oNone
para timeout difiere del comportamiento implementado enthreading.Lock.acquire()
. El argumento timeout no tiene implicaciones prácticas si el argumento block se establece enFalse
y, por lo tanto, se ignora. RetornaTrue
si se ha adquirido el candado oFalse
si ha transcurrido el tiempo de espera.
-
release
()¶ Suelta un candado. Esto se puede llamar desde cualquier proceso o subproceso, no solo desde el proceso o subproceso que originalmente adquirió el candado.
El comportamiento es el mismo que en
threading.Lock.release()
excepto que cuando se invoca en un bloqueo desbloqueado, se genera aValueError
.
-
-
class
multiprocessing.
RLock
¶ Un objeto de candado recursivo: un análogo cercano a
threading.RLock
. El proceso o el hilo que lo adquirió debe liberar un candado recursivo. Una vez que un proceso o subproceso ha adquirido un candado recursivo, el mismo proceso o subproceso puede volver a adquirirlo sin bloquearlo; ese proceso o hilo debe liberarlo una vez por cada vez que se haya adquirido.Tenga en cuenta que
RLock
es en realidad una función de fábrica que retorna una instancia demultiprocessing.synchronize.RLock
inicializada con un contexto predeterminado.La
RLock
admite el protocolo context manager y, por lo tanto, puede usarse enwith
.-
acquire
(block=True, timeout=None)¶ Adquiriendo un candado (lock), bloqueante o no bloqueante.
Cuando se invoca con el argumento block establecido en
True
, bloquea hasta que el candado esté en un estado desbloqueado (que no sea propiedad de ningún proceso o subproceso) a menos que el candado o el subproceso actual ya sea de su propiedad. El proceso o subproceso actual se apropia del candado (si aún no lo tiene) y el nivel de recursión dentro de este aumenta en uno, lo que da como resultado un valor de retorno deTrue
. Tenga en cuenta que hay varias diferencias en el comportamiento de este primer argumento en comparación con la implementación dethreading.RLock.acquire()
, comenzando con el nombre del argumento en sí.Cuando se invoca con el argumento block establecido en
False
, no bloquea. Si el candado ya ha sido adquirido (y por lo tanto es propiedad) de otro proceso o subproceso, el proceso o subproceso actual no se apropia y el nivel de recursión dentro del candado no cambia, lo que resulta en un valor de retorno deFalse
. Si el candado está en un estado desbloqueado, el proceso o subproceso actual toma posesión y el nivel de recurrencia se incrementa, lo que resulta en un valor de retorno deTrue
.El uso y los comportamientos del argumento timeout son los mismos que en
Lock.acquire()
. Tenga en cuenta que algunos de estos comportamientos de timeout difieren de los comportamientos implementados enthreading.RLock.acquire()
.
-
release
()¶ Libera un candado, disminuyendo el nivel de recursión. Si después del decremento el nivel de recursión es cero, restablece el candado a desbloqueado (que no sea propiedad de ningún proceso o subproceso) y si se bloquean otros procesos o subprocesos esperando que el candado se desbloquee, permite que continúe exactamente uno de ellos. Si después del decremento el nivel de recursión sigue siendo distinto de cero, el candado permanece bloqueado y pertenece al proceso de llamada o subproceso.
Solo llame a este método cuando el proceso o subproceso de llamada sea el propietario del candado. Se lanza un
AssertionError
si se llama a este método mediante un proceso o subproceso que no sea el propietario o si el candado está en un estado desbloqueado (sin propietario). Tenga en cuenta que el tipo de excepción planteada en esta situación difiere del comportamiento implementado enthreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Un objeto semáforo: un análogo cercano de
threading.Semaphore
.Existe una diferencia solitaria de su análogo cercano: el primer argumento de su método
acquire
es nombrado block, es consistente conLock.acquire()
.
Nota
En Mac OS X, sem_timedwait
no es compatible, por lo que llamar a acquire()
con un tiempo de espera emulará el comportamiento de esa función utilizando un bucle inactivo.
Nota
Si la señal SIGINT generada por Ctrl-C llega mientras el hilo principal está bloqueado por una llamada a BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
o Condition.wait()
, la llamada se interrumpirá inmediatamente y KeyboardInterrupt
se lanzará.
Esto difiere del comportamiento de threading
donde SIGINT será ignorado mientras las llamadas de candado equivalentes están en progreso.
Nota
Parte de la funcionalidad de este paquete requiere una implementación de semáforo compartido que funcione en el sistema operativo. Sin uno, el módulo multiprocessing.synchronize
se desactivará, y los intentos de importarlo darán como resultado ImportError
. Consulte bpo-3770 para información adicional.
Administradores (Managers)¶
Los administradores (managers) proporcionan una forma de crear datos que se pueden compartir entre diferentes procesos, incluido el intercambio en una red entre procesos que se ejecutan en diferentes máquinas. Un objeto administrador controla un proceso de servidor que gestiona shared objects (objetos compartidos). Otros procesos pueden acceder a los objetos compartidos mediante el uso de servidores proxy.
Retorna un objeto iniciado
SyncManager
que se puede usar para compartir objetos entre procesos. El objeto administrador retornado corresponde a un proceso hijo generado y tiene métodos que crearán objetos compartidos y retornarán los proxies correspondientes.
Los procesos del administrador se cerrarán tan pronto como se recolecte la basura o salga su proceso padre. Las clases de administrador se definen en el módulo multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Crear un objeto BaseManager.
Una vez creado, debe llamar a
start()
oget_server().serve_forever()
para asegurarse de que el objeto de administrador se refiera a un proceso de administrador iniciado.address es la dirección en la que el proceso del administrador escucha las nuevas conexiones. Si address es
None
, se elige una arbitrariamente.authkey es la clave de autenticación que se utilizará para verificar la validez de las conexiones entrantes al proceso del servidor. Si authkey es
None
, entonces se usacurrent_process().authkey
. De lo contrario, se usa authkey y debe ser una cadena de bytes.-
start
([initializer[, initargs]])¶ Se inicia un subproceso para iniciar el administrador. Si initializer no es
None
, entonces el subproceso llamaráinitializer(*initargs)
cuando se inicie.
-
get_server
()¶ Retorna un objeto
Server
que representa el servidor real bajo el control del Administrador. El objetoServer
admite el métodoserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
tiene un atributo adicionaladdress
.
-
connect
()¶ Conecta un objeto de administrador (manager) local a un proceso de administrador remoto:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
shutdown
()¶ Detiene el proceso utilizado por el gerente (manager). Esto solo está disponible si
start()
se ha utilizado para iniciar el proceso del servidor.Esto se puede llamar múltiples veces.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ Un método de clase que puede usarse para registrar un tipo o invocarse con la clase de administrador (manager).
typeid es un «identificador de tipo» que se utiliza para identificar un tipo particular de objeto compartido. Esto debe ser una cadena de caracteres.
callable es un invocable utilizado para crear objetos para este identificador de tipo. Si una instancia de administrador se conectará al servidor utilizando el método
connect()
, o si el argumento create_method esFalse
, esto se puede dejar comoNone
.proxytype es una subclase de
BaseProxy
que se usa para crear proxies para objetos compartidos con este typeid. SiNone
, se crea automáticamente una clase proxy.Se utiliza exposed para especificar una secuencia de nombres de métodos a los que se debe permitir el acceso de los servidores proxy para este tipo de identificación utilizando
BaseProxy._callmethod()
. (Si exposed esNone
, entoncesproxytype._exposed_
se usa en su lugar si existe). En el caso de que no se especifique una lista expuesta, todos los «métodos públicos» del objeto compartido serán accesibles . (Aquí un «método público» significa cualquier atributo que tenga un método__call__()
y cuyo nombre no comience con'_'
.)El method_to_typeid es una asignación utilizada para especificar el tipo de retorno de los métodos expuestos que deberían retornar un proxy. Asigna nombres de métodos a cadenas typeid. (Si method_to_typeid es
None
entoncesproxytype._method_to_typeid
se usa en su lugar si existe). Si el nombre de un método no es una clave de esta asignación o si la asignación esNone
entonces el objeto retornado por el método se copiará por valor.create_method determina si un método debe crearse con el nombre typeid que se puede usar para indicarle al proceso del servidor que cree un nuevo objeto compartido y retornando un proxy para él. Por defecto es
True
.
BaseManager
las instancias también tienen una propiedad de solo lectura:-
address
¶ La dirección utilizada por el administrador.
Distinto en la versión 3.3: Los objetos de administrador admiten el protocolo de gestión de contexto; consulte Tipos Gestores de Contexto.
__enter__()
inicia el proceso del servidor (si aún no se ha iniciado) y luego retorna el objeto de administrador.__exit__()
llamashutdown()
.En versiones anteriores
__enter__()
no iniciaba el proceso del servidor del administrador si aún no se había iniciado.-
-
class
multiprocessing.managers.
SyncManager
¶ Una subclase de
BaseManager
que se puede utilizar para la sincronización de procesos. Los objetos de este tipo son retornados pormultiprocessing.Manager()
.Sus métodos crean y retornan Objetos Proxy (Proxy Objects) para varios tipos de datos de uso común que se sincronizarán entre procesos. Esto incluye notablemente listas compartidas y diccionarios.
-
Barrier
(parties[, action[, timeout]])¶ threading.Barrier
crea un objeto compartido y retorna un proxy para él.Nuevo en la versión 3.3.
-
BoundedSemaphore
([value])¶ Crea un objeto compartido
threading.BoundedSemaphore
y retorna un proxy para él.
-
Condition
([lock])¶ Crea un objeto compartido
threading.Condition
y retorna un proxy para él.Si se proporciona lock, debería ser un proxy para un objeto
threading.Lock
othreading.RLock
.Distinto en la versión 3.3: El método
wait_for()
fue a añadido.
-
Event
()¶ Crea un objeto compartido
threading.Event
y retorna un proxy para él.
-
Lock
()¶ Crea un objeto compartido
threading.Lock
y retorna un proxy para él.
-
Queue
([maxsize])¶ Crea un objeto compartido
queue.Queue
y retorna un proxy para él.
-
RLock
()¶ Crea un objeto compartido
threading.RLock
y retorna un proxy para él.
-
Semaphore
([value])¶ Crea un objeto compartido
threading.Semaphore
y retorna un proxy para él.
-
Array
(typecode, sequence)¶ Crea un arreglo y retorna un proxy para ello.
-
Value
(typecode, value)¶ Crea un objeto con un atributo de escritura
value
y retorna un proxy para él.
Distinto en la versión 3.6: Los objetos compartidos pueden anidarse. Por ejemplo, un objeto contenedor compartido, como una lista compartida, puede contener otros objetos compartidos que serán administrados y sincronizados por
SyncManager
.-
-
class
multiprocessing.managers.
Namespace
¶ Un tipo que puede registrarse con
SyncManager
.Un objeto de espacio de nombres no tiene métodos públicos, pero tiene atributos de escritura. Su representación muestra los valores de sus atributos.
Sin embargo, cuando se usa un proxy para un objeto de espacio de nombres, un atributo que comience con
'_'
será un atributo del proxy y no un atributo del referente:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Administradores customizables (Customized managers)¶
Para crear su propio administrador, uno crea una subclase de BaseManager
y utiliza el método de clase register()
para registrar nuevos tipos o llamadas con la clase de administrador. Por ejemplo:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Utilizando un administrador remoto¶
Es posible ejecutar un servidor administrador en una máquina y hacer que los clientes lo usen desde otras máquinas (suponiendo que los cortafuegos involucrados lo permitan).
La ejecución de los siguientes comandos crea un servidor para una única cola compartida a la que los clientes remotos pueden acceder:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Un cliente puede tener accesos al servidor de la siguiente manera:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Otro cliente puede también usarlo:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Los procesos locales también pueden acceder a esa cola (queue), utilizando el código de arriba en el cliente para acceder de forma remota:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Objetos Proxy (Proxy Objects)¶
Un proxy es un objeto que se refiere a un objeto compartido que vive (presumiblemente) en un proceso diferente. Se dice que el objeto compartido es el referente del proxy. Varios objetos proxy pueden tener el mismo referente.
Un objeto proxy tiene métodos que invocan los métodos correspondientes de su referente (aunque no todos los métodos del referente estarán necesariamente disponibles a través del proxy). De esta manera, un proxy se puede usar al igual que su referente:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Tenga en cuenta que la aplicación str()
a un proxy retornará la representación del referente, mientras que la aplicación repr()
retornará la representación del proxy.
Una característica importante de los objetos proxy es que son seleccionables para que puedan pasarse entre procesos. Como tal, un referente puede contener Objetos Proxy (Proxy Objects). Esto permite anidar estas listas administradas, dictados y otros Objetos Proxy (Proxy Objects):
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Del mismo modo, los proxies dict y list pueden estar anidados uno dentro del otro:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Si los objetos estándar (no proxy) list
or dict
están contenidos en un referente, las modificaciones a esos valores mutables no se propagarán a través del administrador porque el proxy no tiene forma de saber cuándo los valores contenidos dentro son modificados. Sin embargo, almacenar un valor en un proxy de contenedor (que desencadena un __setitem__
en el objeto proxy) se propaga a través del administrador y, por lo tanto, para modificar efectivamente dicho elemento, uno podría reasignar el valor modificado al proxy de contenedor:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Este enfoque es quizás menos conveniente que emplear anidado Objetos Proxy (Proxy Objects) para la mayoría de los casos de uso, pero también demuestra un nivel de control sobre la sincronización.
Nota
Los tipos de proxy en multiprocessing
no hacen nada para admitir comparaciones por valor. Entonces, por ejemplo, tenemos:
>>> manager.list([1,2,3]) == [1,2,3]
False
En su lugar, se debe usar una copia del referente al hacer comparaciones.
-
class
multiprocessing.managers.
BaseProxy
¶ Los objetos proxy son instancias de subclases de
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Llama y retorna el resultado de un método del referente del proxy.
Si
proxy
es un proxy cuyo referente esobj
entonces la expresiónproxy._callmethod(methodname, args, kwds)
evaluará la expresión
getattr(obj, methodname)(*args, **kwds)
en el proceso del administrador.
El valor retornado será una copia del resultado de la llamada o un proxy a un nuevo objeto compartido; consulte la documentación del argumento method_to_typeid de
BaseManager.register()
.Si la llamada genera una excepción, entonces se vuelve a generar
_callmethod()
. Si se genera alguna otra excepción en el proceso del administrador, esto se convierte en una excepciónRemoteError
y se genera mediante_callmethod()
.Tenga en cuenta en particular que se generará una excepción si methodname no ha sido exposed.
Un ejemplo de uso de
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Retorna una copia del referente.
Si el referente no se puede deserializar (unpicklable), esto generará una excepción.
-
__repr__
()¶ Retorna una representación de un objeto proxy.
-
__str__
()¶ Retorna una representación del referente.
-
Limpieza (Cleanup)¶
Un objeto proxy utiliza una devolución de llamada (callback) de referencia débil (weakref) para que cuando sea recolectado por el recolector de basura se da de baja del administrador que posee su referente.
Un objeto compartido se elimina del proceso del administrador cuando ya no hay ningún proxy que se refiera a él.
Piscinas de procesos (Process Pools)¶
Se puede crear un grupo de procesos que llevarán a cabo las tareas que se le presenten con la Pool
class.
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ Un objeto de grupo de procesos que controla un grupo de procesos de trabajo a los que se pueden enviar trabajos. Admite resultados asincrónicos con tiempos de espera y devoluciones de llamada y tiene una implementación de mapa paralelo.
processes es el número de procesos de trabajo a utilizar. Si processes es
None
, se utiliza el número retornado poros.cpu_count()
.Si initializer no es
None
, cada proceso de trabajo llamaráinitializer(*initargs)
cuando se inicie.maxtasksperchild es el número de tareas que un proceso de trabajo puede completar antes de salir y ser reemplazado por un proceso de trabajo nuevo, para permitir que se liberen los recursos no utilizados. El valor predeterminado maxtasksperchild es
None
, lo que significa que los procesos de trabajo vivirán tanto tiempo como el grupo.context se puede utilizar para especificar el contexto utilizado para iniciar los procesos de trabajo. Por lo general, un grupo se crea utilizando la función
multiprocessing.Pool()
o el método de un objeto de contextoPool()
. En ambos casos, context se establece de manera adecuada.Tenga en cuenta que los métodos del objeto de grupo solo deben ser invocados por el proceso que creó el grupo.
Advertencia
Los objetos
multiprocessing.pool
tienen recursos internos que necesitan ser administrados adecuadamente (como cualquier otro recurso) utilizando el grupo como administrador de contexto o llamando aclose()
yterminate()
manualmente. De lo contrario, el proceso puede demorarse en la finalización.Tenga en cuenta que no es correcto confiar en el recolector de basura para destruir el grupo ya que CPython no asegura que se llamará al finalizador del grupo (consulte
object.__del__()
para obtener más información).Nuevo en la versión 3.2: maxtasksperchild
Nuevo en la versión 3.4: context
Nota
Los procesos de los trabajadores dentro de una
Pool
normalmente viven durante la duración completa de la cola de trabajo de la piscina. Un patrón frecuente que se encuentra en otros sistemas (como Apache, mod_wsgi, etc.) para liberar recursos en poder de los trabajadores es permitir que un trabajador dentro de un grupo complete solo una cantidad determinada de trabajo antes de salir, limpiarse y generar un nuevo proceso para reemplazar el viejo. El argumento maxtasksperchild paraPool
expone esta capacidad al usuario final.-
apply
(func[, args[, kwds]])¶ Llama a func con argumentos args y argumentos de palabras clave kwds. Se bloquea hasta que el resultado esté listo. Dados estos bloques,
apply_async()
es más adecuado para realizar trabajos en paralelo. Además, func solo se ejecuta en uno de los trabajadores de piscina.
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ A variant of the
apply()
method which returns aAsyncResult
object.Si se especifica callback, debería ser un invocable que acepte un único argumento. Cuando el resultado está listo, se le aplica callback, a menos que la llamada falle, en cuyo caso se aplica error_callback.
Si se especifica error_callback, debería ser un invocable que acepte un único argumento. Si la función de destino falla, se llama a error_callback con la instancia de excepción.
Las devoluciones de llamada deben completarse inmediatamente ya que de lo contrario el hilo que maneja los resultados se bloqueará.
-
map
(func, iterable[, chunksize])¶ Un equivalente paralelo de la función incorporada
map()
(aunque solo admite un argumento iterable, para varios iterables consultestarmap()
). Bloquea hasta que el resultado esté listo.Este método corta el iterable en varios trozos que envía al grupo de procesos como tareas separadas. El tamaño (aproximado) de estos fragmentos se puede especificar estableciendo chunksize en un entero positivo.
Tenga en cuenta que puede causar un alto uso de memoria para iterables muy largos. Considere usar
imap()
oimap_unordered()
con la opción explícita chunksize para una mejor eficiencia.
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ A variant of the
map()
method which returns aAsyncResult
object.Si se especifica callback, debería ser un invocable que acepte un único argumento. Cuando el resultado está listo, se le aplica callback, a menos que la llamada falle, en cuyo caso se aplica error_callback.
Si se especifica error_callback, debería ser un invocable que acepte un único argumento. Si la función de destino falla, se llama a error_callback con la instancia de excepción.
Las devoluciones de llamada deben completarse inmediatamente ya que de lo contrario el hilo que maneja los resultados se bloqueará.
-
imap
(func, iterable[, chunksize])¶ Una versión más perezosa (lazier) de
map()
.El argumento chunksize es el mismo que el utilizado por el método
map()
. Para iterables muy largos, usar un valor grande para chunksize puede hacer que el trabajo se complete much (mucho) más rápido que usar el valor predeterminado de1
.Además, si chunksize es
1
, el métodonext()
del iterador retornado por el métodoimap()
tiene un parámetro opcional timeout :next (timeout)
lanzarámultiprocessing.TimeoutError
si el resultado no puede retornarse dentro de timeout segundos.
-
imap_unordered
(func, iterable[, chunksize])¶ Lo mismo que
imap()
, excepto que el orden de los resultados del iterador retornado debe considerarse arbitrario. (Solo cuando hay un solo proceso de trabajo se garantiza que el orden sea «correcto»).
-
starmap
(func, iterable[, chunksize])¶ Como
map()
excepto que se espera que los elementos de iterable sean iterables que se desempaquetan como argumentos.Por lo tanto, un iterable de
[(1,2), (3, 4)]
da como resultado[func(1,2), func(3,4)]
.Nuevo en la versión 3.3.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Una combinación de
starmap()
ymap_async()
que itera sobre iterable de iterables y llama a func con los iterables desempaquetados. Como resultado se retorna un objeto.Nuevo en la versión 3.3.
-
close
()¶ Impide que se envíen más tareas a la piscina (pool). Una vez que se hayan completado todas las tareas, se cerrarán los procesos de trabajo.
-
terminate
()¶ Detiene los procesos de trabajo inmediatamente sin completar el trabajo pendiente. Cuando el objeto del grupo es basura recolectada
terminate()
se llamará inmediatamente.
-
join
()¶ Espera a que salgan los procesos de trabajo. Se debe llamar
close()
oterminate()
antes de usarjoin()
.
Nuevo en la versión 3.3: Los objetos de piscina (pool) ahora admiten el protocolo de administración de contexto; consulte Tipos Gestores de Contexto.
__ enter__()
retorna el objeto de grupo, y__ exit__()
llamaterminate()
.-
-
class
multiprocessing.pool.
AsyncResult
¶ La clase del resultado retornado por
Pool.apply_async()
yPool.map_async()
.-
get
([timeout])¶ Retorna el resultado cuando llegue. Si timeout no es
None
y el resultado no llega dentro de timeout segundos, entonces se lanzamultiprocessing.TimeoutError
. Si la llamada remota generó una excepción, esa excepción se volverá a plantear medianteget()
.
-
wait
([timeout])¶ Espera hasta que el resultado esté disponible o hasta que pase timeout segundos.
-
ready
()¶ Retorna si la llamada se ha completado.
-
successful
()¶ Retorna si la llamada se completó sin generar una excepción. Lanzará
ValueError
si el resultado no está listo.Distinto en la versión 3.7: Si el resultado no está listo
ValueError
aparece en lugar deAssertionError
.
-
El siguiente ejemplo demuestra el uso de una piscina(pool):
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Oyentes y Clientes (Listeners and Clients)¶
Por lo general, el paso de mensajes entre procesos se realiza mediante colas o mediante objetos Connection
retornados por Pipe()
.
Sin embargo, el módulo multiprocessing.connection
permite cierta flexibilidad adicional. Básicamente proporciona una API orientada a mensajes de alto nivel para tratar con sockets o canalizaciones con nombre de Windows. También tiene soporte para digest authentication usando el módulo hmac
, y para sondear múltiples conexiones al mismo tiempo.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Envía un mensaje generado aleatoriamente al otro extremo de la conexión y espera una respuesta.
Si la respuesta coincide con el resumen del mensaje utilizando authkey como clave, se envía un mensaje de bienvenida al otro extremo de la conexión. De lo contrario se lanza
AuthenticationError
.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Recibe un mensaje, calcula el resumen del mensaje usando authkey como la clave y luego envía el resumen de vuelta.
Si no se recibe un mensaje de bienvenida, se lanza
AuthenticationError
.
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ Se intenta configurar una conexión con el oyente que utiliza la dirección address, retornando
Connection
.El tipo de conexión está determinado por el argumento family, pero esto generalmente se puede omitir ya que generalmente se puede inferir del formato de address. (Consulte Formatos de dirección (Address formats))
Si se proporciona authkey y no None, debe ser una cadena de bytes y se utilizará como clave secreta para un desafío de autenticación basado en HMAC. No se realiza la autenticación si authkey es None. Si falla la autenticación se lanza
AuthenticationError
. Consulte Llaves de autentificación.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ Un contenedor para un socket vinculado o una tubería (pipe) con nombre de Windows que está “escuchando” las conexiones.
address es la dirección que utilizará el socket vinculado o la conocida tubería (pipe) con nombre del objeto de escucha.
Nota
Si se usa una dirección de “0.0.0.0” , la dirección no será un punto final conectable en Windows. Si necesita un punto final conectable, debe usar “127.0.0.1”.
family es el tipo de socket (o tubería con nombre) a utilizar. Esta puede ser una de las cadenas de caracteres
'AF_INET'
(para un socket TCP),' AF_UNIX'
(para un socket de dominio Unix) o'AF_PIPE'
(para una tubería con nombre de Windows) . De estos, solo el primero está garantizado para estar disponible. Si family esNone
, family se deduce del formato de address. Si address también esNone
, se elige un valor predeterminado. Este valor predeterminado es family con la opción más rápida disponible. Consulte Formatos de dirección (Address formats). Tenga en cuenta que si family es'AF_UNIX'
y la dirección esNone
, el socket se creará en un directorio temporal privado usandotempfile.mkstemp()
.Si el objeto de escucha utiliza un socket, entonces backlog (1 por defecto) se pasa al método
listen()
del socket una vez que se ha vinculado.Si se proporciona authkey y no None, debe ser una cadena de bytes y se utilizará como clave secreta para un desafío de autenticación basado en HMAC. No se realiza la autenticación si authkey es None. Si falla la autenticación se lanza
AuthenticationError
. Consulte Llaves de autentificación.-
accept
()¶ Acepta una conexión en el socket vinculado o canalización con nombre del objeto de escucha y retorne un objeto
Connection
. Si se intenta la autenticación y falla, entonces se lanza unaAuthenticationError
.
-
close
()¶ Cierra el socket vinculado o la tubería con nombre del objeto de escucha. Esto se llama automáticamente cuando el oyente es recolectado por el recolector de basura. Sin embargo, es aconsejable llamarlo explícitamente.
Los objetos de escucha tienen las siguientes propiedades de solo lectura:
-
address
¶ La dirección que está utilizando el objeto Listener.
-
last_accepted
¶ La dirección de donde vino la última conexión aceptada. Si esto no está disponible, entonces es
None
.
Nuevo en la versión 3.3: Los objetos de escucha ahora admiten el protocolo de gestión de contexto – consulte Tipos Gestores de Contexto. El objeto lISTENER retorna
__enter__()
, y__exit__()
llama aclose()
.-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ Espera hasta que un objeto en object_list esté listo. Retorna la lista de esos objetos en object_list que están listos. Si timeout es flotante, la llamada se bloquea durante como máximo tantos segundos. Si timeout es
`None
, se bloqueará por un período ilimitado. Un tiempo de espera negativo es equivalente a un tiempo de espera cero.Tanto para Unix como para Windows, un objeto puede aparecer en object_list si este es
un objeto legible de
Connection
;un objeto conectado y legible de
socket.socket
; o
Un objeto de conexión o socket está listo cuando hay datos disponibles para leer, o el otro extremo se ha cerrado.
Unix:
wait(object_list, timeout)
es casi equivalente aselect.select(object_list, [], [], timeout)
. La diferencia es que si se interrumpeselect.select()
por una señal, este lanzaOSError
con un número de errorEINTR
, a diferencia dewait()
.Windows: Un elemento en object_list debe ser un identificador de número entero que se pueda esperar (de acuerdo con la definición utilizada por la documentación de la función Win32
WaitForMultipleObjects()
) o puede ser un objeto con unfileno()
Método que retorna un manejador de tubo o manejador de tubería. (Tenga en cuenta que las manijas de las tuberías y las manijas de los zócalos son no manijas aptas)Nuevo en la versión 3.3.
Ejemplos
El siguiente código de servidor crea un escucha que utiliza 'secret password'
como clave de autenticación. Luego espera una conexión y envía algunos datos al cliente:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
El siguiente código se conecta al servidor y recibe algunos datos del servidor:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
El siguiente código utiliza wait()
para esperar mensajes de múltiples procesos a la vez:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Formatos de dirección (Address formats)¶
Una dirección
'AF_INET'
es una tupla de la forma(hostname, port)
donde hostname es una cadena de caracteres y port es un número entero.An
'AF_UNIX'
address is a string representing a filename on the filesystem.An
'AF_PIPE'
address is a string of the formr'\.\pipe{PipeName}'
. To useClient()
to connect to a named pipe on a remote computer called ServerName one should use an address of the formr'\ServerName\pipe{PipeName}'
instead.
Note that any string beginning with two backslashes is assumed by default to be
an 'AF_PIPE'
address rather than an 'AF_UNIX'
address.
Llaves de autentificación¶
Cuando uno usa Connection.recv
, los datos recibidos se desbloquean automáticamente. Desafortunadamente, la eliminación de datos de una fuente no confiable es un riesgo de seguridad. Por lo tanto Listener
y Client()
usan el módulo hmac
para proporcionar autenticación de resumen.
Una clave de autenticación es una cadena de bytes que se puede considerar como una contraseña: una vez que se establece una conexión, ambos extremos exigirán pruebas de que el otro conoce la clave de autenticación. (Demostrar que ambos extremos están usando la misma clave no implica enviar la clave a través de la conexión).
Si se solicita la autenticación pero no se especifica una clave de autenticación, se utiliza el valor de retorno de current_process().authkey
(consulte Process
). Este valor será heredado automáticamente por cualquier objeto Process
que crea el proceso actual. Esto significa que (por defecto) todos los procesos de un programa multiproceso compartirán una única clave de autenticación que se puede usar al configurar conexiones entre ellos.
Las claves de autenticación adecuadas también se pueden generar utilizando os.urandom()
.
Logging¶
Existe cierto soporte para el registro. Sin embargo, tenga en cuenta que el paquete logging
no utiliza candados compartidos de proceso, por lo que es posible (dependiendo del tipo de controlador) que los mensajes de diferentes procesos se mezclen.
-
multiprocessing.
get_logger
()¶ Retorna el registrador utilizado por
multiprocessing
. Si es necesario, se creará uno nuevo.Cuando se creó por primera vez, el registrador tiene nivel
logging.NOTSET
y no tiene un controlador predeterminado. Los mensajes enviados a este registrador no se propagarán por defecto al registrador raíz.Tenga en cuenta que en Windows los procesos hijos solo heredarán el nivel del registrador del proceso parental – no se heredará ninguna otra personalización del registrador.
-
multiprocessing.
log_to_stderr
()¶ Esta función realiza una llamada a
get_logger()
pero además de retornar el registrador creado por get_logger, agrega un controlador que envía la salida asys.stderr
usando el formato'[%(levelname)s/%(processName)s] %(message)s'
.
A continuación se muestra una sesión de ejemplo con el registro activado:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Para obtener una tabla completa de niveles de registro, consulte el módulo logging
.
El módulo multiprocessing.dummy
¶
El multiprocessing.dummy
replica la API de multiprocessing
pero no es más que un contenedor alrededor del módulo threading
.
In particular, the Pool
function provided by multiprocessing.dummy
returns an instance of ThreadPool
, which is a subclass of
Pool
that supports all the same method calls but uses a pool of
worker threads rather than worker processes.
-
class
multiprocessing.pool.
ThreadPool
([processes[, initializer[, initargs]]])¶ A thread pool object which controls a pool of worker threads to which jobs can be submitted.
ThreadPool
instances are fully interface compatible withPool
instances, and their resources must also be properly managed, either by using the pool as a context manager or by callingclose()
andterminate()
manually.processes is the number of worker threads to use. If processes is
None
then the number returned byos.cpu_count()
is used.Si initializer no es
None
, cada proceso de trabajo llamaráinitializer(*initargs)
cuando se inicie.Unlike
Pool
, maxtasksperchild and context cannot be provided.Nota
A
ThreadPool
shares the same interface asPool
, which is designed around a pool of processes and predates the introduction of theconcurrent.futures
module. As such, it inherits some operations that don’t make sense for a pool backed by threads, and it has its own type for representing the status of asynchronous jobs,AsyncResult
, that is not understood by any other libraries.Users should generally prefer to use
concurrent.futures.ThreadPoolExecutor
, which has a simpler interface that was designed around threads from the start, and which returnsconcurrent.futures.Future
instances that are compatible with many other libraries, includingasyncio
.
Pautas de programación¶
Hay ciertas pautas y expresiones idiomáticas que deben tenerse en cuenta al usar multiprocessing
.
Todos los métodos de inicio¶
Lo siguiente se aplica a todos los métodos de inicio.
Evita estado compartido
En la medida de lo posible, se debe tratar de evitar el desplazamiento de grandes cantidades de datos entre procesos.
Probablemente sea mejor seguir usando colas (queues) o tuberías (pipes) para la comunicación entre procesos en lugar de usar las primitivas de sincronización de nivel inferior.
Serialización (picklability)
Asegúrese que todos los argumentos de los métodos de proxies son serializables (pickable)
Seguridad de hilos de proxies
No usa un objeto proxy de más de un hilo a menos que lo proteja con un candado (lock).
(Nunca hay un problema con diferentes procesos que usan el mismo proxy.)
Uniéndose a procesos zombies
En Unix, cuando un proceso finaliza pero no se ha unido, se convierte en un zombie. Nunca debería haber muchos porque cada vez que se inicia un nuevo proceso (o se llama
active_children()
) se unirán todos los procesos completados que aún no se hayan unido. También llamando a un proceso terminadoProcess.is_alive
se unirá al proceso. Aun así, probablemente sea una buena práctica unir explícitamente todos los procesos que comience.
Mejor heredar que serializar/deserializar (pickle/unpickle)
Cuando se usan los métodos de inicio spawn o forkserver, muchos tipos de
multiprocesamiento
deben ser seleccionables para que los procesos secundarios puedan usarlos. Sin embargo, generalmente se debe evitar enviar objetos compartidos a otros procesos mediante tuberías o colas. En su lugar, debe organizar el programa para que un proceso que necesita acceso a un recurso compartido creado en otro lugar pueda heredarlo de un proceso ancestro.
Evita procesos de finalización
El uso del método
Process.terminate
para detener un proceso puede causar que los recursos compartidos (como candados, semáforos, tuberías y colas) que el proceso utiliza actualmente se rompan o no disponible para otros procesos.Por lo tanto, probablemente sea mejor considerar usar
Process.terminate
en procesos que nunca usan recursos compartidos.
Unirse a procesos que usan colas
Tenga en cuenta que un proceso que ha puesto elementos en una cola esperará antes de finalizar hasta que todos los elementos almacenados en búfer sean alimentados por el hilo «alimentador» a la tubería subyacente. (El proceso secundario puede llamar al método
Queue.cancel_join_thread
de la cola para evitar este comportamiento).Esto significa que siempre que use una cola debe asegurarse de que todos los elementos que se hayan puesto en la cola se eliminarán antes de unirse al proceso. De lo contrario, no puede estar seguro de que los procesos que han puesto elementos en la cola finalizarán. Recuerde también que los procesos no demoníacos se unirán automáticamente.
Un ejemplo que de bloqueo mutuo (deadlock) es el siguiente
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Una solución aquí sería intercambiar las dos últimas líneas (o simplemente eliminar la línea
p.join()
).
Se pasan recursos explícitamente a procesos hijos
En Unix que utiliza el método de inicio fork, un proceso secundario puede hacer uso de un recurso compartido creado en un proceso primario utilizando un recurso global. Sin embargo, es mejor pasar el objeto como argumento al constructor para el proceso secundario.
Además de hacer que el código (potencialmente) sea compatible con Windows y los otros métodos de inicio, esto también garantiza que mientras el proceso secundario siga vivo, el objeto no se recolectará en el proceso primario. Esto podría ser importante si se libera algún recurso cuando el objeto es basura recolectada en el proceso padre.
Entonces por ejemplo
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()debería ser reescrito como
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Tenga cuidado de reemplazar sys.stdin
con un «file like object»
multiprocessing
original e incondicionalmente llamado:os.close(sys.stdin.fileno())en el método
multiprocessing.Process._bootstrap()
— Esto dio lugar a problemas con los procesos en proceso. Esto ha sido cambiado a:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Lo que resuelve el problema fundamental de los procesos que chocan entre sí dando como resultado un error de descriptor de archivo incorrecto, pero presenta un peligro potencial para las aplicaciones que reemplazan
sys.stdin()
con un «objeto similar a un archivo» con almacenamiento en búfer de salida. Este peligro es que si varios procesos invocanclose()
en este objeto similar a un archivo, podría ocasionar que los mismos datos se vacíen al objeto varias veces, lo que provocaría corrupción.Si escribe un objeto similar a un archivo e implementa su propio almacenamiento en caché, puede hacer que sea seguro para la bifurcación (fork-safe) almacenando el pid cada vez que se agrega al caché y descartando el caché cuando cambia el pid. Por ejemplo:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cachePara más información, consulte bpo-5155, bpo-5313 y bpo-5331
Los métodos de inicio spawn y forkserver¶
Hay algunas restricciones adicionales que no se aplican al método de inicio fork.
Más serialización (pickability)
Asegúrese de que todos los argumentos para
Process.__init__()
sean serializables (picklable). Además, si la subclase esProcess
asegúrese de que las instancias serán serializables cuando se llame al métodoProcess.start
.
Variables globales
Tenga en cuenta que si el código que se ejecuta en un proceso secundario intenta acceder a una variable global, entonces el valor que ve (si lo hay) puede no ser el mismo que el valor en el proceso primario en el momento en que fue llamado
Process.start
.Sin embargo, las variables globales que son solo constantes de nivel de módulo no causan problemas.
Importando de manera segura el módulo principal
Asegúrese de que un nuevo intérprete de Python pueda importar de forma segura el módulo principal sin causar efectos secundarios no deseados (como comenzar un nuevo proceso).
Por ejemplo, usando el método de inicio spawn o forkserver ejecutando este módulo fallaría produciendo
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()En su lugar, se debe proteger el «punto de entrada» («entry point») del programa utilizando como sigue
if __name__ == '__main__':
from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(La línea
freeze_support()
puede omitirse si el programa se ejecuta normalmente en lugar de congelarse).Esto permite que el intérprete de Python recién generado importe de forma segura el módulo y luego ejecute la función del módulo
foo()
.Se aplican restricciones similares si se crea un grupo o administrador en el módulo principal.
Ejemplos¶
Demostración de cómo crear y usar gerentes y proxies personalizados:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Usando Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Un ejemplo que muestra cómo usar las colas para alimentar tareas a una colección de procesos de trabajo y recopilar los resultados:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()