heapq — File de priorité basée sur un tas

Code source : Lib/heapq.py


Ce module expose une implémentation de l'algorithme de file de priorité, basée sur un tas.

Les tas sont des arbres binaires pour lesquels chaque valeur portée par un nœud est inférieure ou égale à celle de ses deux fils. Cette implémentation utilise des tableaux pour lesquels tas[k] <= tas[2*k+1] et tas[k] <= tas[2*k+2] pour tout k, en commençant la numérotation à zéro. Pour contenter l'opérateur de comparaison, les éléments inexistants sont considérés comme porteur d'une valeur infinie. L'intérêt du tas est que son plus petit élément est toujours la racine, tas[0].

L'API ci-dessous diffère de la file de priorité classique par deux aspects : (a) l'indiçage commence à zéro. Cela complexifie légèrement la relation entre l'indice d'un nœud et les indices de ses fils mais est alignée avec l'indiçage commençant à zéro que Python utilise. (b) La méthode pop renvoie le plus petit élément et non le plus grand (appelé « tas-min » dans les manuels scolaires ; le « tas-max » étant généralement plus courant dans la littérature car il permet le classement sans tampon).

Ces deux points permettent d'aborder le tas comme une liste Python standard sans surprise : heap[0] est le plus petit élément et heap.sort() conserve l'invariant du tas !

Pour créer un tas, utilisez une liste initialisée à [] ou bien utilisez une liste existante et transformez la en tas à l'aide de la fonction heapify().

Les fonctions suivantes sont fournies :

heapq.heappush(heap, item)

Introduit la valeur item dans le tas heap, en conservant l'invariance du tas.

heapq.heappop(heap)

Extraie le plus petit élément de heap en préservant l'invariant du tas. Si le tas est vide, une exception IndexError est levée. Pour accéder au plus petit élément sans le retirer, utilisez heap[0].

heapq.heappushpop(heap, item)

Introduit l'élément item dans le tas, puis extraie le plus petit élément de heap. Cette action combinée est plus efficace que heappush() suivie par un appel séparé à heappop().

heapq.heapify(x)

Transforme une liste x en un tas, sans utiliser de tampon et en temps linéaire.

heapq.heapreplace(heap, item)

Extraie le plus petit élément de heap et introduit le nouvel élément item. La taille du tas ne change pas. Si le tas est vide, une exception IndexError est levée.

Cette opération en une étape est plus efficace qu'un appel à heappop() suivi d'un appel à heappush() et est plus appropriée lorsque le tas est de taille fixe. La combinaison pop/push renvoie toujours un élément du tas et le remplace par item.

La valeur renvoyée peut être plus grande que l'élément item ajouté. Si cela n'est pas souhaitable, utilisez plutôt heappushpop() à la place. Sa combinaison push/pop renvoie le plus petit élément des deux valeurs et laisse la plus grande sur le tas.

Ce module contient également trois fonctions génériques utilisant les tas.

heapq.merge(*iterables, key=None, reverse=False)

Fusionne plusieurs entrées ordonnées en une unique sortie ordonnée (par exemple, fusionne des entrées datées provenant de multiples journaux applicatifs). Renvoie un iterator sur les valeurs ordonnées.

Similaire à sorted(itertools.chain(*iterables)) mais renvoie un itérable, ne stocke pas toutes les données en mémoire en une fois et suppose que chaque flux d'entrée est déjà classé (en ordre croissant).

A deux arguments optionnels qui doivent être fournis par mot clef.

key spécifie une key function d'un argument utilisée pour extraire une clef de comparaison de chaque élément de la liste. La valeur par défaut est None (compare les éléments directement).

reverse est une valeur booléenne. Si elle est True, la liste d'éléments est fusionnée comme si toutes les comparaisons étaient inversées. Pour obtenir un comportement similaire à sorted(itertools.chain(*iterables), reverse=True), tous les itérables doivent être classés par ordre décroissant.

Modifié dans la version 3.5: Ajout des paramètres optionnels key et reverse.

heapq.nlargest(n, iterable, key=None)

Renvoie une liste contenant les n plus grands éléments du jeu de données défini par iterable. Si l'option key est fournie, celle-ci spécifie une fonction à un argument qui est utilisée pour extraire la clé de comparaison de chaque élément dans iterable (par exemple, key=str.lower). Équivalent à : sorted(iterable, key=key, reverse=True)[:n].

heapq.nsmallest(n, iterable, key=None)

Renvoie une liste contenant les n plus petits éléments du jeu de données défini par iterable. Si l'option key est fournie, celle-ci spécifie une fonction à un argument qui est utilisée pour extraire la clé de comparaison de chaque élément dans iterable (par exemple, key=str.lower). Équivalent à : sorted(iterable, key=key)[:n].

Les deux fonctions précédentes sont les plus efficaces pour des petites valeurs de n. Pour de grandes valeurs, il est préférable d'utiliser la fonction sorted(). En outre, lorsque n==1, il est plus efficace d'utiliser les fonctions natives min() et max(). Si vous devez utiliser ces fonctions de façon répétée, il est préférable de transformer l'itérable en tas.

Exemples simples

Un tri par tas peut être implémenté en introduisant toutes les valeurs dans un tas puis en effectuant l'extraction des éléments un par un :

>>> def heapsort(iterable):
...     h = []
...     for value in iterable:
...         heappush(h, value)
...     return [heappop(h) for i in range(len(h))]
...
>>> heapsort([1, 3, 5, 7, 9, 2, 4, 6, 8, 0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Ceci est similaire à sorted(iterable) mais, contrairement à sorted(), cette implémentation n'est pas stable.

Les éléments d'un tas peuvent être des n-uplets. C'est pratique pour assigner des valeurs de comparaison (par exemple, des priorités de tâches) en plus de l'élément qui est suivi :

>>> h = []
>>> heappush(h, (5, 'write code'))
>>> heappush(h, (7, 'release product'))
>>> heappush(h, (1, 'write spec'))
>>> heappush(h, (3, 'create tests'))
>>> heappop(h)
(1, 'write spec')

Notes d'implémentation de la file de priorité

Une file de priorité est une application courante des tas et présente plusieurs défis d'implémentation :

  • Stabilité du classement : comment s'assurer que deux tâches avec la même priorité sont renvoyées dans l'ordre de leur ajout ?

  • La comparaison des couples (priorité, tâche) échoue si les priorités sont identiques et que les tâches n'ont pas de relation d'ordre par défaut.

  • Si la priorité d'une tâche change, comment la déplacer à sa nouvelle position dans le tas ?

  • Si une tâche en attente doit être supprimée, comment la trouver et la supprimer de la file ?

Une solution aux deux premiers problèmes consiste à stocker les entrées sous forme de liste à 3 éléments incluant la priorité, le numéro d'ajout et la tâche. Le numéro d'ajout sert à briser les égalités de telle sorte que deux tâches avec la même priorité sont renvoyées dans l'ordre de leur insertion. Puisque deux tâches ne peuvent jamais avoir le même numéro d'ajout, la comparaison des triplets ne va jamais chercher à comparer des tâches entre elles.

Une autre solution au fait que les tâches ne possèdent pas de relation d'ordre est de créer une classe d'encapsulation qui ignore l'élément tâche et ne compare que le champ priorité :

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

Le problème restant consiste à trouver une tâche en attente et modifier sa priorité ou la supprimer. Trouver une tâche peut être réalisé à l'aide d'un dictionnaire pointant vers une entrée dans la file.

Supprimer une entrée ou changer sa priorité est plus difficile puisque cela romprait l'invariant de la structure de tas. Une solution possible est de marquer l'entrée comme supprimée et d'ajouter une nouvelle entrée avec sa priorité modifiée :

pq = []                         # list of entries arranged in a heap
entry_finder = {}               # mapping of tasks to entries
REMOVED = '<removed-task>'      # placeholder for a removed task
counter = itertools.count()     # unique sequence count

def add_task(task, priority=0):
    'Add a new task or update the priority of an existing task'
    if task in entry_finder:
        remove_task(task)
    count = next(counter)
    entry = [priority, count, task]
    entry_finder[task] = entry
    heappush(pq, entry)

def remove_task(task):
    'Mark an existing task as REMOVED.  Raise KeyError if not found.'
    entry = entry_finder.pop(task)
    entry[-1] = REMOVED

def pop_task():
    'Remove and return the lowest priority task. Raise KeyError if empty.'
    while pq:
        priority, count, task = heappop(pq)
        if task is not REMOVED:
            del entry_finder[task]
            return task
    raise KeyError('pop from an empty priority queue')

Théorie

Les tas sont des tableaux pour lesquels a[k] <= a[2*k+1] et a[k] <= a[2*k+2] pour tout k en comptant les éléments à partir de 0. Pour simplifier la comparaison, les éléments inexistants sont considérés comme étant infinis. L'intérêt des tas est que a[0] est toujours leur plus petit élément.

L'invariant étrange ci-dessus est une représentation efficace en mémoire d'un tournoi. Les nombres ci-dessous sont k et non a[k] :

                               0

              1                                 2

      3               4                5               6

  7       8       9       10      11      12      13      14

15 16   17 18   19 20   21 22   23 24   25 26   27 28   29 30

Dans l'arbre ci-dessus, chaque nœud k a pour enfants 2*k+1 et 2*k+2. Dans les tournois binaires habituels dans les compétitions sportives, chaque nœud est le vainqueur des deux nœuds inférieurs et nous pouvons tracer le chemin du vainqueur le long de l'arbre afin de voir qui étaient ses adversaires. Cependant, dans de nombreuses applications informatiques de ces tournois, nous n'avons pas besoin de produire l'historique du vainqueur. Afin d'occuper moins de mémoire, on remplace le vainqueur lors de sa promotion par un autre élément à un plus bas niveau. La règle devient alors qu'un nœud et les deux nœuds qu'il chapeaute contiennent trois éléments différents, mais le nœud supérieur « gagne » contre les deux nœuds inférieurs.

If this heap invariant is protected at all time, index 0 is clearly the overall winner. The simplest algorithmic way to remove it and find the "next" winner is to move some loser (let's say cell 30 in the diagram above) into the 0 position, and then percolate this new 0 down the tree, exchanging values, until the invariant is re-established. This is clearly logarithmic on the total number of items in the tree. By iterating over all items, you get an O(n log n) sort.

Une propriété agréable de cet algorithme est qu'il est possible d'insérer efficacement de nouveaux éléments en cours de classement, du moment que les éléments insérés ne sont pas « meilleurs » que le dernier élément qui a été extrait. Ceci s'avère très utile dans des simulations où l'arbre contient la liste des événements arrivants et que la condition de « victoire » est le plus petit temps d'exécution planifié. Lorsqu'un événement programme l'exécution d'autres événements, ceux-ci sont planifiés pour le futur et peuvent donc rejoindre le tas. Ainsi, le tas est une bonne structure pour implémenter un ordonnanceur (et c'est ce que j'ai utilisé pour mon séquenceur MIDI ☺).

Plusieurs structures ont été étudiées en détail pour implémenter des ordonnanceurs et les tas sont bien adaptés : ils sont raisonnablement rapides, leur vitesse est presque constante et le pire cas ne diffère pas trop du cas moyen. S'il existe des représentations qui sont plus efficaces en général, les pires cas peuvent être terriblement mauvais.

Les tas sont également très utiles pour ordonner les données sur de gros disques. Vous savez probablement qu'un gros tri implique la production de séquences pré-classées (dont la taille est généralement liée à la quantité de mémoire CPU disponible), suivie par une passe de fusion qui est généralement organisée de façon très intelligente [1]. Il est très important que le classement initial produise des séquences les plus longues possibles. Les tournois sont une bonne façon d'arriver à ce résultat. Si, en utilisant toute la mémoire disponible pour stocker un tournoi, vous remplacez et faites percoler les éléments qui s'avèrent acceptables pour la séquence courante, vous produirez des séquences d'une taille égale au double de la mémoire pour une entrée aléatoire et bien mieux pour une entrée approximativement triée.

Qui plus est, si vous écrivez l'élément 0 sur le disque et que vous recevez en entrée un élément qui n'est pas adapté au tournoi actuel (parce que sa valeur « gagne » par rapport à la dernière valeur de sortie), alors il ne peut pas être stocké dans le tas donc la taille de ce dernier diminue. La mémoire libérée peut être réutilisée immédiatement pour progressivement construire un deuxième tas, qui croit à la même vitesse que le premier décroît. Lorsque le premier tas a complètement disparu, vous échangez les tas et démarrez une nouvelle séquence. Malin et plutôt efficace !

Pour résumer, les tas sont des structures de données qu'il est bon de connaître. Je les utilise dans quelques applications et je pense qu'il est bon de garder le module heap sous le coude. ☺

Notes