Source code for aequilibrae.transit.transit_elements.pattern

from sqlite3 import Connection
from typing import List, Tuple, Optional

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString

from aequilibrae.paths import PathResults
from aequilibrae.transit.functions.get_srid import get_srid
from .basic_element import BasicPTElement
from .link import Link
from .mode_correspondence import mode_correspondence

# Threshold used when cleaning a rebuilt path (see Pattern.__build_path_df): a row whose
# link is the same as the link of an adjacent row in the sequence is only kept when its
# "distance" value exceeds this number.
# NOTE(review): units follow whatever the path-computation mileposts are expressed in --
# presumably metres, but confirm against the graph's cost field.
DEAD_END_RUN = 40


class Pattern(BasicPTElement):
    """
    Represents a stop pattern for a particular route, as defined in GTFS.
    """

    def __init__(self, route_id, gtfs_feed) -> None:
        """
        :Arguments:
            *route_id* (:obj:`str`): route ID for which this stop pattern belongs

            *gtfs_feed* (:obj:`Geo`): Parent feed object
        """
        self.pattern_hash = ""
        self.pattern_id = -1
        self.route_id = route_id
        self.route = ""
        self.agency_id = None
        self.longname = ""
        self.shortname = ""
        self.description = ""
        self.pce = 2.0
        self.seated_capacity = None
        self.total_capacity = None
        self.__srid = get_srid()
        self.__geolinks = gtfs_feed.geo_links
        self.__logger = gtfs_feed.logger

        self.__feed = gtfs_feed
        # For map matching
        self.raw_shape: Optional[LineString] = None
        self._stop_based_shape: Optional[LineString] = None
        self.shape: Optional[LineString] = None
        self.route_type: Optional[int] = None
        self.links: List[Link] = []
        self.network_candidates = []
        self.full_path: List[int] = []
        self.fpath_dir: List[int] = []
        self.pattern_mapping = pd.DataFrame([])
        self.stops = []
        self.__map_matching_error = {}

        self.__graph = None
        self.__res = None
        self.__curr_net_nodes_from_stops = []
        self.__net_links_from_stops = []
        self.__net_nodes_from_stops = []
        self.__mm_fail_position = -1
        self.__map_matched = False
        self.shape_length = -1

    def save_to_database(self, conn: Connection, commit=True) -> None:
        """Saves the pattern to the routes table

        When a pattern mapping is available, it is also written to the
        *pattern_mapping* table (with geometry if a ``wkb`` column exists).

        :Arguments:
            *conn* (:obj:`sqlite3.Connection`): Connection to the target database

            *commit* (:obj:`bool`): Whether to commit after inserting. Defaults to True
        """
        shp = self.best_shape()
        geo = None if shp is None else shp.wkb

        data = [
            self.pattern_id,
            self.route_id,
            self.route,
            self.agency_id,
            self.shortname,
            self.longname,
            self.description,
            self.route_type,
            self.pce,
            self.seated_capacity,
            self.total_capacity,
            geo,
            self.__srid,
        ]

        sql = """insert into routes (pattern_id, route_id, route, agency_id, shortname, longname, description,
                                     route_type, pce, seated_capacity, total_capacity, geometry)
                 values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ST_Multi(GeomFromWKB(?, ?)));"""
        conn.execute(sql, data)

        if self.pattern_mapping.shape[0]:
            sqlgeo = """insert into pattern_mapping (pattern_id, seq, link, dir, geometry)
                        values (?, ?, ?, ?, GeomFromWKB(?, ?));"""
            sql = "insert into pattern_mapping (pattern_id, seq, link, dir) values (?, ?, ?, ?);"

            if "wkb" in self.pattern_mapping.columns:
                cols = ["pattern_id", "seq", "link_id", "dir", "wkb", "srid"]
                data = self.pattern_mapping[cols].to_records(index=False)
                conn.executemany(sqlgeo, data)
            else:
                cols = ["pattern_id", "seq", "link_id", "dir"]
                # index=False is required: otherwise the DataFrame index becomes an extra leading
                # field and each record no longer matches the four placeholders of the statement
                data = self.pattern_mapping[cols].to_records(index=False)
                conn.executemany(sql, data)

        if commit:
            conn.commit()

    def best_shape(self) -> LineString:
        """Gets the best version of shape available for this pattern

        :Returns:
            The raw shape when one is set, otherwise the stop-based shape (which may be None)
        """
        shp = self._stop_based_shape if self.raw_shape is None else self.raw_shape
        return shp

    def map_match(self):
        """Map matches the route into the network, considering its appropriate shape.

        Part of the map-matching process is to find the network links corresponding the pattern's
        raw shape, so that method will be called in case it has not been called before.

        The basic algorithm behind the map-matching algorithm is described in
        https://doi.org/10.3141%2F2646-08

        In a nutshell, we compute the shortest path between the nodes corresponding to the links
        to which stops were geographically matched, for each pair of identified links.

        We do not consider links that are in perfect sequence, as we found that it introduces
        severe issues when stops are close to intersections without clear upsides.

        When issues are found, we remove the stops in the immediate vicinity of the issue and
        attempt new path finding. The First and last stops/corresponding links are always kept.

        If an error was found, (record for it will show in the log), it is stored within the object.
        """
        if self.__map_matched:
            return

        # Flagged up-front, so a pattern is only ever attempted once, even if matching fails below
        self.__map_matched = True
        self.__logger.debug(f"Map-matching pattern ID {self.pattern_id}")

        if not self.__feed.graphs:
            self.__feed.builds_link_graphs_with_broken_stops()

        if self.route_type not in mode_correspondence or mode_correspondence[self.route_type] not in self.__feed.graphs:
            return

        self.__map_matching_error.clear()
        df = self.__map_matching_complete_path_building()
        if df.shape[0] == 0:
            self.__logger.warning(f"Could not rebuild path for pattern {self.pattern_id}")
            return

        self.full_path = df.link_id.to_list()
        self.fpath_dir = df.dir.to_list()
        self.__assemble__mm_shape(df)
        self.__build_pattern_mapping()
        self.__logger.info(f"Map-matched pattern {self.pattern_id}")

    # TODO: consider improving the link selection for discount applying an overlay and use a cost proportional to the
    #       link length in the route (raw_shape) buffer.
    def __graph_discount(self):
        """Lists the IDs of the links that intersect a 20-unit buffer (in EPSG:3857) around the raw shape

        Only links whose *modes* field contains the mode corresponding to this pattern's route type
        are returned.
        """
        # NOTE(review): this uses raw_shape directly rather than best_shape(); confirm that patterns
        # without a raw shape are meant to get no discounted links.
        buff = gpd.GeoSeries(self.raw_shape, crs="EPSG:4326").to_crs(3857).buffer(20).geometry
        gdf = gpd.GeoDataFrame(geometry=buff.to_crs(4326), crs=self.__geolinks.crs)
        gdf = self.__geolinks.overlay(gdf, how="intersection")
        gdf = gdf.loc[gdf.modes.str.contains(mode_correspondence[self.route_type])]
        return gdf.link_id.tolist()

    def __map_matching_complete_path_building(self):
        """Builds the sequence of links and directions that connects the stops of this pattern

        :Returns:
            A DataFrame with columns *link_id* and *dir*, or an empty DataFrame when no path
            could be built
        """
        mode_ = mode_correspondence[self.route_type]
        # We get the graph for our job
        graph = self.__feed.graphs[mode_]
        empty_frame = pd.DataFrame([])

        # We search for disconnected stops:
        candidate_stops = list(self.stops)
        stop_node_idxs = [stop.__map_matching_id__[self.route_type] for stop in candidate_stops]

        node0 = graph.network.a_node[~graph.network.a_node.isin(graph.centroids)].min()
        connected_stops = []

        res = PathResults()
        res.prepare(graph)

        res1 = PathResults()
        res1.prepare(graph)

        for i, stop in enumerate(candidate_stops):
            node_o = stop.__map_matching_id__[self.route_type]
            self.__logger.debug(f"Computing paths between {node_o} and {node0}")
            res.compute_path(node_o, int(node0), early_exit=False)

            # Get skims, as proxy for connectivity, for all stops other than the origin
            other_nodes = stop_node_idxs[:i] + stop_node_idxs[i + 1 :]
            dest_skim = res.skims[other_nodes, 0]

            # A single-stop pattern has no "other" stop; the size check avoids calling min() on an empty array
            if dest_skim.size and dest_skim.min() < 1.0e308:
                candidate_stops = candidate_stops[i:]
                # The first i entries of dest_skim refer to the stops BEFORE the current one, so the
                # stops that follow it (candidate_stops[1:]) line up with dest_skim[i:]
                reachable = dest_skim[i:] < 1.0e308
                connected_stops = [candidate_stops[0]]
                connected_stops.extend(stp for stp, ok in zip(candidate_stops[1:], reachable) if ok)
                break

        if not connected_stops:
            self.__logger.critical(f"Route completely disconnected. {self.route}/{self.route_id}")
            return empty_frame

        # Links close to the raw shape get their cost reduced to a tenth of their distance
        graph.cost = np.array(graph.graph.distance)
        likely_links = self.__graph_discount()
        graph.cost[graph.graph.original_id.abs().isin(likely_links)] *= 0.1

        fstop = connected_stops[0]

        if len(connected_stops) == 1:
            return empty_frame

        if len(connected_stops) == 2:
            nstop = connected_stops[1].__map_matching_id__[self.route_type]
            self.__logger.debug(f"Computing paths between {fstop.__map_matching_id__[self.route_type]} and {nstop}")
            res.compute_path(fstop.__map_matching_id__[self.route_type], int(nstop), early_exit=True)
            if res.milepost is None:
                return empty_frame
            pdist = list(res.milepost[1:-1] - res.milepost[:-2])[1:]
            plnks = list(res.path[1:-1])
            pdirecs = list(res.path_link_directions[1:-1])
            return self.__build_path_df(graph, pdirecs, pdist, plnks)

        path_links = []
        path_directions = []
        path_distances = []
        start = fstop.__map_matching_id__[self.route_type]
        for idx, tstop in enumerate(connected_stops[1:]):
            end = tstop.__map_matching_id__[self.route_type]

            not_last = idx + 2 <= len(connected_stops) - 1

            if not_last:
                following_stop = connected_stops[idx + 2]
                n_end = following_stop.__map_matching_id__[self.route_type]

            self.__logger.debug(f"Computing paths between {start} and {end}")
            res.compute_path(start, int(end), early_exit=True)
            connection_candidates = graph.network[graph.network.a_node == end].b_node.values

            # Among the nodes reachable from the stop node, pick the one that minimizes the cost of getting
            # there plus (when there is a following stop) the cost of continuing on to that following stop
            min_cost = np.inf
            access_node = -1
            follow_val = 0
            for connec in connection_candidates:
                if connec == start:
                    continue
                if not_last:
                    res1.compute_path(int(connec), int(n_end), early_exit=True)
                    if res1.milepost is None:
                        continue
                    follow_val = res1.milepost[-1]

                estimate = follow_val + res.skims[connec, 0]
                if estimate < min_cost:
                    min_cost = estimate
                    access_node = connec

            if access_node >= 0:
                res.update_trace(int(access_node))
                shift = 1 if not_last else 0
                if len(res.path) <= 1 + shift:
                    # Stop connectors only
                    continue

                if not_last:
                    path_links.extend(list(res.path[:-1]))
                    path_directions.extend(list(res.path_link_directions[:-1]))
                    path_distances.extend(list(res.milepost[1:] - res.milepost[:-1])[1:])
                else:
                    path_links.extend(list(res.path[:]))
                    path_directions.extend(list(res.path_link_directions[:]))
                    path_distances.extend(list(res.milepost[1:] - res.milepost[:-1])[:])
            else:
                self.__logger.debug(f"Failed path computation when map-matching {self.pattern_id}")
                return empty_frame

            start = res.path_nodes[-2] if len(res.path_nodes) > 3 else start

        # Connection to the last stop
        return self.__build_path_df(graph, path_directions, path_distances, path_links)

    def __build_path_df(self, graph, path_directions, path_distances, path_links):
        """Converts the raw path lists into a cleaned DataFrame with columns *link_id* and *dir*

        The first element of each input list is discarded, graph link IDs are translated into
        their *original_id*, and repeated/back-and-forth links are removed.
        """
        if not path_links:
            return pd.DataFrame({"link_id": [], "dir": []})

        corresp = pd.DataFrame(graph.network[["link_id", "original_id"]])

        result = pd.DataFrame(
            {
                "link_id": path_links[1:],
                "direction": path_directions[1:],
                "sequence": np.arange(len(path_links) - 1),
                "distance": path_distances[1:],
            }
        )
        df = result.merge(corresp, on="link_id", how="left")
        df.sort_values(by=["sequence"], inplace=True)

        # We just guarantee that we haven't messed up anything
        df = df[(df.original_id.shift(-1) != df.original_id) | (df.direction.shift(-1) != df.direction)]

        # Rows that share their link with a neighbouring row are only kept when longer than DEAD_END_RUN
        crit1 = df.original_id.shift(1) != df.original_id
        crit2 = df.original_id.shift(-1) != df.original_id
        df = df[(crit1 & crit2) | (df.distance > DEAD_END_RUN)]

        df = df[["original_id", "direction"]]
        df.columns = ["link_id", "dir"]
        # The direction is taken from the sign of the (original) link ID
        df.loc[df.link_id > 0, "dir"] = 1
        df.loc[df.link_id < 0, "dir"] = -1
        df.reset_index(drop=True, inplace=True)

        has_issues = True
        while has_issues:
            # We eliminate multiple backs-and-forth on links
            has_issues = False
            for i in range(0, df.shape[0] - 2):
                # .loc slicing is inclusive, so this inspects three consecutive rows
                if df.loc[i : i + 2, "link_id"].abs().unique().shape[0] == 1:
                    df.drop(index=[i, i + 1], inplace=True)
                    df.reset_index(drop=True, inplace=True)
                    has_issues = True
                    break
        return df

    def __assemble__mm_shape(self, df: pd.DataFrame):
        """Builds the map-matched shape by chaining the geometries of the links listed in *df*

        Links with a negative ID have their coordinates reversed before being appended.
        """
        shape: List[Tuple[float, float]] = []
        for link_id in df.link_id:
            line_geo = self.__geolinks.loc[self.__geolinks.link_id == abs(link_id)].geometry.values[0]
            coords = list(line_geo.coords)[::-1] if link_id < 0 else list(line_geo.coords)
            # The first coordinate is skipped for every link but the first one
            data = coords[1:] if shape else coords
            shape.extend(data)
        self.shape = LineString(shape)

    def get_error(self, what_to_get="culprit") -> Optional[tuple]:
        """Returns information on the area of the network a map-matching error occurred

        :Arguments:
            *what_to_get* (:obj:`str`): The object you want returned. Options are 'culprit' and 'partial_path'

        :Returns:
            The stored value for *what_to_get*, or None when no error was recorded or the
            requested item is not available
        """
        if not self.__map_matching_error:
            self.__logger.debug("No map-matching error recorded for this pattern")
            return None

        return self.__map_matching_error.get(what_to_get)

    def __build_pattern_mapping(self):
        """Builds the *pattern_mapping* DataFrame (one row per link in the full path) with link geometries as WKB"""
        # We find what is the position along routes that we have for each stop and make sure they are always growing
        self.pattern_mapping = pd.DataFrame(
            {"seq": np.arange(len(self.full_path)), "link_id": np.abs(self.full_path), "dir": self.fpath_dir}
        )
        self.pattern_mapping = self.pattern_mapping.assign(pattern_id=self.pattern_id, srid=4326)

        links_with_geo = self.__geolinks.assign(wkb=self.__geolinks.geometry.to_wkb())
        links_with_geo = links_with_geo[["link_id", "wkb"]]
        self.pattern_mapping = self.pattern_mapping.merge(links_with_geo, on="link_id", how="left")