Source code for aequilibrae.project.project_creation

import logging
import re
from pathlib import Path
from sqlite3 import Connection
from aequilibrae.utils.db_utils import commit_and_close
from aequilibrae.project.tools.migration_manager import MigrationManager, MigrationStatus


# Module-level field-name lists. Nothing in the visible part of this file reads
# them, so the descriptions below are inferred from the names only.
# NOTE(review): confirm against the modules that import these lists.

# Presumably the fields every link record must carry.
req_link_flds = [
    "link_id",
    "a_node",
    "b_node",
    "direction",
    "distance",
    "modes",
    "link_type",
]

# Presumably the fields every node record must carry.
req_node_flds = [
    "node_id",
    "is_centroid",
]

# Presumably fields that callers are not allowed to edit or remove.
protected_fields = [
    "ogc_fid",
    "geometry",
]


def initialize_tables(logger, db_type: str, conn: Connection) -> None:
    """Create the base tables and then the consistency triggers for a database.

    The connection is used as a context manager for the duration of both
    steps. For a ``sqlite3`` connection that means the pending transaction is
    committed when both steps succeed and rolled back if either raises; the
    connection itself is not closed here.

    Args:
        logger: Logger handed through to the table and trigger builders.
        db_type: Name of the sub-folder of ``database_specification`` that
            holds the SQL specification to apply.
        conn: Open database connection to build the schema in.
    """
    with conn as db:
        # Tables first: the triggers are created afterwards, in a second pass.
        create_base_tables(db, logger, db_type)
        add_triggers(db, logger, db_type)
def create_base_tables(conn: Connection, logger: logging.Logger, db_type: str) -> None:
    """Run every table-creation script listed for ``db_type`` and flag all migrations as skipped.

    The scripts live in ``database_specification/<db_type>/tables`` next to
    this module. ``table_list.txt`` in that folder names them one per line
    (without the ``.sql`` suffix), and they are executed in the listed order.

    Args:
        conn: Open database connection the scripts are executed on.
        logger: Logger passed on to ``run_queries_from_sql_file``.
        db_type: Name of the specification sub-folder to use.
    """
    db_spec_dir = Path(__file__).resolve().parent / "database_specification" / db_type
    tables_dir = db_spec_dir / "tables"

    with open(tables_dir / "table_list.txt", "r") as listing:
        table_names = [entry.rstrip() for entry in listing]

    for table_name in table_names:
        run_queries_from_sql_file(conn, logger, tables_dir / f"{table_name}.sql")

    # A freshly built schema already reflects every existing migration, so none
    # of them needs to run: register them all and record each one as skipped.
    manager = MigrationManager(db_spec_dir / "migrations" / "migrations.py")
    manager.mark_all_as_seen(conn)
    for migration in manager.migrations.values():
        migration.mark_as(conn, MigrationStatus.SKIPPED)
def add_triggers(conn: Connection, logger: logging.Logger, db_type: str) -> None:
    """Create the consistency triggers defined for ``db_type``.

    ``triggers_list.txt`` in ``database_specification/<db_type>/triggers``
    names one SQL script per line (without the ``.sql`` suffix); each script
    is executed in the listed order.

    Args:
        conn: Open database connection the scripts are executed on.
        logger: Logger passed on to ``run_queries_from_sql_file``.
        db_type: Name of the specification sub-folder to use.
    """
    triggers_dir = Path(__file__).resolve().parent / "database_specification" / db_type / "triggers"

    with open(triggers_dir / "triggers_list.txt", "r") as listing:
        script_names = [entry.rstrip() for entry in listing]

    for script_name in script_names:
        run_queries_from_sql_file(conn, logger, triggers_dir / f"{script_name}.sql")
def remove_triggers(
    conn: Connection, logger: logging.Logger, db_type: str, use_aequilibrae_prefix: bool = True
) -> None:
    """Drop every trigger that the trigger specification scripts for ``db_type`` create.

    The scripts named in ``database_specification/<db_type>/triggers/triggers_list.txt``
    are scanned line by line for ``CREATE TRIGGER <name>`` statements (any
    letter case, any amount of whitespace, optional ``IF NOT EXISTS``). Each
    name found is lower-cased and removed with ``DROP TRIGGER IF EXISTS``.

    Args:
        conn: Open database connection to drop the triggers from.
        logger: Logger that receives an error record when a drop fails.
        db_type: Name of the specification sub-folder to read.
        use_aequilibrae_prefix: When False, a leading ``aequilibrae_`` is
            stripped from each trigger name before it is dropped.

    Notes:
        A failing ``DROP`` is logged and the scan continues; it is not
        re-raised. Errors while opening or reading the specification files
        are not caught.
    """
    spec_folder = Path(__file__).resolve().parent / "database_specification" / db_type / "triggers"
    with open(spec_folder / "triggers_list.txt", "r") as file_list:
        all_trigger_sets = file_list.readlines()

    # ``\s+`` already tolerates runs of whitespace, so lines need no
    # normalisation before matching. The optional group keeps
    # ``CREATE TRIGGER IF NOT EXISTS name`` from yielding "if" as the name.
    create_trigger_regex = re.compile(r"create\s+trigger\s+(?:if\s+not\s+exists\s+)?(\w+)", flags=re.I)

    for table in all_trigger_sets:
        qry_file = spec_folder / f"{table.rstrip()}.sql"
        with open(qry_file, "r") as sql_file:
            query_list = sql_file.read()

        for cmd in query_list.split("--#"):
            for qry in cmd.split("\n"):
                # Lines that start with a SQL comment marker are ignored.
                if qry.startswith("--"):
                    continue

                m = create_trigger_regex.search(qry)
                if m is None:
                    continue

                name = m.group(1).lower()
                if not use_aequilibrae_prefix:
                    name = name.removeprefix("aequilibrae_")

                try:
                    conn.execute(f"drop trigger if exists {name}")
                except Exception as e:
                    # Best effort: report the failure and keep removing the rest.
                    logger.error(f"Failed removing triggers table - > {e.args}")
                    logger.error(f"Point of failure - > {qry}")
def run_queries_from_sql_file(conn: Connection, logger: logging.Logger, qry_file: Path) -> None:
    """Execute the SQL commands stored in ``qry_file`` one after another.

    Commands in the file are separated by the marker ``--#``. Each chunk is
    sent to ``conn.execute`` on its own.

    Args:
        conn: Open database connection the commands are executed on.
        logger: Logger that receives the error and the offending command when
            a command fails.
        qry_file: Path of the SQL script to run.

    Raises:
        Exception: Whatever ``conn.execute`` raised for the failing command,
            re-raised after logging. Commands that ran before the failure are
            not undone here.
    """
    with open(qry_file, "r") as handle:
        script = handle.read()

    # Executing chunk by chunk pins a failure to the exact command that caused it.
    for statement in script.split("--#"):
        try:
            conn.execute(statement)
        except Exception as e:
            logger.error(f"Error running SQL command: {e.args}")
            logger.info(statement)
            raise e
def recreate_columns(conn: Connection, logger: logging.Logger, table: str, old_table: str) -> dict[str, str]:
    """Add to ``table`` every column that exists in ``old_table`` but not in ``table``.

    Columns are compared by name only. Each missing column is added with the
    type that ``old_table`` declares for it.

    Args:
        conn: Open database connection holding both tables.
        logger: Logger that receives a summary of the columns being added.
        table: Table that receives the missing columns.
        old_table: Table whose columns are used as the reference.

    Returns:
        Mapping of column name to declared type for all named columns of
        ``old_table`` (not only the ones that were added).
    """
    # Table names are bound as parameters so quotes in a name cannot break the query.
    table_info_sql = "SELECT name, type FROM PRAGMA_TABLE_INFO(?) AS table_info"

    old_rows = conn.execute(table_info_sql, (old_table,)).fetchall()
    columns = {str(name): col_type for name, col_type in old_rows if name}

    current_rows = conn.execute(table_info_sql, (table,)).fetchall()
    orig_columns = {str(name) for name, _ in current_rows}

    new_columns = {k: v for k, v in columns.items() if k not in orig_columns}
    logger.info(f"Found {len(new_columns)} new columns: {list(new_columns.keys())}")

    for k, v in new_columns.items():
        # Quote the column identifier: unquoted, a name with a space would be
        # split into "name type" and a SQL keyword would be a syntax error.
        # The table name is left as given so existing callers are unaffected.
        quoted = '"' + k.replace('"', '""') + '"'
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {quoted} {v};")

    return columns