8.5. heapq — 堆積佇列 (heap queue) 演算法

原始碼: Lib/heapq.py


這個模組實作了堆積佇列 (heap queue) 演算法,亦被稱為優先佇列 (priority queue) 演算法。

Heap(堆積)是一顆二元樹,樹上所有父節點的值都小於等於他的子節點的值。使用陣列實作,對於所有從0開始的 k 都滿足 heap[k] <= heap[2*k+1]heap[k] <= heap[2*k+2] 。為了比較節點的值,不存在的元素被視為無限大。heap 存在一個有趣的性質:樹上最小的元素永遠會在根節點 heap[0] 上。

下方的 API 跟一般教科書的 heap queue 演算法有兩個方面不同:第一,我們的索引從 0 開始計算,這會父節點與子節點之間的關係產生很微小的差異,但更符合 Python 從 0 開始索引的設計。第二,我們的 pop 方法會回傳最小的元素而不是最大的元素 ( 在教科書中被稱作 「min heap」,而 「max heap」 因為他很適合做原地排序,所以更常出現在教科書中 )。

這兩個特性使得把 heap 當作一個標準的 Python list 檢視時不會出現意外:heap[0] 是最小的物件,heap.sort() 能保持 heap 的性質不變!

建立一個 heap 可以使用 list 初始化為 [],或者使用函式 heapify() 將一個已經有元素的 list轉成一個 heap。

此模組提供下面的函式

heapq.heappush(heap, item)

item 放進 heap,並保持 heap 性質不變。

heapq.heappop(heap)

heap 取出並回傳最小的元素,同時保持 heap 性質不變。如果 heap 是空的會產生 IndexError 錯誤。只存取最小元素但不取出可以使用 heap[0]

heapq.heappushpop(heap, item)

item 放入 heap ,接著從 heap 取出並回傳最小的元素。這個組合函式比呼叫 heappush() 之後呼叫 heappop() 更有效率。

heapq.heapify(x)

在線性時間內將 list x 轉為 heap,且過程不會申請額外記憶體。

heapq.heapreplace(heap, item)

heap 取出並回傳最小的元素,接著將新的 item 放進heap。heap 的大小不會改變。如果 heap 是空的會產生 IndexError 錯誤。

這個一次完成的操作會比呼叫 heappop() 之後呼叫 heappush() 更有效率,並在維護 heap 的大小不變時更為適當,取出/放入的組合函式一定會從 heap 回傳一個元素並用 item 取代他。

函式的回傳值可能會大於被加入的 item 。如果這不是你期望發生的,可以考慮使用 heappushpop() 替代,他會回傳 heap 的最小值和 item 兩個當中比較小的那個,並將大的留在 heap 內。

這個模組也提供三個利用 heap 實作的一般用途函式

heapq.merge(*iterables, key=None, reverse=False)

合併多個已排序的輸入並產生單一且已排序的輸出(舉例:合併來自多個 log 檔中有時間戳記的項目)。回傳一個 iterator 包含已經排序的值。

sorted(itertools.chain(*iterables)) 類似但回傳值是一個 iterable ,不會一次把所有資料都放進記憶體中,並且假設每一個輸入都已經(由小到大)排序過了。

有兩個選用參數,指定時必須被當作關鍵字參數指定。

key 參數指定了一個 key function 引數,用來從每一個輸入的元素中決定一個比較的依據。預設的值是 None (直接比較元素)。

reverse is a boolean value. If set to True, then the input elements are merged as if each comparison were reversed.

3.5 版更變: 加入選用參數 keyreverse

heapq.nlargest(n, iterable, key=None)

Return a list with the n largest elements from the dataset defined by iterable. key, if provided, specifies a function of one argument that is used to extract a comparison key from each element in the iterable: key=str.lower Equivalent to: sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(n, iterable, key=None)

Return a list with the n smallest elements from the dataset defined by iterable. key, if provided, specifies a function of one argument that is used to extract a comparison key from each element in the iterable: key=str.lower Equivalent to: sorted(iterable, key=key)[:n]

後兩個函式在 n 值比較小時有最好的表現。對於較大的 n 值,只用 sorted() 函式會更有效率。同樣地,當 n==1 時,使用內建函式 min()max() 會有更好的效率。如果需要重複使用這些函式,可以考慮將 iterable 轉成真正的 heap 。

8.5.1. 基礎範例

堆排序 可以通过将所有值推入堆中然后每次弹出一个最小值项来实现。

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

雖然類似 sorted(iterable) ,但跟 sorted() 不同的是,這個實作不是 stable 的排序。

Heap 中的元素可以是 tuple 。這有利於將要比較的值(例如一個 task 的優先度)和主要資料放在一起排序。

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

8.5.2. 優先佇列 (Priority Queue) 實作細節

优先队列 是堆的常用场合,并且它的实现包含了多个挑战:

  • 排序的穩定性:你如何將兩個擁有相同 priority 的 task 按照他們被加入的順序回傳。
  • Tuple的排序在某些情況下會壞掉,例如當 Tuple (priority, task) 的 priorities 相等且 tasks 沒有一個預設的排序時。
  • 當一個 heap 中 task 的 priority 改變時,你如何將它移到 heap 正確的位置上。
  • 或者一個還沒被解決的 task 需要被刪除時,你要如何從佇列中找到並刪除指定的 task。

一個針對前兩個問題的解法是:儲存一個包含 priority 、 entry count 和 task 三個元素的 tuple 。兩個 task 有相同 priority 時, entry count 會讓兩個 task 能根據加入的順序排序。因為沒有任何兩個 task 擁有相同的 entry count ,所以永遠不會直接使用 task 做比較。

剩下的問題可以藉由找到要刪除的 task 並更改它的 priority 或者直接將它移除。尋找一個 task 可以使用一個 dictionary 指向佇列當中的 entry 。

移除 entry 或更改它的 priority 更為困難,因為這會破壞 heap 的性質。所以一個可行的方案是將原本的 entry 做一個標記表示它已經被刪除,並新增一個擁有新的 priority 的 entry 。

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

8.5.3. 原理

Heap 是一個陣列對於所有從0開始的 index k 都存在性質 a[k] <= a[2*k+1]a[k] <= a[2*k+2] 。為了方便比較,不存在的元素被視為無限大。一個有趣的 heap 性質是 a[0] 永遠是最小的元素。

上面的特殊不变量是用来作为一场锦标赛的高效内存表示。 下面的数字是 k 而不是 a[k]:

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

在上面的树中,每个 k 单元都位于 2*k+12*k+2 之上。 体育运动中我们经常见到二元锦标赛模式,每个胜者单元都位于另两个单元之上,并且我们可以沿着树形图向下追溯胜者所遇到的所有对手。 但是,在许多采用这种锦标赛模式的计算机应用程序中,我们并不需要追溯胜者的历史。 为了获得更高的内存利用效率,当一个胜者晋级时,我们会用较低层级的另一条目来替代它,因此规则变为一个单元和它之下的两个单元包含三个不同条目,上方单元“胜过”了两个下方单元。

如果此堆的不变量始终受到保护,则序号 0 显然是最后的赢家。 删除它并找出“下一个”赢家的最简单算法方式是家某个输家(让我们假定是上图中的 30 号单元)移至 0 号位置,然后将这个新的 0 号沿树下行,不断进行值的交换,直到不变量重新建立。 这显然会是树中条目总数的对数。 通过迭代所有条目,你将得到一个 O(n log n) 复杂度的排序。

此排序有一个很好的特性就是你可以在排序进行期间高效地插入新条目,前提是插入的条目不比你最近取出的 0 号元素“更好”。 这在模拟上下文时特别有用,在这种情况下树保存的是所有传入事件,“胜出”条件是最小调度时间。 当一个事件将其他事件排入执行计划时,它们的调试时间向未来方向延长,这样它们可方便地入堆。 因此,堆结构很适宜用来实现调度器,我的 MIDI 音序器就是用的这个 :-)。

用于实现调度器的各种结构都得到了充分的研究,堆是非常适宜的一种,因为它们的速度相当快,并且几乎是恒定的,最坏的情况与平均情况没有太大差别。 虽然还存在其他总体而言更高效的实现方式,但其最坏的情况却可能非常糟糕。

堆在大磁盘排序中也非常有用。 你应该已经了解大规模排序会有多个“运行轮次”(即预排序的序列,其大小通常与 CPU 内存容量相关),随后这些轮次会进入合并通道,轮次合并的组织往往非常巧妙 [1]。 非常重要的一点是初始排序应产生尽可能长的运行轮次。 锦标赛模式是达成此目标的好办法。 如果你使用全部有用内存来进行锦标赛,替换和安排恰好适合当前运行轮次的条目,你将可以对于随机输入生成两倍于内存大小的运行轮次,对于模糊排序的输入还会有更好的效果。

另外,如果你输出磁盘上的第 0 个条目并获得一个可能不适合当前锦标赛的输入(因为其值要“胜过”上一个输出值),它无法被放入堆中,因此堆的尺寸将缩小。 被释放的内存可以被巧妙地立即重用以逐步构建第二个堆,其增长速度与第一个堆的缩减速度正好相同。 当第一个堆完全消失时,你可以切换新堆并启动新的运行轮次。 这样做既聪明又高效!

总之,堆是值得了解的有用内存结构。 我在一些应用中用到了它们,并且认为保留一个 『heap』 模块是很有意义的。 :-)

註解

[1]当前时代的磁盘平衡算法与其说是巧妙,不如说是麻烦,这是由磁盘的寻址能力导致的结果。 在无法寻址的设备例如大型磁带机上,情况则相当不同,开发者必须非常聪明地(极为提前地)确保每次磁带转动都尽可能地高效(就是说能够最好地加入到合并“进程”中)。 有些磁带甚至能够反向读取,这也被用来避免倒带的耗时。 请相信我,真正优秀的磁带机排序看起来是极其壮观的,排序一向都是一门伟大的艺术! :-)