Source code for geomfum.numerics.graph

"""Routines for working with graphs."""

import heapq

from networkx.algorithms.shortest_paths.weighted import _weight_function


def single_source_partial_dijkstra_path_length(graph, source, k, weight="weight"):
    """Compute shortest-path distances from a source node to the k closest nodes.

    Closeness is measured by cumulative path cost, using an early-stopped
    Dijkstra's algorithm. The search terminates once k nodes (including
    the source itself) have been settled, or once every node reachable
    from the source has been settled, whichever comes first.

    Parameters
    ----------
    graph : networkx.Graph
        The input graph. Can be directed or undirected.
        Edge weights must be non-negative.
    source : node
        The starting node for paths.
    k : int
        Number of nodes to find distances to (including the source itself).
        A non-positive value yields an empty result.
    weight : str or callable
        Edge weight specification, resolved through networkx's
        ``_weight_function``. A string names the edge attribute holding
        the weight. A callable is invoked as ``weight(u, v, edge_data)``
        and may return ``None`` to mark an edge as not traversable.

    Returns
    -------
    length : dict
        Dict keyed by node to shortest path length from source. Contains at
        most ``k`` entries, in order of non-decreasing distance.

    Notes
    -----
    Nodes at equal distance are settled in the order they were discovered,
    so nodes never need to be comparable with each other.
    """
    weight_fn = _weight_function(graph, weight)

    # Settled nodes and their final distances; doubles as the visited set.
    length = {}

    # Heap entries are (distance, tie_breaker, node). The strictly increasing
    # tie_breaker guarantees the comparison never falls through to the node
    # itself, which may be of an unorderable or mixed type.
    tie_breaker = 0
    heap = [(0, tie_breaker, source)]

    while heap and len(length) < k:
        dist_u, _, u = heapq.heappop(heap)
        if u in length:
            # Stale entry: u was already settled via a shorter path.
            continue
        length[u] = dist_u

        for v, edata in graph[u].items():
            if v in length:
                continue
            cost = weight_fn(u, v, edata)
            if cost is None:
                # networkx convention: a None weight hides the edge.
                continue
            tie_breaker += 1
            heapq.heappush(heap, (dist_u + cost, tie_breaker, v))

    return length