Heap drills

Drills for working with heaps in Python.

Basics

Define a heap.

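A heap is a binary tree in which every parent node has a value less than or equal to that of each of its children, so the smallest element always sits at the root. This describes a min-heap, the kind that heapq implements.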
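
To make the definition concrete: heapq stores the tree in a plain list, where the children of the element at index k sit at indexes 2k + 1 and 2k + 2. The sketch below checks that invariant. Note that `is_min_heap` is a hypothetical helper written for this drill, not part of heapq.

```python
import heapq


def is_min_heap(items):
    """Return True if the list satisfies the min-heap invariant."""
    n = len(items)
    for parent in range(n):
        # Children of index k live at 2k + 1 and 2k + 2 (if they exist).
        for child in (2 * parent + 1, 2 * parent + 2):
            if child < n and items[parent] > items[child]:
                return False
    return True


data = [1, -4, 7, 50]
print(is_min_heap(data))  # False: the root 1 is larger than its child -4
heapq.heapify(data)
print(is_min_heap(data))  # True
```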

Load the standard library module that implements heaps. What kind of heaps are supported?

# heapq implements min-heaps only. To emulate a max-heap, push -item and negate again when popping.
import heapq
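
The negation trick mentioned in the comment can be sketched as follows. This is a minimal illustration that assumes the values are numeric; the variable names are made up for the example.

```python
import heapq

values = [1, -4, 7, 50]

# Store negated values so the largest original value becomes the smallest entry.
max_heap = [-v for v in values]
heapq.heapify(max_heap)

# Negate again on the way out to recover the original values.
largest = -heapq.heappop(max_heap)
second_largest = -heapq.heappop(max_heap)
print(largest, second_largest)  # 50 7
```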

Turn the below list into a min-heap.

heap = [1, -4, 7, 50]
heapq.heapify(heap)

Add -1 to the heap.

heapq.heappush(heap, -1)

Remove and return the smallest element from the heap.

heapq.heappop(heap)
-4

Add -20 to the heap, then remove and return the smallest element.

heapq.heappushpop(heap, -20)
-20

Remove and return the smallest element and add 3 to the heap.

heapq.heapreplace(heap, 3)
-1
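
The last two drills differ only in the order of operations: heappushpop() pushes first and then pops, so it can hand back the item just added, while heapreplace() pops first and then pushes, so it returns the old smallest element even if the new item is smaller. A small sketch on two copies of the same heap (the variable names are illustrative):

```python
import heapq

a = [1, 3, 7]
b = [1, 3, 7]
heapq.heapify(a)
heapq.heapify(b)

# Push, then pop: -20 is the smallest after the push, so it comes straight back.
pushed_then_popped = heapq.heappushpop(a, -20)
# Pop, then push: the old minimum 1 is returned and -20 stays on the heap.
popped_then_pushed = heapq.heapreplace(b, -20)

print(pushed_then_popped, a)  # -20 [1, 3, 7]
print(popped_then_pushed, b)  # 1 [-20, 3, 7]
```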

Display the heap.

heap
[1, 3, 7, 50]

Return the two largest elements on the heap.

heapq.nlargest(2, heap)
[50, 7]

For the list below, return the two elements with the smallest numbers (the second entry of each tuple).

heap = [("a", 3), ("b", 2), ("c", 1)]
heapq.nsmallest(2, heap, key=lambda x: x[1])
[('c', 1), ('b', 2)]

Applications

Accessing shares

For the shares portfolio below, return the data for the three most expensive shares, sorted in descending order.

portfolio = [
    {"name": "IBM", "shares": 100, "price": 91.1},
    {"name": "AAPL", "shares": 50, "price": 543.22},
    {"name": "FB", "shares": 200, "price": 21.09},
    {"name": "HPQ", "shares": 35, "price": 31.75},
]
heapq.nlargest(3, portfolio, key=lambda x: x["price"])
[{'name': 'AAPL', 'shares': 50, 'price': 543.22},
 {'name': 'IBM', 'shares': 100, 'price': 91.1},
 {'name': 'HPQ', 'shares': 35, 'price': 31.75}]

Do the same using operator.itemgetter().

from operator import itemgetter

heapq.nlargest(3, portfolio, key=itemgetter("price"))

Achieve the same result with a method that doesn’t rely on heapq and is faster on a list this small.

%timeit sorted(portfolio, key=lambda x: x['price'], reverse=True)[:3]
588 ns ± 16.9 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
%timeit heapq.nlargest(3, portfolio, key=lambda x: x['price'])
2.1 µs ± 6.86 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
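
Which approach wins depends on the size of the input and on n: the heapq docs note that nlargest() performs best for smaller values of n and that sorted() is more efficient for larger ones. The sketch below repeats the comparison outside IPython with the timeit module. The 10,000-record portfolio is synthetic data made up for this example, and the timings will vary by machine, so none are shown.

```python
import heapq
import random
import timeit
from operator import itemgetter

random.seed(0)  # reproducible synthetic data

# Hypothetical stand-in for a much larger portfolio (made-up names and prices).
big_portfolio = [
    {"name": f"SYM{i}", "price": random.uniform(1, 1000)}
    for i in range(10_000)
]
by_price = itemgetter("price")


def top3_sorted():
    return sorted(big_portfolio, key=by_price, reverse=True)[:3]


def top3_heap():
    return heapq.nlargest(3, big_portfolio, key=by_price)


# Both approaches should return the same three records.
same_result = top3_sorted() == top3_heap()
print(same_result)

for func in (top3_sorted, top3_heap):
    seconds = timeit.timeit(func, number=20)
    print(f"{func.__name__}: {seconds / 20 * 1e3:.3f} ms per call")
```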

Building a priority queue

Use heapq to build a priority queue that supports push() and pop() operations (based on Python Cookbook recipe 1.5 and the heapq docs). Specifically, build a queue that takes items of the form (value, priority), returns them in order of priority, with higher numbers signifying higher priority, and breaks priority ties by returning items in the order they were added. To test the queue, push the items (foo, 1), (bar, 3), and (baz, 3). The first pop should return bar, the first-added of the two priority-3 items.

import heapq


class Item:
    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return "Item({!r})".format(self.name)


class PriorityQueue:
    def __init__(self):
        self._queue = []
        self._index = 0

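    def push(self, item, priority):
        # Negate the priority so the highest priority sorts first in the
        # min-heap; the running index breaks ties in insertion order.
        heapq.heappush(self._queue, (-priority, self._index, item))
        self._index += 1

    def pop(self):
        # The item is the last element of the (-priority, index, item) tuple.
        return heapq.heappop(self._queue)[-1]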
q = PriorityQueue()
q.push(Item("foo"), 1)
q.push(Item("bar"), 3)
q.push(Item("baz"), 3)
q.pop()
Item('bar')
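
The index in each heap entry does more than preserve insertion order. Tuples compare element by element, so without it two entries with equal priority would fall through to comparing the Item objects, which define no ordering. A small sketch of both cases, reusing a stripped-down Item class (the variable names are illustrative):

```python
import heapq


class Item:
    """Minimal Item as in the drill: it defines no ordering methods."""

    def __init__(self, name):
        self.name = name


# Without an index, equal priorities force Python to compare the Items.
heap = []
heapq.heappush(heap, (-3, Item("bar")))
try:
    heapq.heappush(heap, (-3, Item("baz")))
    tie_error = None
except TypeError as exc:
    tie_error = exc
print(type(tie_error).__name__)  # TypeError

# With an index, ties are settled before the Items are ever compared.
indexed = []
for index, name in enumerate(["bar", "baz"]):
    heapq.heappush(indexed, (-3, index, Item(name)))
popped = [heapq.heappop(indexed)[-1].name for _ in range(2)]
print(popped)  # ['bar', 'baz']
```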

Running median

Write a program that calculates the running median and test it on the list below.

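def running_median(sequence):
    # min_heap holds the upper half of the values seen so far; max_heap holds
    # the lower half, stored negated so heapq can act as a max-heap.
    min_heap, max_heap, result = [], [], []
    for x in sequence:
        # Route x through min_heap and move its smallest value to the lower half.
        heapq.heappush(max_heap, -heapq.heappushpop(min_heap, x))
        # Rebalance so min_heap is never smaller than max_heap.
        if len(max_heap) > len(min_heap):
            heapq.heappush(min_heap, -heapq.heappop(max_heap))
        # Even count: average the two middle values. Odd count: top of min_heap.
        result.append(
            (min_heap[0] + -max_heap[0]) / 2
            if len(min_heap) == len(max_heap)
            else min_heap[0]
        )
    return result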
a = [1, 30, 2, -7, 99, 10]

running_median(a)
[1, 15.5, 2, 1.5, 2, 6.0]
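
One way to gain confidence in the two-heap approach is to compare it with a brute-force version that recomputes the median of every prefix using statistics.median(). The sketch below repeats running_median() so that it runs on its own; `brute_force_running_median` is a hypothetical helper added only for this check.

```python
import heapq
import statistics


def running_median(sequence):
    min_heap, max_heap, result = [], [], []
    for x in sequence:
        heapq.heappush(max_heap, -heapq.heappushpop(min_heap, x))
        if len(max_heap) > len(min_heap):
            heapq.heappush(min_heap, -heapq.heappop(max_heap))
        result.append(
            (min_heap[0] + -max_heap[0]) / 2
            if len(min_heap) == len(max_heap)
            else min_heap[0]
        )
    return result


def brute_force_running_median(sequence):
    # Recompute the median of each prefix from scratch (slow but simple).
    return [statistics.median(sequence[: i + 1]) for i in range(len(sequence))]


a = [1, 30, 2, -7, 99, 10]
print(running_median(a) == brute_force_running_median(a))  # True
```

The brute-force version sorts every prefix, so it is only suitable as a check on small inputs; the heap version does a logarithmic amount of work per new element.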

Sources