Source code for aequilibrae.transit.route_map_matcher

import logging
from typing import Optional, Union

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely import LineString
from shapely.geometry.multilinestring import MultiLineString
from shapely.ops import linemerge

from aequilibrae.context import get_logger
from aequilibrae.paths import Graph
from aequilibrae.transit.functions.breaking_links_for_stop_access import split_links_at_stops
from aequilibrae.utils.geo_utils import metre_crs_for_gdf
from aequilibrae.utils.interface.worker_thread import WorkerThread

DEAD_END_RUN = 40


[docs] class RouteMapMatcher(WorkerThread): def __init__(self, link_gdf: gpd.GeoDataFrame, nodes_gdf: gpd.GeoDataFrame, stops_gdf: gpd.GeoDataFrame, distance_to_project=50): super().__init__(None) utm_zone = metre_crs_for_gdf(link_gdf) self.logger = get_logger() self.links = self.__rename_geo(link_gdf).to_crs(utm_zone) stops_gdf = self.__rename_geo(stops_gdf).to_crs(utm_zone).rename(columns={"stop_id": "real_stop_id"}) self.stops = stops_gdf.assign(stop_id=np.arange(stops_gdf.shape[0]) + nodes_gdf.node_id.max() + 1) self.nodes = self.__rename_geo(nodes_gdf).to_crs(utm_zone) self.dist_thresh = distance_to_project self.node_corresp = [] self.__all_links = {} self.graph: Graph = Graph() self.reverse_graph: Graph = Graph() self.crs = utm_zone self.stop_ids = self.stops[["stop_id", "real_stop_id"]]
[docs] def initialize_graph(self): """Build the graph for links for a certain mode while splitting the closest links at stops' projection :Arguments: **mode_id** (:obj:`int`): Mode ID for which we will build the graph for **distance_to_project** (:obj:`float`, *Optional*): Radius search for links to break at the stops. Defaults to 50m """ self.logger.debug("Called build_graph_with_broken_stops") if not self.links.shape[0]: return self.__build_graph_from_scratch()
[docs] def map_match_route(self, route_stops: gpd.GeoDataFrame, route_shape: Optional[LineString] = None, pattern_id: Optional[str] = None): # `pattern_id` is accepted for API compatibility and reserved for future use (e.g., logging/filtering). _ = pattern_id if np.all(np.isin(route_stops.stop_id.values, self.available_stops)): path_directions, path_links = self._build_full_path_on_broken_links(route_stops, route_shape) else: path_directions, path_links = [], [] return self._build_path_df(path_directions, path_links)
def __build_graph_from_scratch(self): self.logger.debug("Creating map-matching graph") broken_links, new_nodes = split_links_at_stops(self.stops, self.links, self.dist_thresh) # To build connectors, let's get all nodes together # and connect stops to them within the threshold distance nodes = pd.concat([self.nodes[["node_id", "geometry"]], new_nodes], ignore_index=True) stops = self.stops[["stop_id", "geometry"]] buffered_points = stops.copy() buffered_points["orig_geometry"] = buffered_points.geometry buffered_points = buffered_points.set_geometry(buffered_points.geometry.buffer(self.dist_thresh)) joined = gpd.sjoin(buffered_points, nodes, how="inner", predicate="intersects") geos = joined[["node_id"]].merge(nodes[["node_id", "geometry"]], on="node_id", how="left")[["geometry"]] connector_geo = joined.reset_index(drop=True).geometry.shortest_line(geos.reset_index(drop=True).geometry) df = joined[["stop_id", "node_id"]].rename(columns={"stop_id": "a_node", "node_id": "b_node"}) connectors = gpd.GeoDataFrame(df, geometry=connector_geo).set_crs(self.links.crs) min_speed = max(min(self.links.speed_ab.min(), self.links.speed_ba.min()), 1.0) connectors = connectors.assign(direction=0, link_id=0, is_connector=1, speed_ab=min_speed, speed_ba=min_speed, distance=max(1.2 * (connectors.geometry.length ** 1.3))) net_data = pd.concat([broken_links, connectors], ignore_index=True) net_data["link_id"] = np.arange(1, net_data.shape[0] + 1) # Guarantees a non-zero distance net_data.loc[net_data.geometry.length == 0, "distance"] = 0.001 net_data["time_ab"] = net_data["distance"] / net_data.speed_ab net_data["time_ba"] = net_data["distance"] / net_data.speed_ba net_gdf = gpd.GeoDataFrame(net_data, geometry="geometry", crs=self.links.crs) self.__graph_from_broken_net(net_gdf) def __graph_from_broken_net(self, net_data): self.graph.network = net_data centroids = np.array(self.stops.stop_id.values) all_nodes = np.unique(np.hstack([self.graph.network.a_node.to_numpy(), self.graph.network.b_node.to_numpy()])) centroids = centroids[np.isin(centroids, all_nodes)] self.graph.prepare_graph(centroids) self.available_stops = self.stop_ids["real_stop_id"][self.stop_ids.stop_id.isin(centroids)].to_numpy() self.graph.set_graph("distance") self.graph.set_skimming(["distance", "time"]) self.graph.set_blocked_centroid_flows(True) self.reverse_graph = self.graph.reverse() def _build_full_path_on_broken_links(self, route_stops: gpd.GeoDataFrame, route_shape: Optional[LineString] = None): # It assumes that both the graph, stops AND route shape are in the same CRS if route_stops.shape[0] <= 1: return [], [] # If the route shape is not defined, we build it from the stops as a sequence of line segments if route_shape is None: route_shape = LineString(route_stops.geometry.tolist()) route_stops = route_stops.rename(columns={"stop_id": "real_stop_id"}) route_stops = route_stops.merge(self.stop_ids, on="real_stop_id") if not np.all(np.isin(route_stops.real_stop_id.values, self.available_stops)): self.logger.critical("Route is not completely connected.") return [], [] # We discount the likely links for this route to favor them in the map-matching self.graph.cost = np.array(self.graph.graph[self.graph.cost_field]) likely_links = self.__graph_discount(route_shape) for g in [self.graph, self.reverse_graph]: g.cost[(g.graph.link_id.isin(likely_links)) & (g.graph.is_connector == 0)] *= 0.1 current_stop = int(route_stops.stop_id.iat[0]) res = self.graph.compute_path(current_stop, int(route_stops.stop_id.iat[-1]), early_exit=True) if route_stops.shape[0] == 2: if res.milepost is None: return [], [] plnks = list(res.path[1:-1]) pdirecs = list(res.path_link_directions[1:-1]) return pdirecs, plnks access_links = self.graph.network[self.graph.network.a_node.isin(route_stops.stop_id.values)] path_links, path_directions = [], [] is_first = True for i in range(1, route_stops.shape[0]): next_stop = int(route_stops.stop_id.iat[i]) is_not_last = i < route_stops.shape[0] - 1 # Get the next stop for look-ahead path estimation (if not at the end) following_stop = int(route_stops.stop_id.iat[i + 1]) if is_not_last else None logging.debug(f"Computing path from node {current_stop} to stop {next_stop}") connection_candidates = access_links[access_links.a_node == next_stop].b_node.values idx_ = 1 if is_first else 0 is_first = False # If this is the last stop, then we just compute the path directly if not is_not_last: res.compute_path(current_stop, next_stop, early_exit=True) path_links.extend(list(res.path[idx_:-1])) path_directions.extend(list(res.path_link_directions[idx_:-1])) continue # Let's see if we can reach the following stop while going through the next one # This would save us some path computation res.compute_path(current_stop, following_stop, early_exit=False) indices_in_a = np.where(np.isin(connection_candidates, res.path_nodes))[0] if indices_in_a.shape[0] > 0: # We found a candidate that is already in the path to the following stop best_access_node = connection_candidates[indices_in_a[-1]] else: # Find the best network node to exit this stop from # We evaluate all connector links from this stop and choose the one # that minimizes total cost (current path + estimated cost to next stop) # That is MUCH easier done with the reverse graph, though res.update_trace(next_stop) if res.path_nodes is None or res.path_nodes.shape[0] == 0: logging.debug("Failed to find path to the next stop") return [], [] idx_where = np.where(np.isin(res.path_nodes, connection_candidates)) indices = idx_where[0] first_leg_costs = res.milepost[indices] res_reverse = self.reverse_graph.compute_path(following_stop, next_stop, early_exit=True) if res_reverse.path_nodes is None or res_reverse.path_nodes.shape[0] == 0: logging.debug("Failed to find path to the following stop") return [], [] indices = np.where(np.isin(res_reverse.path_nodes, connection_candidates))[0] second_leg_costs = res_reverse.milepost[indices] best_idx = np.argmin(first_leg_costs + second_leg_costs) best_access_node = connection_candidates[best_idx] assert next_stop != current_stop # Update the trace to end at the best access node res.update_trace(int(best_access_node)) # Determine how many links to include # Skip connectors at start and (if not last stop) at end path_links.extend(list(res.path[idx_:])) path_directions.extend(list(res.path_link_directions[idx_:])) current_stop = best_access_node return path_directions, path_links def _build_path_df(self, path_directions, path_links) -> pd.DataFrame: """Builds a cleaned DataFrame of link IDs and directions from raw path data. Performs post-processing to: - Filter out short back-and-forth movements Args: graph: AequilibraE graph with network correspondence path_directions: List of link traversal directions path_links: List of internal link IDs Returns: DataFrame with columns ['link_id', 'dir'] """ if not path_links: return pd.DataFrame({"link_id": [], "dir": []}) corresp = pd.DataFrame(self.graph.network[["link_id", "original_id", "distance"]]) # Build initial result, skipping the first (connector) link result = pd.DataFrame( { "link_id": path_links[1:], "direction": path_directions[1:], "sequence": np.arange(len(path_links) - 1), } ) # Map internal link IDs to original network IDs df = result.merge(corresp, on="link_id", how="left") df.sort_values(by=["sequence"], inplace=True) # Remove consecutive links with same original_id and direction # (these are internal graph subdivisions of the same physical link) df = df[(df.original_id.shift(-1, fill_value=-1) != df.original_id) | ( df.direction.shift(-1, fill_value=-1) != df.direction)] # Filter out isolated short segments (likely noise or dead-end detours) # Keep a link if it differs from both neighbors OR if it's long enough crit_differs_prev = df.original_id.shift(1, fill_value=-1) != df.original_id crit_differs_next = df.original_id.shift(-1, fill_value=-1) != df.original_id df = df[(crit_differs_prev & crit_differs_next) | (df.distance > DEAD_END_RUN)] # Prepare final output with direction based on link sign df.sort_values(by=["sequence"], inplace=True) df = df[["original_id", "direction"]] df.reset_index(drop=True, inplace=True) # Eliminate back-and-forth patterns on the same link # e.g., [A, B, A] -> [A] when all three have the same absolute link_id has_issues = True while has_issues: has_issues = False for i in range(0, df.shape[0] - 2): # Check if three consecutive rows are all the same link if df.loc[i: i + 2, "original_id"].unique().shape[0] == 1: df.drop(index=[i, i + 1], inplace=True) df.index = pd.RangeIndex(df.shape[0]) has_issues = True break df.dropna(subset=["original_id"], inplace=True) df["original_id"] = df["original_id"].astype(int) return df.rename(columns={"original_id": "link_id", "direction": "dir"}) def __graph_discount(self, route_shape: LineString) -> list: """ Finds network links within a buffer of the route shape. These links are candidates for cost discounting during map-matching. Args: route_shape: LineString geometry of the route geolinks: GeoDataFrame of network links with 'link_id' column Returns: List of link_ids that intersect the buffered route shape """ geolinks = self.graph.network[["link_id", "geometry"]] # Create a 20-meter buffer around the route shape buff = gpd.GeoSeries(route_shape, crs=geolinks.crs).buffer(20) gdf = gpd.GeoDataFrame(geometry=buff) # Find all links that intersect this buffer return gdf.sjoin(geolinks, how="inner", predicate="contains").link_id.tolist()
[docs] def assemble_shape(self, df: pd.DataFrame, enforce_single_parts=True) -> Union[LineString, MultiLineString]: """Assembles a LineString shape from the matched path links. Args: df: DataFrame with 'link_id' and 'dir' columns from map_match_route() Returns: LineString: The assembled route shape """ if df.empty: return LineString() df_ = self.links[["link_id", "geometry"]].merge(df.assign(sequence=np.arange(df.shape[0])), on="link_id", how="inner").sort_values("sequence") df_.reset_index(drop=True, inplace=True) # dir < 0 means BA direction (reverse), dir >= 0 means AB direction (forward) shapes = [rec.geometry.reverse() if rec["dir"] < 0 else rec.geometry for _, rec in df_.iterrows()] shape = linemerge(shapes) if isinstance(shape, LineString) or not enforce_single_parts: return shape # If linemerge returned MultiLineString, we need to connect the parts to force it to be a single LineString coords = list(shapes[0].coords) for geom in shapes[1:]: seg = list(geom.coords) if np.allclose(coords[-1], seg[0]): seg = seg[1:] elif np.allclose(coords[-1], seg[-1]): seg = list(reversed(seg)) seg = seg[1:] coords.extend(seg) return LineString(coords)
@staticmethod def __rename_geo(gdf): if gdf.active_geometry_name != "geometry": return gdf.rename_geometry("geometry") return gdf