Source code for aequilibrae.project.network.connector_creation

from sqlite3 import Connection
from typing import Optional

from shapely.geometry import LineString, Polygon

from aequilibrae.utils.db_utils import commit_and_close

INFINITE_CAPACITY = 99999


[docs] def connector_creation( zone_id: int, mode_id: str, network, proj_nodes, proj_links, link_types="", connectors=1, conn: Optional[Connection] = None, delimiting_area: Polygon = None, ): if len(mode_id) > 1: raise Exception("We can only add centroid connectors for one mode at a time") with conn or network.project.db_connection as connec: logger = network.project.logger if sum(connec.execute("select count(*) from nodes where node_id=?", [zone_id]).fetchone()) == 0: logger.warning("This centroid does not exist. Please create it first") return sql = "select count(*) from links where a_node=? and instr(modes,?) > 0" if connec.execute(sql, [zone_id, mode_id]).fetchone()[0] > 0: logger.warning("Mode is already connected") return centroid = proj_nodes.query("node_id == @zone_id") # type: gpd.GeoDataFrame centroid = centroid.rename(columns={"node_id": "zone_id"})[["zone_id", "geometry"]] nodes = proj_nodes.query("is_centroid != 1 and modes.str.contains(@mode_id)", engine="python") if len(link_types) > 0: nodes = nodes[nodes.link_types.str.contains("|".join(list(link_types)))] if delimiting_area is not None: nodes = nodes[nodes.geometry.within(delimiting_area)] if nodes.empty: zone_id = centroid["zone_id"].values[0] logger.warning(f"No nodes found for centroid {zone_id} (mode {mode_id} and link types {link_types})") return joined = nodes[["node_id", "geometry"]].sjoin_nearest(centroid, distance_col="distance_connector") joined = joined.nsmallest(connectors, "distance_connector") # Check if link with a/b nodes exists to avoid unnecessary repetition centr_geo = centroid.geometry.values[0] links = network.links query = """(a_node==@zone_id | b_node==@zone_id) & (a_node==@rec.node_id | b_node==@rec.node_id) & link_type=='centroid_connector'""" for _, rec in joined.iterrows(): link_exist = proj_links.query(query) if link_exist.empty: link = links.new() link.geometry = LineString([centr_geo, rec.geometry]) link.modes = mode_id link.direction = 0 link.link_type = "centroid_connector" link.name = f"centroid connector zone {zone_id}" link.capacity_ab = INFINITE_CAPACITY link.capacity_ba = INFINITE_CAPACITY link.save(conn) else: link = links.get(link_exist.link_id.values[0]) link.add_mode(mode_id) link.save(conn) if not joined.empty: logger.info(f"{joined.shape[0]} new centroid connectors for mode {mode_id} added for centroid {zone_id}")