heapq --- ヒープキューアルゴリズム

ソースコード: Lib/heapq.py


このモジュールではヒープキューアルゴリズムの一実装を提供しています。優先度キューアルゴリズムとしても知られています。

ヒープとは、各親ノードがどの子ノードよりも小さいか等しい値を持つ二分木のことです。この条件をヒープ不変式と呼びます。

ヒープの実装には、全ての k について、要素をゼロから数えたときに、 heap[k] <= heap[2*k+1] かつ heap[k] <= heap[2*k+2] となる配列を使用しています。比較のために、存在しない要素を無限大と考えます。ヒープの興味深い特性は、最小の要素が常にルートの heap[0] になることです。

以下の API は教科書におけるヒープアルゴリズムとは 2 つの側面で異なっています: (a) ゼロベースのインデクス化を行っています。これにより、ノードに対するインデクスとその子ノードのインデクスの関係がやや明瞭でなくなりますが、Python はゼロベースのインデクス化を使っているのでよりしっくりきます。(b) われわれの pop メソッドは最大の要素ではなく最小の要素 (教科書では "min heap:最小ヒープ" と呼ばれています; 教科書では並べ替えをインプレースで行うのに適した "max heap:最大ヒープ" が一般的です)。

これらの 2 点によって、ユーザに戸惑いを与えることなく、ヒープを通常の Python リストとして見ることができます: heap[0] が最小の要素となり、heap.sort() はヒープ不変式を保ちます!

ヒープを作成するには、 [] に初期化されたリストを使うか、 heapify() を用いて要素の入ったリストを変換します。

次の関数が用意されています:

heapq.heappush(heap, item)

itemheap に push します。ヒープ不変式を保ちます。

heapq.heappop(heap)

pop を行い、 heap から最小の要素を返します。ヒープ不変式は保たれます。ヒープが空の場合、 IndexError が送出されます。pop せずに最小の要素にアクセスするには、 heap[0] を使ってください。

heapq.heappushpop(heap, item)

itemheap に push した後、pop を行って heap から最初の要素を返します。この一続きの動作を heappush() に引き続いて heappop() を別々に呼び出すよりも効率的に実行します。

heapq.heapify(x)

リスト x をインプレース処理し、線形時間でヒープに変換します。

heapq.heapreplace(heap, item)

heap から最小の要素を pop して返し、新たに item を push します。ヒープのサイズは変更されません。ヒープが空の場合、 IndexError が送出されます。

この一息の演算は heappop() に次いで heappush() を送出するよりも効率的で、固定サイズのヒープを用いている場合にはより適しています。 pop/push の組み合わせは必ずヒープから要素を一つ返し、それを item と置き換えます。

返される値は加えられた item よりも大きくなるかもしれません。それを望まないなら、代わりに heappushpop() を使うことを考えてください。この push/pop の組み合わせは二つの値の小さい方を返し、大きい方の値をヒープに残します。

このモジュールではさらに3つのヒープに基く汎用関数を提供します。

heapq.merge(*iterables, key=None, reverse=False)

複数のソートされた入力をマージ(merge)して一つのソートされた出力にします (たとえば、複数のログファイルの時刻の入ったエントリーをマージします)。ソートされた値にわたる iterator を返します。

sorted(itertools.chain(*iterables)) と似ていますが、イテレータを返し、一度にはデータをメモリに読み込まず、それぞれの入力ストリームが予め(最小から最大へ)ソートされていることを仮定します。

2 つのオプション引数があり、これらはキーワード引数として指定されなければなりません。

key は 1 つの引数からなる key function を指定します。この関数は、入力の各要素から比較のキーを取り出すのに使われます。デフォルト値は None です (要素を直接比較します)。

reverse は真偽値です。 True を設定した場合、挿入要素は逆向きに比較されたかのように結合されます。 sorted(itertools.chain(*iterables), reverse=True) でこれに似た挙動を実現するには、全てのイテラブルは降順で並んでいなければなりません。

バージョン 3.5 で変更: オプションの key 引数および reverse 引数を追加.

heapq.nlargest(n, iterable, key=None)

iterable で定義されるデータセットのうち、最大値から降順に n 個の値のリストを返します。(あたえられた場合) key は、引数を一つとる、iterable のそれぞれの要素から比較キーを生成する関数を指定します(例 key=str.lower)。以下のコードと同等です: sorted(iterable, key=key, reverse=True)[:n]

heapq.nsmallest(n, iterable, key=None)

iterable で定義されるデータセットのうち、最小値から昇順に n 個の値のリストを返します。(あたえられた場合) key は、引数を一つとる、iterable のそれぞれの要素から比較キーを生成する関数を指定します(例 key=str.lower)。以下のコードと同等です: sorted(iterable, key=key)[:n]

後ろ二つの関数は n の値が小さな場合に最適な動作をします。大きな値の時には sorted() 関数の方が効率的です。さらに、 n==1 の時には min() および max() 関数の方が効率的です。この関数を繰り返し使うことが必要なら、iterable を実際のヒープに変えることを考えてください。

基本的な例

すべての値をヒープに push してから最小値を 1 つずつ pop することで、ヒープソート を実装できます:

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

これは sorted(iterable) に似ていますが、 sorted() とは異なり、この実装はステーブルソートではありません。

ヒープの要素はタプルに出来ます。これは、追跡される主レコードとは別に (タスクの優先度のような) 比較値を指定するときに便利です:

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

優先度キュー実装の注釈

優先度つきキュー は、ヒープの一般的な使い方で、実装にはいくつか困難な点があります:

  • ソート安定性: 優先度が等しい二つのタスクが、もともと追加された順序で返されるためにはどうしたらいいでしょうか?

  • (priority, task) ペアに対するタプルの比較は、priority が同じで task がデフォルトの比較順を持たないときに破綻します。

  • あるタスクの優先度が変化したら、どうやってそれをヒープの新しい位置に移動させるのでしょうか?

  • 未解決のタスクが削除される必要があるとき、どのようにそれをキューから探して削除するのでしょうか?

最初の二つの困難の解決策は、項目を優先度、項目番号、そしてタスクを含む 3 要素のリストとして保存することです。この項目番号は、同じ優先度の二つのタスクが、追加された順序で返されるようにするための同点決勝戦として働きます。そして二つの項目番号が等しくなることはありませんので、タプルの比較が二つのタスクを直接比べようとすることはありえません。

比較できないタスク問題のもう一つの解決法は、タスクアイテムを無視して優先順序フィールドだけで比較するラッパークラスです:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

残りの困難は主に、未解決のタスクを探して、その優先度を変更したり、完全に削除することです。タスクを探すことは、キュー内の項目を指し示す辞書によってなされます。

項目を削除したり、優先度を変更することは、ヒープ構造の不変関係を壊すことになるので、もっと難しいです。ですから、可能な解決策は、その項目が無効であるものとしてマークし、必要なら変更された優先度の項目を加えることです:

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

理論

ヒープとは、全ての k について、要素を 0 から数えたときに、a[k] <= a[2*k+1] かつ a[k] <= a[2*k+2] となる配列です。比較のために、存在しない要素を無限大と考えます。ヒープの興味深い属性は a[0] が常に最小の要素になることです。

上記の奇妙な不変式は、勝ち抜き戦判定の際に効率的なメモリ表現を行うためのものです。以下の番号は a[k] ではなく k とします:

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

上の木構造では、各セル k2*k+1 および 2*k+2 を最大値としています。スポーツに見られるような通常の 2 つ組勝ち抜き戦では、各セルはその下にある二つのセルに対する勝者となっていて、個々のセルの勝者を追跡していくことにより、そのセルに対する全ての相手を見ることができます。しかしながら、このような勝ち抜き戦を使う計算機アプリケーションの多くでは、勝歴を追跡する必要はりません。メモリ効率をより高めるために、勝者が上位に進級した際、下のレベルから持ってきて置き換えることにすると、あるセルとその下位にある二つのセルは異なる三つの要素を含み、かつ上位のセルは二つの下位のセルに対して "勝者と" なります。

このヒープ不変式が常に守られれば、インデックス 0 は明らかに最勝者となります。最勝者の要素を除去し、"次の" 勝者を見つけるための最も単純なアルゴリズム的手法は、ある敗者要素 (ここでは上図のセル 30 とします) を 0 の場所に持っていき、この新しい 0 を濾過するようにしてツリーを下らせて値を交換してゆきます。不変関係が再構築されるまでこれを続けます。この操作は明らかに、ツリー内の全ての要素数に対して対数的な計算量となります。全ての要素について繰り返すと、 O(n log n) のソートになります。

このソートの良い点は、新たに挿入する要素が、最後に取り出された 0 番目の要素よりも "良い値" でない限り、ソートを行っている最中に新たな要素を効率的に追加できるというところです。この性質は、シミュレーション的な状況で、ツリーで全ての入力イベントを保持し、"勝者" の状況を最小のスケジュール時刻にするような場合に特に便利です。あるイベントが他のイベント群の実行をスケジュールする際、それらは未来にスケジュールされることになるので、それらのイベント群を容易にヒープに積むことができます。すなわち、ヒープはスケジューラを実装する上で良いデータ構造であるといえます (私はこれを MIDI シーケンサで使っています :-)。

これまで、スケジューラを実装するための様々なデータ構造が広範に研究されてきました。ヒープは、十分高速で、速度はおおむね一定であり、最悪の場合でも平均的な速度とさほど変わらないため、良いデータ構造といえます。しかし、最悪の場合にひどい速度になるとしても、全体的にはより効率の高い他のデータ構造表現も存在します。

ヒープはまた、巨大なディスクのソートでも非常に有用です。おそらくご存知のように、巨大なソートを行うと、複数の "ラン (run)" (予めソートされた配列で、そのサイズは通常 CPU メモリの量に関係しています) が生成され、続いて統合処理 (merging) がこれらのランを判定します。この統合処理はしばしば非常に巧妙に組織されています [1]。重要なのは、最初のソートが可能な限り長いランを生成することです。勝ち抜き戦はこれを達成するための良い方法です。もし利用可能な全てのメモリを使って勝ち抜き戦を行い、要素を置換および濾過処理して現在のランに収めれば、ランダムな入力に対してメモリの二倍のサイズのランを生成することになり、大体順序づけがなされている入力に対してはもっと高い効率になります。

さらに、ディスク上の 0 番目の要素を出力して、現在の勝ち抜き戦に (最後に出力した値に "勝って" しまうために) 収められない入力を得たなら、ヒープには収まらないため、ヒープのサイズは減少します。解放されたメモリは二つ目のヒープを段階的に構築するために巧妙に再利用することができ、この二つ目のヒープは最初のヒープが崩壊していくのと同じ速度で成長します。最初のヒープが完全に消滅したら、ヒープを切り替えて新たなランを開始します。なんと巧妙で効率的なのでしょう!

一言で言うと、ヒープは知って得するメモリ構造です。私はいくつかのアプリケーションでヒープを使っていて、'ヒープ' モジュールを常備するのはいい事だと考えています。:-)

脚注