import math
from sqlite3 import Connection
from typing import List, Tuple, Optional
import numpy as np
import pandas as pd
import shapely.wkb
from shapely.geometry import LineString, Polygon
from shapely.ops import substring
from aequilibrae.log import logger
from aequilibrae.transit.functions.get_srid import get_srid
from aequilibrae.utils.geo_index import GeoIndex
from .basic_element import BasicPTElement
from .link import Link
from .mode_correspondence import mode_correspondence
DEAD_END_RUN = 40
[docs]
class Pattern(BasicPTElement):
"""
Represents a stop pattern for a particular route, as defined in GTFS.
"""
def __init__(self, route_id, gtfs_feed) -> None:
"""
Args:
*pattern_id* (:obj:`str`): Pre-computed ID for this pattern
*geotool* (:obj:`Geo`): Suite of geographic utilities.
"""
self.pattern_hash = ""
self.pattern_id = -1
self.route_id = route_id
self.route = ""
self.agency_id = None
self.longname = ""
self.shortname = ""
self.description = ""
self.seated_capacity = None
self.total_capacity = None
self.__srid = get_srid()
self.__geotool = gtfs_feed.geotool
self.__geolinks = self.__geotool.network.links.data
self.__logger = logger
self.__feed = gtfs_feed
# For map matching
self.raw_shape: LineString = None
self._stop_based_shape: LineString = None
self.shape: LineString = None
self.route_type: int = None
self.links: List[Link] = []
self.network_candidates = []
self.full_path: List[int] = []
self.fpath_dir: List[int] = []
self.pattern_mapping = []
self.stops = []
self.__map_matching_error = {}
self.__graph = None
self.__res = None
self.__curr_net_nodes_from_stops = []
self.__net_links_from_stops = []
self.__net_nodes_from_stops = []
self.__mm_fail_position = -1
self.__map_matched = False
self.__match_quality = None
self.shape_length = -1
[docs]
def save_to_database(self, conn: Connection, commit=True) -> None:
"""Saves the pattern to the routes table"""
shp = self.best_shape()
geo = None if shp is None else shp.wkb
data = [
self.pattern_id,
self.route_id,
self.route,
self.agency_id,
self.shortname,
self.longname,
self.description,
self.route_type,
self.seated_capacity,
self.total_capacity,
geo,
self.__srid,
]
sql = """insert into routes (pattern_id, route_id, route, agency_id, shortname, longname, description, route_type, seated_capacity,
total_capacity, geometry) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ST_Multi(GeomFromWKB(?, ?)));"""
conn.execute(sql, data)
if self.pattern_mapping and self.shape:
sqlgeo = """insert into pattern_mapping (pattern_id, seq, link, dir, geometry)
values (?, ?, ?, ?, GeomFromWKB(?, ?));"""
sql = """insert into pattern_mapping (pattern_id, seq, link, dir)
values (?, ?, ?, ?);"""
for record in self.pattern_mapping:
if record[-1] is None:
conn.execute(sql, record[:-1])
else:
geo = shapely.wkb.loads(record[-1])
if isinstance(geo, LineString):
conn.execute(sqlgeo, record + [self.__srid])
else:
conn.execute(sql, record[:-1])
[docs]
def best_shape(self) -> LineString:
"""Gets the best version of shape available for this pattern"""
shp = self._stop_based_shape if self.raw_shape is None else self.raw_shape
shp = shp if self.shape is None else self.shape
return shp
[docs]
def map_match(self):
"""Map matches the route into the network, considering its appropriate shape.
Part of the map-matching process is to find the network links corresponding the pattern's
raw shape, so that method will be called in case it has not been called before.
The basic algorithm behind the map-matching algorithm is described in https://doi.org/10.3141%2F2646-08
In a nutshell, we compute the shortest path between the nodes corresponding to the links to which
stops were geographically matched, for each pair of identified links.
We do not consider links that are in perfect sequence, as we found that it introduces severe issues when
stops are close to intersections without clear upsides.
When issues are found, we remove the stops in the immediate vicinity of the issue and attempt new
path finding. The First and last stops/corresponding links are always kept.
If an error was found, (record for it will show in the log), it is stored within the object.
"""
if self.__map_matched:
return
self.__logger.debug(f"Map-matching pattern ID {self.pattern_id}")
if not self.__feed.graphs:
self.__feed.builds_link_graphs_with_broken_stops()
if mode_correspondence[self.route_type] not in self.__feed.graphs:
return
self.__feed.path_store.add_graph(
self.__feed.graphs[mode_correspondence[self.route_type]], mode_correspondence[self.route_type]
)
self.__map_matching_error.clear()
df = self.__map_matching_complete_path_building()
if df.shape[0] == 0:
logger.warning(f"Could not rebuild path for pattern {self.pattern_id}")
return
self.full_path = df.link_id.to_list()
self.fpath_dir = df.dir.to_list()
self.__assemble__mm_shape(df)
self.__build_pattern_mapping()
logger.info(f"Map-matched pattern {self.pattern_id}")
def __graph_discount(self, connected_stops):
self.__geolinks = self.__geolinks[self.__geolinks.modes.str.contains(mode_correspondence[self.route_type])]
link_idx = GeoIndex()
for _, row in self.__geolinks.iterrows():
link_idx.insert(feature_id=row.link_id, geometry=row.geometry)
links = set()
for stop in connected_stops:
links.update(list(link_idx.nearest(stop.geo, 3)))
buffer = self.best_shape().buffer(40) # type: Polygon
return [
lnk
for lnk in links
if buffer.contains(self.__geolinks.loc[self.__geolinks.link_id == lnk].geometry.values[0])
]
def __map_matching_complete_path_building(self):
mode_ = mode_correspondence[self.route_type]
# We get the graph for our job
graph = self.__feed.graphs[mode_]
empty_frame = pd.DataFrame([])
# We search for disconnected stops:
candidate_stops = list(self.stops)
stop_node_idxs = [stop.___map_matching_id__[self.route_type] for stop in candidate_stops]
node0 = graph.network.a_node[~graph.network.a_node.isin(graph.centroids)].min()
connected_stops = []
for i, stop in enumerate(candidate_stops):
node_o = stop.___map_matching_id__[self.route_type]
logger.debug(f"Computing paths between {node_o} and {node0}")
res = self.__feed.path_store.get_path_results(node_o, mode_)
res.update_trace(int(node0))
# Get skims, as proxy for connectivity, for all stops other than the origin
other_nodes = stop_node_idxs[:i] + stop_node_idxs[i + 1 :]
dest_skim = res.skims[other_nodes, 0]
if dest_skim.min() < 1.0e308:
candidate_stops = candidate_stops[i:]
connected_stops = [stop for i, stop in enumerate(candidate_stops[1:]) if dest_skim[i] < 1.0e308]
connected_stops = [candidate_stops[0]] + connected_stops
break
if not connected_stops:
self.__logger.critical(f"Route completely disconnected. {self.route}/{self.route_id}")
return empty_frame
graph.cost = np.array(graph.graph.distance)
likely_links = self.__graph_discount(connected_stops)
graph.cost[graph.graph.original_id.abs().isin(likely_links)] *= 0.1
fstop = connected_stops[0]
if len(connected_stops) == 1:
return empty_frame
if len(connected_stops) == 2:
nstop = connected_stops[1].___map_matching_id__[self.route_type]
logger.debug(f"Computing paths between {fstop.___map_matching_id__[self.route_type]} and {nstop}")
res = self.__feed.path_store.get_path_results(fstop.___map_matching_id__[self.route_type], mode_)
res.update_trace(int(nstop))
if res.milepost is None:
return empty_frame
pdist = list(res.milepost[1:-1] - res.milepost[:-2])[1:]
plnks = list(res.path[1:-1])
pdirecs = list(res.path_link_directions[1:-1])
return self.__build_path_df(graph, pdirecs, pdist, plnks)
path_links = []
path_directions = []
path_distances = []
start = fstop.___map_matching_id__[self.route_type]
for idx, tstop in enumerate(connected_stops[1:]):
end = tstop.___map_matching_id__[self.route_type]
not_last = idx + 2 <= len(connected_stops) - 1
if not_last:
following_stop = connected_stops[idx + 2]
n_end = following_stop.___map_matching_id__[self.route_type]
logger.debug(f"Computing paths between {start} and {end}")
res = self.__feed.path_store.get_path_results(start, mode_)
res.update_trace(int(end))
connection_candidates = graph.network[graph.network.a_node == end].b_node.values
min_cost = np.inf
access_node = -1
follow_val = 0
for connec in connection_candidates:
if connec == start:
continue
if not_last:
res1 = self.__feed.path_store.get_path_results(connec, mode_)
res1.update_trace(int(n_end))
if res1.milepost is None:
continue
follow_val = res1.milepost[-1]
estimate = follow_val + res.skims[connec, 0]
if estimate < min_cost:
min_cost = estimate
access_node = connec
if access_node >= 0:
res.update_trace(int(access_node))
shift = 1 if not_last else 0
if len(res.path) <= 1 + shift:
# Stop connectors only
continue
if not_last:
path_links.extend(list(res.path[:-1]))
path_directions.extend(list(res.path_link_directions[:-1]))
path_distances.extend(list(res.milepost[1:] - res.milepost[:-1])[1:])
else:
path_links.extend(list(res.path[:]))
path_directions.extend(list(res.path_link_directions[:]))
path_distances.extend(list(res.milepost[1:] - res.milepost[:-1])[:])
else:
self.__logger.debug(f"Failed path computation when map-matching {self.pattern_id}")
return empty_frame
start = res.path_nodes[-2] if len(res.path_nodes) > 3 else start
# Connection to the last stop
return self.__build_path_df(graph, path_directions, path_distances, path_links)
def __build_path_df(self, graph, path_directions, path_distances, path_links):
corresp = pd.DataFrame(graph.network[["link_id", "original_id"]])
if not path_links:
return pd.DataFrame({"link_id": [], "dir": []})
result = pd.DataFrame(
{
"link_id": path_links[1:],
"direction": path_directions[1:],
"sequence": np.arange(len(path_links) - 1),
"distance": path_distances[1:],
}
)
df = result.merge(corresp, on="link_id", how="left")
df.sort_values(by=["sequence"], inplace=True) # We just guarantee that we haven't messed up anything
df = df[(df.original_id.shift(-1) != df.original_id) | (df.direction.shift(-1) != df.direction)]
crit1 = df.original_id.shift(1) != df.original_id
crit2 = df.original_id.shift(-1) != df.original_id
df = df[(crit1 & crit2) | (df.distance > DEAD_END_RUN)]
df = df[["original_id", "direction"]]
df.columns = ["link_id", "dir"]
df.loc[df.link_id > 0, "dir"] = 0
df.loc[df.link_id < 0, "dir"] = 1
df.reset_index(drop=True, inplace=True)
has_issues = True
while has_issues:
# We eliminate multiple backs-and-forth on links
has_issues = False
for i in range(0, df.shape[0] - 2):
if df.loc[i : i + 2, "link_id"].abs().unique().shape[0] == 1:
df.drop(index=[i, i + 1], inplace=True)
df.reset_index(drop=True, inplace=True)
has_issues = True
break
return df
def __assemble__mm_shape(self, df: pd.DataFrame):
shape = [] # type: List[Tuple[float, float]]
for _, rec in df.iterrows():
line_geo = self.__geolinks.loc[self.__geolinks.link_id == abs(rec.link_id)].geometry.values[0]
coords = list(line_geo.coords)[::-1] if rec.link_id < 0 else list(line_geo.coords)
data = coords[1:] if shape else coords
shape.extend(data)
self.shape = LineString(shape)
[docs]
def get_error(self, what_to_get="culprit") -> Optional[tuple]:
"""
Returns information on the area of the network a map-matching error occurred
Args:
*what_to_get* (:obj:`str`): The object you want returned. Options are 'culprit' and 'partial_path'
:return:
"""
if not self.__map_matching_error:
self.__logger.debug("No map-matching error recorded for this pattern")
return None
if what_to_get not in self.__map_matching_error:
return None
return self.__map_matching_error[what_to_get]
def __build_pattern_mapping(self):
# We find what is the position along routes that we have for each stop and make sure they are always growing
self.pattern_mapping = []
segments = [LineString([pt1, pt2]) for pt1, pt2 in zip(self.shape.coords, self.shape.coords[1:])]
seg_len = [seg.length for seg in segments]
distances = []
min_idx_so_far = 0
for i, stop in enumerate(self.stops):
d = [round(stop.geo.distance(line), 1) for line in segments]
idx = d[min_idx_so_far:].index(min(d[min_idx_so_far:])) + min_idx_so_far
idx = idx if i > 0 else 0
projection = segments[idx].project(stop.geo) + sum(seg_len[:idx])
distances.append(projection)
min_idx_so_far = idx
# the list needs to be monotonically increasing
if not all(x <= y for x, y in zip(distances, distances[1:])):
for i, (x, y) in enumerate(zip(distances, distances[1:])):
distances[i + 1] = distances[i] if y < x else distances[i + 1]
logger.warning(
f"""Pattern {self.pattern_id} has a map-matching pattern resulting in backwards movement.
It was fixed, but it should be checked."""
)
distances[-1] = distances[-1] if self.shape.length > distances[-1] else self.shape.length - 0.00001
# We make sure we don't have projections going beyond the length we will accumulate
# This is only required because of numerical precision issues
tot_broken_length = sum(
[self.__geolinks.loc[self.__geolinks.link_id == abs(x)].geometry.values[0].length for x in self.full_path]
)
distances = [min(x, tot_broken_length) for x in distances]
pid = self.pattern_id
stop_pos = 0
cum_dist = 0
index = 0
for link_pos, link_id in enumerate([abs(x) for x in self.full_path]):
direc = self.fpath_dir[link_pos]
link_geo = self.__geolinks.loc[self.__geolinks.link_id == link_id].geometry.values[0]
link_geo = link_geo if direc == 0 else LineString(link_geo.coords[::-1])
if distances[stop_pos] > cum_dist + link_geo.length:
# If we have no stops falling right in this link
dt = [pid, index, link_id, direc, link_geo.wkb]
self.pattern_mapping.append(dt)
index += 1
cum_dist += link_geo.length
continue
start_point = cum_dist
end_point = cum_dist + link_geo.length
while cum_dist + link_geo.length >= distances[stop_pos]:
milepost = distances[stop_pos]
wkb = None if stop_pos == 0 else substring(self.shape, start_point, milepost).wkb
stop = self.stops[stop_pos]
dt = [pid, index, link_id, direc, wkb]
self.pattern_mapping.append(dt)
index += 1
stop_pos += 1
if stop_pos >= len(distances):
break
start_point = milepost
cum_dist += link_geo.length
if start_point != end_point:
wkb = substring(self.shape, start_point, end_point).wkb
self.pattern_mapping.append([pid, index, link_id, direc, wkb])
index += 1
if stop_pos >= len(distances):
break
self.__compute_match_quality()
def __compute_match_quality(self):
dispersion = [(self.shape.distance(stop.geo) * math.pi * 6371000 / 180) for stop in self.stops]
dispersion = sum([x * x for x in dispersion]) / len(self.stops)
self.__match_quality = dispersion