multiprocessing.shared_memory --- 可跨进程直接访问的共享内存

源代码: Lib/multiprocessing/shared_memory.py

在 3.8 版本加入.


该模块提供了一个 SharedMemory 类,用于分配和管理多核或对称多处理器(SMP)机器上进程间的共享内存。 为了协助进行不同进程间共享内存的生命周期管理,在 multiprocessing.managers 模块中还提供了一个 BaseManager 的子类 SharedMemoryManager

在本模块中,共享内存是指“POSIX 风格”的共享内存块(虽然它并不一定被显式地以这种风格实现)而不是指“分布式共享内存”。 这种风格的共享内存允许不同进程读写一块共同的(或共享的)易失性内存区域。 进程在传统上被限制为只能访问它们自己的进程内存空间而共享内存则允许跨进程共享数据,从而避免通过进程间发送消息的形式传递数据。 相比通过磁盘或套接字或者其他需要序列化/反序列化以及数据拷贝的共享形式,直接通过内存共享数据可提供显著的性能提升。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)

创建一个 SharedMemory 类的实例用来新建一个共享内存块或关联到一个已存在的共享内存块。 每个共享内存块都被赋予一个独有的名称。 通过这种方式,进程可以创建一个具有特定名称的共享内存块然后别的进程可以使用相同的名称关联到相同的共享内存块。

作为一种跨进程共享数据的方式,共享内存块的寿命可以超过创建它的原始进程。 当一个进程不再需要访问一个可能仍被其他进程所需要的的共享内存块时,应当调用 close() 方法。 当一个共享内存块不再被任何进程所需要时,则应当调用 unlink() 方法以确保执行适当的清理操作。

参数:
  • name (str | None) -- 被请求的共享内存的独有名称,以字符串形式指定。 当创建新的共享内存块时,如果提供 None 作为名称(默认值),将随机生成一个新名称。

  • create (bool) -- 控制是要创建新的共享内存块 (True) 还是关联到已有的共享内存块 (False)。

  • size (int) -- 当创建新的共享内存块时所请求的字节数。 由于某些平台会选择根据平台的内存页大小来分配内存块,因此共享内存块的实际大小可能会大于等于所请求的大小。 当关联到已有的共享内存块时,size 形参将被忽略。

close()

关闭该实例对共享内存的访问。 为确保正确清理资源,所有实例都应当在实例不再被需要时调用 close()。 请注意调用 close() 并不会导致共享内存块本身被销毁。

请求销毁底层的共享内存块。 为了执行必要的清理,在所有需要使用这个共享内存块的进程中 unlink() 应当被调用一次(且仅有一次)。 在发出此销毁请求后,共享内存块可能会也可能不会被立即销毁,并且此行为在不同系统平台上可能不同。 在 unlink() 已被调用后再尝试访问共享内存块中的数据可能导致内存访问错误。 注意:最后一个结束持有共享内存块的进程可能以任意顺序调用 unlink()close()

buf

共享内存块内容的 memoryview 。

name

共享内存块的唯一标识,只读属性。

size

共享内存块的字节大小,只读属性。

以下示例展示了 SharedMemory 底层的用法:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

下面的例子展示了 SharedMemory 类配合 NumPy 数组 的实际应用,从两个独立的 Python shell 访问相同的 numpy.ndarray:

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])

multiprocessing.managers.BaseManager 的子类,可被用于跨进程的共享内存块管理。

SharedMemoryManager 实例上调用 start() 方法会导致启动一个新进程。 这个新进程的唯一目的就是管理所有通过它创建的共享内存块的生命周期。 想要释放该进程所管理的全部共享内存块,可以在实例上调用 shutdown()。 这会触发执行该进程所管理的所有 SharedMemory 对象上的 unlink() 调用,然后停止该进程本身。 通过 SharedMemoryManager 创建 SharedMemory 实例,我们可以避免手动跟踪并触发共享内存资源的释放。

这个类提供了创建和返回 SharedMemory 实例的方法,以及以共享内存为基础创建一个列表类对象 (ShareableList) 的方法。

请参阅 BaseManager 查看有关被继承的可选输入参数 addressauthkey 以及如何使用它们来从其他进程连接已有的optional input arguments and how they may be used to connect to an existing SharedMemoryManager 服务的说明。

SharedMemory(size)

新建并返回一个具有指定的 size 个字节的 SharedMemory 对象。

ShareableList(sequence)

新建并返回一个 ShareableList 对象,使用从 sequence 输入的值来初始化。

下面的例子展示了 SharedMemoryManager 的基本机制:

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

下面的例子展示了使用 SharedMemoryManager 对象的一种更方便的方式,通过 with 语句来确保所有共享内存块在它们不再被需要时得到释放:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

当在 with 语句中使用 SharedMemoryManager 对象时,使用这个管理器创建的共享内存块会在 with 语句代码块结束执行时全部被释放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)

提供一个可变的列表型对象,其中存储的所有值都是存储在一个共享内存块中。 这会将可存储的值限制为下列内置数据类型:

  • int (有符号 64 位)

  • float

  • bool

  • str (当使用 UTF-8 编码时每个小于 10M 字节)

  • bytes (每个小于 10M 字节)

  • None

它与内置 list 类型的显著区别还在于这些列表无法改变其总长度(即没有 append(), insert() 等)并且不支持通过切片动态地创建新的 ShareableList

sequence 会被用来填充已有值的新 ShareableList。 设为 None 则会基于唯一的共享内存名称联系到现有的 ShareableList

name 是所请求的共享内存的唯一名称,与 SharedMemory 的定义中描述的一致。 当关联到现有的 ShareableList 时,将指明其共享内存块的唯一名称并将 sequence 设为 None

备注

bytesstr 值存在一个已知问题。 如果它们以 \x00 空字节或字符结尾,那么当按索引号从 ShareableList 提取这些值时它们可能会被 静默地去除。 这种 .rstrip(b'\x00') 行为并认为是一个程序错误并可能在未来被修复。 参见 gh-106939

对于某些应用来说在右侧截去尾部空值会造成问题,要绕过此问题可以在存储这样的值时总是无条件地在其末尾附加一个额外的非 0 字节并在获取时无条件地移除它:

>>> from multiprocessing import shared_memory
>>> nul_bug_demo = shared_memory.ShareableList(['?\x00', b'\x03\x02\x01\x00\x00\x00'])
>>> nul_bug_demo[0]
'?'
>>> nul_bug_demo[1]
b'\x03\x02\x01'
>>> nul_bug_demo.shm.unlink()
>>> padded = shared_memory.ShareableList(['?\x00\x07', b'\x03\x02\x01\x00\x00\x00\x07'])
>>> padded[0][:-1]
'?\x00'
>>> padded[1][:-1]
b'\x03\x02\x01\x00\x00\x00'
>>> padded.shm.unlink()
count(value)

返回 value 出现的次数。

index(value)

返回 value 首次出现的索引位置。 如果 value 不存在则会引发 ValueError

format

包含由所有当前存储值所使用的 struct 打包格式的只读属性。

shm

存储了值的 SharedMemory 实例。

下面的例子演示了 ShareableList 实例的基本用法:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

下面的例子演示了一个、两个或多个进程如何通过提供下层的共享内存块名称来访问同一个 ShareableList:

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

下面的例子显示 ShareableList (以及下层的 SharedMemory) 对象可以在必要时被封存和解封。 请注意,它将仍然为同一个共享对象。 出现这种情况是因为被反序列化的对象具有相同的唯一名称并会使用这个相同的名称附加到现有的对象上(如果对象仍然存活):

>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> sl.shm.close()
>>> sl.shm.unlink()