Skip to content

Reference

Shortest paths

unweaver.shortest_paths.shortest_path.shortest_path(G, origin_node, destination_node, cost_function)

Find the shortest path from one on-graph node to another.

Parameters:

Name Type Description Default
G DiGraphGPKGView

The graph to use for this shortest path search.

required
origin_node str

The start node ID.

required
destination_node str

The end node ID.

required
cost_function CostFunction

A dynamic cost function.

required
precalculated_cost_function

A cost function that finds a precalculated weight. (Note: the function signature shown above does not accept this parameter.)

required
Source code in unweaver/shortest_paths/shortest_path.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def shortest_path(
    G: DiGraphGPKGView,
    origin_node: str,
    destination_node: str,
    cost_function: CostFunction,
) -> Tuple[float, List[str], List[EdgeData]]:
    """Route between two on-graph nodes along the lowest-cost path.

    This is a convenience wrapper: it hands a two-element waypoint list
    (origin, then destination) to ``shortest_path_multi`` and returns that
    call's result unchanged.

    :param G: The graph to use for this shortest path search.
    :param origin_node: The start node ID.
    :param destination_node: The end node ID.
    :param cost_function: A dynamic cost function.
    :returns: The result of ``shortest_path_multi`` for the two waypoints.

    """
    waypoints = [origin_node, destination_node]
    return shortest_path_multi(G, waypoints, cost_function)

unweaver.shortest_paths.shortest_path.shortest_path_multi(G, nodes, cost_function)

Find the on-graph shortest path between multiple waypoints (nodes).

Parameters:

Name Type Description Default
G DiGraphGPKGView

The routing graph.

required
nodes List[Union[str, ProjectedNode]]

A list of nodes to visit, finding the shortest path between each.

required
cost_function CostFunction

A networkx-compatible cost function. Takes u, v, ddict as parameters and returns a single number.

required
Source code in unweaver/shortest_paths/shortest_path.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
def shortest_path_multi(
    G: DiGraphGPKGView,
    nodes: List[Union[str, ProjectedNode]],
    cost_function: CostFunction,
) -> Tuple[float, List[str], List[EdgeData]]:
    """Compute on-graph shortest paths between consecutive waypoints.

    Waypoints that are ``ProjectedNode`` instances contribute their
    ``edges_out`` / ``edges_in`` to an overlay graph that is layered on top
    of ``G`` for the duration of the search. A leg is computed for every
    consecutive pair of waypoints, but only the first leg is returned.

    :param G: The routing graph.
    :param nodes: A list of nodes to visit, finding the shortest path between
                  each.
    :param cost_function: A networkx-compatible cost function. Takes u, v,
                          ddict as parameters and returns a single number.
    :returns: A ``(cost, path, edges)`` tuple for the first leg.
    :raises NoPathError: If any leg has no viable path.

    """
    # FIXME: written in a way that expects all waypoint nodes to have been
    # pre-vetted to be non-None
    # TODO: Extract invertible/flippable edge attributes into the profile.
    # NOTE: Written this way to anticipate multi-waypoint routing
    overlay = nx.DiGraph()
    waypoint_ids = []
    for waypoint in nodes:
        if not isinstance(waypoint, ProjectedNode):
            # Already a plain node ID: use it as-is.
            waypoint_ids.append(waypoint)
            continue

        # Projected waypoints carry temporary edges joining them to the graph.
        if waypoint.edges_out:
            overlay.add_edges_from(waypoint.edges_out)
        if waypoint.edges_in:
            overlay.add_edges_from(waypoint.edges_in)
        waypoint_ids.append(waypoint.n)

    # Consecutive (start, end) waypoint pairs - one per leg.
    leg_endpoints = zip(waypoint_ids[:-1], waypoint_ids[1:])

    augmented = AugmentedDiGraphGPKGView(G=G, G_overlay=overlay)

    legs = []
    cost: float
    path: List[str]
    edges: List[Dict[str, Any]]
    for start_id, end_id in leg_endpoints:
        try:
            cost, path = multi_source_dijkstra(
                augmented,
                sources=[start_id],
                target=end_id,
                weight=cost_function,
            )
        except nx.exception.NetworkXNoPath:
            raise NoPathError("No viable path found.")
        if cost is None:
            raise NoPathError("No viable path found.")

        # Copy the attributes of each edge along the path, in travel order.
        edges = []
        for u, v in zip(path, path[1:]):
            edges.append(dict(augmented[u][v]))

        legs.append((cost, path, edges))

    # TODO: Return multiple legs once multiple waypoints supported
    return legs[0]

unweaver.shortest_paths.shortest_path_tree.shortest_path_tree(G, start_node, cost_function, max_cost=None, precalculated_cost_function=None)

Find the shortest paths to on-graph nodes starting at a given edge/node, subject to a maximum total "distance"/cost constraint.

Parameters:

Name Type Description Default
G Union[AugmentedDiGraphGPKGView, DiGraphGPKGView]

Network graph.

required
start_node str

Start node (on graph) at which to begin search.

required
cost_function CostFunction

NetworkX-compatible weight function.

required
max_cost Optional[float]

Maximum weight to reach in the tree.

None
precalculated_cost_function Optional[CostFunction]

NetworkX-compatible weight function that represents precalculated weights.

None
Source code in unweaver/shortest_paths/shortest_path_tree.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def shortest_path_tree(
    G: Union[AugmentedDiGraphGPKGView, DiGraphGPKGView],
    start_node: str,
    cost_function: CostFunction,
    max_cost: Optional[float] = None,
    precalculated_cost_function: Optional[CostFunction] = None,
) -> Tuple[ReachedNodes, Paths, Iterable[EdgeData]]:
    """Build the shortest-path tree rooted at an on-graph node, optionally
    limited to a maximum total "distance"/cost.

    :param G: Network graph.
    :param start_node: Start node (on graph) at which to begin search.
    :param cost_function: NetworkX-compatible weight function.
    :param max_cost: Maximum weight to reach in the tree.
    :param precalculated_cost_function: NetworkX-compatible weight function
                                        that represents precalculated weights.
                                        When given, it is used instead of
                                        ``cost_function``.
    :returns: A tuple of (reached nodes keyed by node ID, the paths returned
              by the Dijkstra search, a lazy iterable of edge data dicts
              tagged with ``_u`` / ``_v``).

    """
    # A precalculated weight function, when supplied, replaces the dynamic one.
    weight = (
        cost_function
        if precalculated_cost_function is None
        else precalculated_cost_function
    )

    paths: Paths
    distances, paths = single_source_dijkstra(
        G, start_node, cutoff=max_cost, weight=weight
    )

    # Each consecutive node pair on a path is an edge; keep one copy of each.
    unique_edge_ids = list(
        {
            (u, v)
            for node_path in paths.values()
            for u, v in zip(node_path, node_path[1:])
        }
    )

    # FIXME: graph should leverage a 'get an nbunch' method so that this
    # requires only one SQL query.
    def iter_edge_data(
        graph: DiGraphGPKGView, ids: List[Tuple[str, str]]
    ) -> Iterable[EdgeData]:
        # Lazily yield a copy of each edge's attributes, tagged with its
        # endpoint IDs.
        for u, v in ids:
            record = dict(graph[u][v])
            record["_u"] = u
            record["_v"] = v
            yield record

    edges_data = iter_edge_data(G, unique_edge_ids)

    geom_key = G.network.nodes.geom_column
    # Combine each reached node's cost with its geometry attribute.
    nodes: ReachedNodes = {
        node_id: ReachedNode(
            key=node_id, geom=G.nodes[node_id][geom_key], cost=distance
        )
        for node_id, distance in distances.items()
    }

    return nodes, paths, edges_data

unweaver.shortest_paths.reachable_tree.reachable_tree(G, candidate, cost_function, max_cost, precalculated_cost_function=None)

Generate all reachable places on graph, allowing extensions beyond existing nodes (e.g., assuming cost function is distance in meters and max_cost is 400, will extend to 400 meters from origin, creating new fake nodes at the ends).

Parameters:

Name Type Description Default
G Union[AugmentedDiGraphGPKGView, DiGraphGPKGView]

Network graph.

required
candidate ProjectedNode

On-graph candidate metadata as created by waypoint_candidates.

required
cost_function CostFunction

NetworkX-compatible weight function.

required
max_cost float

Maximum weight to reach in the tree.

required
precalculated_cost_function Optional[CostFunction]

NetworkX-compatible weight function that represents precalculated weights.

None
Source code in unweaver/shortest_paths/reachable_tree.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def reachable_tree(
    G: Union[AugmentedDiGraphGPKGView, DiGraphGPKGView],
    candidate: ProjectedNode,
    cost_function: CostFunction,
    max_cost: float,
    precalculated_cost_function: Optional[CostFunction] = None,
) -> Tuple[Dict[str, ReachedNode], List[EdgeData]]:
    """Generate all reachable places on graph, allowing extensions beyond
    existing nodes (e.g., assuming cost function is distance in meters and
    max_cost is 400, will extend to 400 meters from origin, creating new fake
    nodes at the ends).

    :param G: Network graph.
    :param candidate: On-graph candidate metadata as created by
                      waypoint_candidates.
    :param cost_function: NetworkX-compatible weight function.
    :param max_cost: Maximum weight to reach in the tree.
    :param precalculated_cost_function: NetworkX-compatible weight function
                                        that represents precalculated weights.
                                        When given, it is used for the
                                        shortest-path tree search; the fringe
                                        extension still uses ``cost_function``.
    :returns: A tuple of (reached nodes keyed by node ID, including any newly
              created fringe nodes; list of edge data dicts covering the
              shortest-path tree plus full and partial fringe edges).

    """
    # TODO: reuse these edges - lookup of edges from graph is often slowest
    if precalculated_cost_function is None:
        tree_cost_function = cost_function
    else:
        tree_cost_function = precalculated_cost_function
    nodes, paths, edges = shortest_path_tree(
        G, candidate.n, tree_cost_function, max_cost
    )

    # The shortest-path tree already contains all on-graph nodes within
    # max_cost distance. The only edges we need to add to make it the full,
    # extended, 'reachable' graph are:
    #   1) Partial edges at the fringe: these extend a relatively short way
    #      down a given edge and do not connect to any other partial edges.
    #   2) "Internal" edges that weren't counted originally because they don't
    #      fall on a shortest path, but are still reachable. These edges must
    #      connect nodes on the shortest-path tree - if one node wasn't on the
    #      shortest-path tree and we need to include the whole edge, that edge
    #      should've been on the shortest-path tree (proof to come).

    edges = list(edges)
    traveled_edges = {(e["_u"], e["_v"]) for e in edges}
    traveled_nodes = {n for path in paths.values() for n in path}

    if G.is_directed():
        neighbor_func = G.successors
    else:
        neighbor_func = G.neighbors

    fringe_candidates = {}
    for u in traveled_nodes:
        if u not in G:
            # For some reason, this only happens to the in-memory graph: the
            # "pseudo" node created in paths that start along an edge remains
            # and of course does not exist in the true graph.
            # Investigate: FIXME!
            continue
        for v in neighbor_func(u):
            # Ignore already-traveled edges
            if (u, v) in traveled_edges:
                continue
            traveled_edges.add((u, v))

            # Determine cost of traversal
            edge_data = G[u][v]
            edge_data = dict(edge_data)
            # FIXME:  this value is incorrect for precalculated weights. Need
            # to maintain precalculated and non-precalculated versions of the
            # cost function and apply the non-precalculated for these
            # situations.
            cost = cost_function(u, v, edge_data)

            # Exclude non-traversible edges
            if cost is None:
                continue

            # If the total cost is still less than max_cost, we will have
            # traveled the whole edge - there is no new "pseudo" node, only a
            # new edge.
            # NOTE(review): this test uses the cost to reach v, whereas the
            # comment above suggests the cost to reach u plus the edge cost.
            # Left unchanged - confirm the intended condition.
            if v in nodes and nodes[v].cost + cost < max_cost:
                interpolate_proportion = 1.0
            elif cost == 0:
                # A zero-cost edge consumes none of the remaining budget, so
                # it is fully traversable; this also avoids dividing by zero.
                interpolate_proportion = 1.0
            else:
                remaining = max_cost - nodes[u].cost
                # Never extend past the end of the edge: a proportion above
                # 1 would ask for a point beyond the edge's geometry.
                interpolate_proportion = min(1.0, remaining / cost)

            # TODO: Use consistent data classes for passing around edge data,
            # leave (de)serialization concerns up to near-db interfaces
            edge_data["_u"] = u
            edge_data["_v"] = v

            fringe_candidate: FringeCandidate = {
                "cost": cost,
                "edge_data": edge_data,
                "proportion": interpolate_proportion,
            }

            fringe_candidates[(u, v)] = fringe_candidate

    fringe_edges = []

    # Don't treat origin point edge as fringe-y: each start point in the
    # shortest-path tree was reachable from the initial half-edge.

    # Adjust / remove fringe proportions based on context. Each (u, v) key is
    # unique in fringe_candidates, so every candidate is handled exactly once.
    for edge_id, fringe_candidate in fringe_candidates.items():
        edge_data = fringe_candidate["edge_data"]
        proportion = fringe_candidate["proportion"]

        # Can traverse whole edge - keep it
        if proportion == 1:
            fringe_edges.append(edge_data)
            continue

        rev_edge_id = (edge_id[1], edge_id[0])
        if rev_edge_id in fringe_candidates:
            # This edge is "internal": it's being traversed from both sides
            rev_proportion = fringe_candidates[rev_edge_id]["proportion"]
            if proportion + rev_proportion > 1:
                # They intersect - the entire edge can be traversed.
                fringe_edges.append(edge_data)
                continue
            # Otherwise they do not intersect: keep the original proportion.

        # If this point has been reached, this is:
        # (1) A partial extension down an edge
        # (2) It doesn't overlap with any other partial edges

        # Create primary partial edge and node and append to the saved data
        fringe_edge, fringe_node = _make_partial_edge(G, edge_data, proportion)

        fringe_edges.append(fringe_edge)

        # The new pseudo node sits exactly at the cost budget.
        nodes[fringe_node.key] = ReachedNode(
            key=fringe_node.key, geom=fringe_node.geom, cost=max_cost
        )

    edges = edges + fringe_edges

    # Convert LineString geometries to dicts via asdict before returning.
    for edge in edges:
        geom = edge["geom"]
        if isinstance(geom, LineString):
            edge["geom"] = asdict(geom)

    return nodes, edges