I have implemented directed graph and Dijkstra algorithm using heap in Python.
I first implemented an abstract base class WeightedGraph, of which DirectedGraph is a subclass. The DirectedGraph uses an adjacency map as its internal representation. I then implemented the method single_source_shortest_paths, which uses the Dijkstra algorithm, which in turn uses a HeapQueue that I implemented in another module heap_queue.
Before going to the codes, let's look at an example.
An Example
We can add nodes and edges as usual. When adding an edge, if any node in the edge does not already exist, that node is automatically added.
>>> g = DirectedGraph()
>>> g.add_node(0)
>>> g.add_node(1)
>>> g.add_node(2)
>>>
>>> g.add_edge(start=0, end=1, weight=3)
>>> g.add_edge(start=1, end=2, weight=2)
>>> g.add_edge(start=3, end=4, weight=3)
>>> g.add_edge(start=4, end=3, weight=2)
>>> g.add_edge(start=0, end=3, weight=1)
>>> g.add_edge(start=3, end=1, weight=1)
>>> g.add_edge(start=2, end=4, weight=1)
In the above, when adding the edge (3, 4), the nodes 3 and 4 are automatically added. The graph g now looks like this:
We can list out all the nodes, edges and out-neighbors of a given node. Each edge is represented as a namedtuple.
>>> list(g.nodes)
[0, 1, 2, 4, 3]
>>>
>>> for edge in g.edges:
... print(edge)
DirectedEdge(start=0, end=1, weight=3)
DirectedEdge(start=0, end=3, weight=1)
DirectedEdge(start=1, end=2, weight=2)
DirectedEdge(start=2, end=4, weight=1)
DirectedEdge(start=4, end=3, weight=2)
DirectedEdge(start=3, end=4, weight=3)
DirectedEdge(start=3, end=1, weight=1)
>>>
>>> for neighbor, weight in g.neighbors(3):
... print(f'neighbor={neighbor}, weight={weight}')
neighbor=4, weight=3
neighbor=1, weight=1
Now comes the meat, the single_source_shortest_paths method, which finds the shortest paths to multiple targets from a single source. It returns a dictionary whose keys are the given targets and values are the corresponding shortest paths.
>>> g.single_source_shortest_paths(start=0, targets=range(5))
{0: [0],
1: [0, 3, 1],
2: [0, 3, 1, 2],
3: [0, 3],
4: [0, 3, 4]}
If any target is unreachable, its path is simply an empty list.
>>> g.single_source_shortest_paths(start=4, targets=[0, 2])
{0: [],
2: [4, 3, 1, 2]}
The Code
The code of WeightedGraph and DirectedGraph is as follow:
import abc
from collections import namedtuple
from heap_queue import HeapQueue
DirectedEdge = namedtuple('DirectedEdge', 'start end weight')
INF = float('inf')
class WeightedGraph(abc.ABC):
"""An abstract base class for a weighted graph."""
@abc.abstractmethod
def add_node(self, label):
"""Add a node with the given label.
No effect if a node with the given label already exists.
"""
@abc.abstractmethod
def add_edge(self, start, end, weight):
"""Add an edge from start to end with the given weight.
Add the nodes automatically if not already exist.
Overwrite the weight if the edge already exists.
"""
@abc.abstractmethod
def remove_node(self, label):
"""Remove the node with the given label.
Remove all edges at the node automatically.
Raise a ValueError if the node does not exist.
"""
@abc.abstractmethod
def remove_edge(self, start, end):
"""Remove the edge from start to end.
Raise a ValueError if the edge does not exist
"""
@property
@abc.abstractmethod
def nodes(self):
"""Return an iterator over all nodes."""
@property
@abc.abstractmethod
def edges(self):
"""Return an iterator over all edges."""
@property
@abc.abstractmethod
def node_count(self):
"""Return the number of nodes."""
@property
@abc.abstractmethod
def edge_count(self):
"""Return the number of edges."""
@abc.abstractmethod
def neighbors(self, label):
"""Return an iterator, in which each item is an (neighbor, weight) pair,
where neighbor is an out-neighbor of the node with the given label and
weight is the weight of the corresponding edge.
Raise a ValueError if the node does not exist.
"""
@abc.abstractmethod
def degree(self, label):
"""Return the (out-)degree of the node with the given label.
Raise a ValueError if the node does not exist.
"""
@abc.abstractmethod
def weight(self, start, end):
"""Return the weight of the edge from start to end.
Raise a ValueError if the edge does not exist.
"""
class DirectedGraph(WeightedGraph):
"""Implement a directed weighted graph using adjacency map."""
def __init__(self):
"""Initialize the graph with an adjacency map."""
self._adjacency = {}
def add_node(self, label):
# if no such node, initialize the corresponding neighbour dict
self._adjacency.setdefault(label, {})
def add_edge(self, start, end, weight):
self._adjacency.setdefault(end, {})
# extract the neighbour dict of start and set weight
nbr_dict = self._adjacency.setdefault(start, {})
nbr_dict[end] = weight
def remove_node(self, label):
if label not in self._adjacency:
raise ValueError(f'no node {label}')
else:
del self._adjacency[label]
# delete all edges whose end node is the given node
for _, nbr_dict in self._adjacency.items():
nbr_dict.pop(label, None)
def remove_edge(self, start, end):
try:
del self._adjacency[start][end]
# Two cases result in a KeyError
# Case 1: start node does not exist, or
# Case 2: start node exists but no edge from it to the end node
except KeyError:
raise ValueError(f'no edge from {start} to {end}')
@property
def nodes(self):
yield from self._adjacency
@property
def edges(self):
"""Iterate over all edges. Each edge is represented as a namedtuple
of the form DirectedEdge(start, end, weight).
"""
for start, nbr_dict in self._adjacency.items():
for end, weight in nbr_dict.items():
yield DirectedEdge(start, end, weight)
@property
def node_count(self):
return len(self._adjacency)
@property
def edge_count(self):
return sum(len(nbr_dict) for nbr_dict in self._adjacency.values())
def neighbors(self, label):
if label not in self._adjacency:
raise ValueError(f'no node {label}')
# return iter instead of yield from, otherwise ValueError is never
# raised upon the initialization of the generator
else:
nbr_dict = self._adjacency[label]
return iter(nbr_dict.items())
def degree(self, label):
if label not in self._adjacency:
raise ValueError(f'no node {label}')
else:
return len(self._adjacency[label])
def weight(self, start, end):
try:
return self._adjacency[start][end]
# similar to remove_edge, two cases result in a KeyError
except KeyError:
raise ValueError(f'no edge from {start} to {end}')
def single_source_shortest_paths(self, start, targets):
"""Find the shortest paths to multiple targets from a single source.
Return a dictionary whose keys are the given targets and values are the
corresponding shortest paths.
"""
targets = list(targets)
# check all nodes exist
if start not in self._adjacency:
raise ValueError(f'no node {start}')
for target in targets:
if target not in self._adjacency:
raise ValueError(f'no node {target}')
# use dijkstra to obtain the shortest-path tree, which
# is then used to construct a path for each target
result = {}
came_from = self._dijkstra(start, targets)
for target in targets:
if target not in came_from:
# no path from start to target
path = []
else:
path = DirectedGraph._construct_path(came_from, target)
result[target] = path
return result
def _dijkstra(self, start, targets):
"""A helper method that implements the Dijkstra algorithm.
Return the shortest-path tree represented as a dict came_from.
"""
came_from = {start: None}
targets = set(targets)
# initialize the cost of every node to be infinity, except the start
frontier = HeapQueue((node, INF) for node in self.nodes)
frontier.push(start, 0)
while targets:
# node popped from the queue already has its shortest path found,
# can be safely discarded
cur_node, cur_cost = frontier.pop()
targets.discard(cur_node)
for nxt_node, weight in self.neighbors(cur_node):
# only relax the nodes to which shortest paths are not yet found
if nxt_node in frontier:
nxt_cost = cur_cost + weight
# if new cost less than the current cost, update it
if nxt_cost < frontier[nxt_node]:
frontier.push(nxt_node, nxt_cost)
came_from[nxt_node] = cur_node
return came_from
@staticmethod
def _construct_path(came_from, target):
"""Given the shortest-path tree came_from,
find the path from start to the given target.
"""
cur_node = target
path = []
# backtrack from target until hitting the beginning
while cur_node is not None:
path.append(cur_node)
cur_node = came_from[cur_node]
# the path is in the reversed order
path.reverse()
return path
In the module heap_queue, the code of HeapQueue is as follow:
class HeapQueue:
"""Implement a priority queue in the form of a min-heap.
Each entry is an (item, key) pair. Item with a lower key has a higher priority.
Item must be hashable and unique. No duplicate items.
Pushing an existing item would update its key instead.
"""
def __init__(self, entries=None):
"""
:argument:
entries (iterable of tuples): an iterable of (item, key) pairs
"""
if entries is None:
self._entries = []
self._indices = {}
else:
self._entries = list(entries)
self._indices = {item: idx for idx, (item, _) in enumerate(self._entries)}
self._heapify()
def _heapify(self):
"""Enforce the heap properties upon initializing the heap."""
start = len(self) // 2 - 1
for idx in range(start, -1, -1):
self._down(idx)
def __contains__(self, item):
"""Return True if the item is in the heap."""
return item in self._indices
def __len__(self):
"""Number of entries remaining in the heap."""
return len(self._entries)
def __iter__(self):
"""Iterate over all items."""
for item, _ in self._entries:
yield item
"""Helper methods"""
def _swap(self, idx1, idx2):
"""Swap two entries."""
item1, _ = self._entries[idx1]
item2, _ = self._entries[idx2]
self._indices[item1] = idx2
self._indices[item2] = idx1
self._entries[idx1], self._entries[idx2] = self._entries[idx2], self._entries[idx1]
def _up(self, idx):
"""Bring a violating entry up to its correct position recursively."""
if idx == 0:
return
parent = (idx - 1) // 2
# compare key with the parent
if self._entries[idx][1] < self._entries[parent][1]:
self._swap(idx, parent)
self._up(parent)
def _smaller_child(self, idx):
"""Find the child with smaller key. If no child, return None."""
left = 2 * idx + 1
# case 1: no child
if left >= len(self):
return None
right = left + 1
# case 2: only left child
if right == len(self):
return left
# case 3: two children
if self._entries[left][1] < self._entries[right][1]:
return left
else:
return right
def _down(self, idx):
"""Bring a violating entry down to its correct position recursively."""
child = self._smaller_child(idx)
if child is None:
return
# compare key with the child with smaller key
if self._entries[idx][1] > self._entries[child][1]:
self._swap(idx, child)
self._down(child)
"""Priority queue operations"""
def __getitem__(self, item):
"""Return the key of an item.
If the item does not exist, raise a KeyError."""
if item not in self._indices:
raise KeyError(f"{item} not found")
idx = self._indices[item]
_, key = self._entries[idx]
return key
def peek(self):
"""Return the item with the minimum key."""
item, _ = self._entries[0]
return item
def push(self, item, key):
"""Push an item into the heap with a given key.
If the item already exists, update its key instead.
"""
if item not in self._indices:
# insert the new item to the end and bring it up to the correct position
idx = len(self)
self._entries.append((item, key))
self._indices[item] = idx
self._up(idx)
else:
# the item already exists, find its index and update its key
idx = self._indices[item]
item, old_key = self._entries[idx]
self._entries[idx] = (item, key)
# bring the entry to the correct position
if key < old_key:
self._up(idx)
if key > old_key:
self._down(idx)
def pop(self):
"""Remove the item with the minimum key.
The resulting (item, key) pair is also returned."""
# after swapping the first and the last entry,
# the required entry goes from the beginning to the end
self._swap(0, len(self) - 1)
item, key = self._entries.pop()
del self._indices[item]
self._down(0)
return item, key
What improvement can I make on my code? Any advice is appreciated. Thank you.

__init__method ofHeapQueueyou can set the default value of theentriesparameter to[]instead ofNone, and then get rid of theif else. \$\endgroup\$