Source code for aequilibrae.paths.results.path_results

import numpy as np

from aequilibrae import global_logger
from aequilibrae.paths.graph import Graph
from typing import Union, List

try:
    from aequilibrae.paths.AoN import update_path_trace, path_computation, HEURISTIC_MAP
except ImportError as ie:
    global_logger.warning(f"Could not import procedures from the binary. {ie.args}")


class PathResults:
    """Path computation result holder

    An instance is sized for a graph with :meth:`prepare`, filled by
    :meth:`compute_path`, re-targeted to another destination with
    :meth:`update_trace` and cleared with :meth:`reset`.

    .. code-block:: python

        >>> from aequilibrae.paths.results import PathResults

        >>> project = create_example(project_path)
        >>> project.network.build_graphs()

        # Mode c is car in this project
        >>> car_graph = project.network.graphs['c']

        # minimize distance
        >>> car_graph.set_graph('distance')

        # If you want to compute skims
        # It does increase path computation time substantially
        >>> car_graph.set_skimming(['distance', 'free_flow_time'])

        >>> res = PathResults()
        >>> res.prepare(car_graph)
        >>> res.compute_path(1, 17)

        # Update all the outputs mentioned above for destination 9. Same origin: 1
        >>> res.update_trace(9)

        # clears all computation results
        >>> res.reset()
    """

    def __init__(self) -> None:
        # Working arrays; allocated by prepare()
        self.predecessors = None
        self.connectors = None
        self.skims = None
        self._skimming_array = None

        # Path outputs; cleared by reset().
        # NOTE(review): presumably written by the compiled path routines - confirm
        self.path = None
        self.path_nodes = None
        self.path_link_directions = None
        self.milepost = None

        self.reached_first = None
        self.origin = None
        self.destination = None

        self.graph: Graph = None
        self.early_exit = False
        self.a_star = False

        # Dimensions; -1 until prepare() is called
        self.links = -1
        self.nodes = -1
        self.zones = -1
        self.num_skims = -1

        self.__integer_type = None
        self.__float_type = None
        self._graph_id = None
        self.__graph_sum = None

        # Private mirrors of the public flags, set together with them in compute_path()
        self._early_exit = self.early_exit
        self._a_star = self.a_star
        self._heuristic = "equirectangular"

    def compute_path(
        self,
        origin: int,
        destination: int,
        early_exit: bool = False,
        a_star: bool = False,
        heuristic: Union[str, None] = None,
    ) -> None:
        """Computes the path between two nodes in the network.

        `A*` heuristics are currently only valid for distance cost fields.

        :Arguments:
            **origin** (:obj:`int`): Origin for the path

            **destination** (:obj:`int`): Destination for the path

            **early_exit** (:obj:`bool`): Stop constructing the shortest path tree once the destination is found.
            Doing so may cause subsequent calls to ``update_trace`` to recompute the tree. Default is ``False``.

            **a_star** (:obj:`bool`): Whether or not to use A* over Dijkstra's algorithm.
            When ``True``, ``early_exit`` is always ``True``. Default is ``False``.

            **heuristic** (:obj:`str`): Heuristic to use if ``a_star`` is enabled. When ``None`` (the default)
            the heuristic currently set on the object is kept.

        :Raises:
            **Exception**: If the object was not prepared with a graph, or if ``a_star`` is requested and
            the graph has an empty lon/lat index.

            **ValueError**: If ``heuristic`` is not one of ``get_heuristics()``.
        """
        if self.graph is None:
            raise Exception("You need to set graph skimming before you compute a path")

        if a_star and self.graph.lonlat_index.empty:
            raise Exception("You need to supply a lon/lat index to graph.prepare_graph to use A*")

        # Validate the heuristic BEFORE touching the flags: if it is rejected, the flags must keep
        # describing the tree that is currently stored, not a computation that never ran.
        if heuristic is not None:
            self.set_heuristic(heuristic)

        self.early_exit = self._early_exit = early_exit or a_star
        self.a_star = self._a_star = a_star

        path_computation(origin, destination, self.graph, self)

        if self.graph.skim_fields:
            self.skims.fill(np.inf)
            # The working array carries one extra trailing row (see prepare), which is dropped here
            self.skims[self.graph.all_nodes, :] = self._skimming_array[:-1, :]
            # Anything above twice the total graph cost is treated as unreachable
            self.skims[self.skims > self.__graph_sum] = np.inf

    def prepare(self, graph: Graph) -> None:
        """
        Prepares the object with dimensions corresponding to the graph object

        Calling it again with another graph re-sizes every working array for the new graph.

        :Arguments:
            **graph** (:obj:`Graph`): Needs to have been set with number of centroids and list of skims (if any)

        :Raises:
            **Exception**: If the graph has no cost field set.
        """
        if not graph.cost_field:
            raise Exception('Cost field needs to be set for cost computation. use graph.set_graph("your_cost_field")')

        self.__integer_type = graph.default_types("int")
        self.__float_type = graph.default_types("float")

        self.nodes = graph.num_nodes + 1
        self.zones = graph.centroids + 1
        self.links = graph.num_links + 1
        self.num_skims = len(graph.skim_fields)

        self.predecessors = np.zeros(self.nodes, dtype=self.__integer_type)
        self.connectors = np.zeros(self.nodes, dtype=self.__integer_type)
        self.reached_first = np.zeros(self.nodes, dtype=self.__integer_type)

        if self.num_skims:
            # One row per possible node ID, so skims can be addressed with the IDs in graph.all_nodes
            self.skims = np.empty((np.max(graph.all_nodes) + 1, self.num_skims), self.__float_type)
            self.skims.fill(np.inf)
            self._skimming_array = np.zeros((self.nodes, self.num_skims), self.__float_type)
        else:
            # Drop any skim matrix left over from a previously prepared graph; it would otherwise
            # keep exposing stale values with the wrong dimensions.
            self.skims = None
            # Minimal placeholder array for when no skimming was requested
            self._skimming_array = np.zeros((1, 2), self.__float_type)

        self._graph_id = graph._id

        # We can imagine somebody creating a worst-case scenario network (imagining that turn penalties are considered)
        # where one needs to traverse all links (or almost all) in both directions.
        self.__graph_sum = 2 * graph.cost.sum()

        self.graph = graph

    def reset(self) -> None:
        """
        Resets object to prepared and pre-computation state

        :Raises:
            **ValueError**: If the object was never prepared.
        """
        if self.predecessors is None:
            raise ValueError("Exception: Path results object was not yet prepared/initialized")

        self.predecessors.fill(-1)
        self.connectors.fill(-1)
        if self.skims is not None:
            self.skims.fill(np.inf)
            self._skimming_array.fill(np.inf)

        self.path = None
        self.path_nodes = None
        self.path_link_directions = None
        self.milepost = None

        self._early_exit = self.early_exit = False
        self._a_star = self.a_star = False
        self._heuristic = "equirectangular"

    def update_trace(self, destination: int) -> None:
        """
        Updates the path's nodes, links, skims and mileposts

        If the previously computed path had `early_exit` enabled, `update_trace` will check if the `destination` has
        already been found, if not the shortest path tree will be recomputed with the `early_exit` argument passed on.

        If the previously computed path had `a_star` enabled, `update_trace` always recomputes the path.

        :Arguments:
            **destination** (:obj:`int`): ID of the node we are computing the path to. Python and NumPy
            integers are both accepted.

        :Raises:
            **TypeError**: If ``destination`` is not an integer.

            **ValueError**: If ``destination`` is negative or beyond the node numbers known to the graph.
        """
        # Node IDs frequently come straight out of NumPy arrays, so NumPy integer scalars are valid too
        if not isinstance(destination, (int, np.integer)):
            raise TypeError("destination needs to be an integer")

        # Hand a plain Python int to the compiled routine regardless of what we were given
        destination = int(destination)

        # A negative ID would silently wrap around when used as an array index, so reject it here
        if destination < 0 or destination >= self.graph.nodes_to_indices.shape[0]:
            raise ValueError("destination out of the range of node numbers in the graph")

        update_path_trace(self, destination, self.graph)

    def set_heuristic(self, heuristic: str) -> None:
        """
        Set the heuristics to be used in A*. Must be one of `get_heuristics()`.

        :Arguments:
            **heuristic** (:obj:`str`): Heuristic to use in A*.

        :Raises:
            **ValueError**: If ``heuristic`` is not an available heuristic.
        """
        if heuristic not in HEURISTIC_MAP:
            raise ValueError(f"heruistic must be one of {self.get_heuristics()}")

        self._heuristic = heuristic

    def get_heuristics(self) -> List[str]:
        """Return the available heuristics."""
        return list(HEURISTIC_MAP.keys())