import functools
import logging
import os
import shutil
import sqlite3
from collections import namedtuple
from contextlib import contextmanager
from pathlib import Path
import pandas as pd
from aequilibrae import global_logger
from aequilibrae.context import activate_project, get_active_project
from aequilibrae.log import Log
from aequilibrae.log import get_log_handler
from aequilibrae.parameters import Parameters
from aequilibrae.project.about import About
from aequilibrae.project.data import Matrices, Results
from aequilibrae.project.network import Network
from aequilibrae.project.project_cleaning import clean
from aequilibrae.project.project_creation import initialize_tables
from aequilibrae.project.scenario import Scenario
from aequilibrae.project.tools import MigrationManager
from aequilibrae.project.zoning import Zoning
from aequilibrae.reference_files import spatialite_database, demo_init_py
from aequilibrae.transit import Transit
from aequilibrae.utils.db_utils import commit_and_close, safe_connect
from aequilibrae.utils.model_run_utils import import_file_as_module
from aequilibrae.utils.spatialite_utils import connect_spatialite
[docs]
class Project:
"""AequilibraE project class
.. code-block:: python
:caption: Create Project
>>> new_project = Project()
>>> new_project.new(project_path)
# Safely closes the project
>>> new_project.close()
.. code-block:: python
:caption: Open Project
>>> existing_project = Project()
>>> existing_project.open(project_path)
>>> existing_project.close()
"""
def __init__(self):
self.root_scenario: Scenario = None
self.scenario: Scenario = None
[docs]
@classmethod
def from_path(cls, project_folder):
project = cls()
project.open(project_folder)
return project
[docs]
def open(self, project_path: str) -> None:
"""
Loads project from disk
:Arguments:
**project_path** (:obj:`str`): Full path to the project data folder. If the project inside does
not exist, it will fail.
"""
base_path = Path(project_path)
file_name = base_path / "project_database.sqlite"
if not file_name.is_file() or not file_name.exists():
raise FileNotFoundError("Model does not exist. Check your path and try again")
path_to_file = file_name
self.root_scenario = Scenario(
name="root",
base_path=base_path,
path_to_file=path_to_file,
)
self.scenario = self.root_scenario
self.scenario.logger = self.__setup_logger()
self.activate()
self.__load_objects()
global_logger.info(f"Opened project on {self.project_base_path}")
clean(self)
@property
def project_base_path(self):
return self.scenario.base_path
@property
def path_to_file(self):
return self.scenario.path_to_file
@property
def about(self) -> About:
return self.scenario.about
@property
def logger(self) -> logging.Logger:
return self.scenario.logger
@property
def network(self) -> Network:
return self.scenario.network
@property
def transit(self) -> Transit:
return self.scenario.transit
@property
def matrices(self) -> Matrices:
return self.scenario.matrices
@property
def results(self) -> Results:
return self.scenario.results
@property
def _project_database_path(self) -> Path:
return self.project_base_path / "project_database.sqlite"
@property
def _results_database_path(self) -> Path:
return self.project_base_path / "results_database.sqlite"
@property
def _transit_database_path(self) -> Path:
return self.project_base_path / "public_transport.sqlite"
@property
@contextmanager
def db_connection(self):
with commit_and_close(self._project_database_path, spatial=False) as conn:
yield conn
@property
@contextmanager
def db_connection_spatial(self):
with commit_and_close(self._project_database_path, spatial=True) as conn:
yield conn
@property
@contextmanager
def results_connection(self):
with commit_and_close(self._results_database_path, spatial=False, missing_ok=True) as conn:
yield conn
@property
@contextmanager
def transit_connection(self):
with commit_and_close(self._transit_database_path, spatial=True, missing_ok=True) as conn:
yield conn
[docs]
def new(self, project_path: str) -> None:
"""Creates a new project
:Arguments:
**project_path** (:obj:`str`): Full path to the project data folder. If folder exists, it will fail
"""
base_path = Path(project_path)
path_to_file = base_path / "project_database.sqlite"
if os.path.isdir(project_path):
raise FileExistsError("Location already exists. Choose a different name or remove the existing directory")
# We create the project folder and create the base file
base_path.mkdir(parents=True, exist_ok=True)
self.root_scenario = Scenario(
name="root",
base_path=base_path,
path_to_file=path_to_file,
)
self.scenario = self.root_scenario
self.scenario.logger = self.__setup_logger()
self.activate()
self.__create_empty_network()
self.__load_objects()
self.about.create()
global_logger.info(f"Created project on {base_path}")
[docs]
def close(self) -> None:
"""Safely closes the project"""
if not self.project_base_path:
global_logger.warning("This Aequilibrae project is not opened")
return
try:
with self.project.db_connection as conn:
conn.commit()
clean(self)
for obj in [self.parameters, self.network]:
del obj
del self.network.link_types
del self.network.modes
global_logger.info(f"Closed project on {self.project_base_path}")
except (sqlite3.ProgrammingError, AttributeError):
global_logger.warning(f"This project at {self.project_base_path} is already closed")
finally:
handlers = global_logger.handlers[:] # Make a copy of the handlers list
for handler in handlers:
handler.close() # Explicitly close each handler to release file handles
global_logger.removeHandler(handler) # Remove the handler from the logger
self.deactivate()
[docs]
def activate(self):
activate_project(self)
[docs]
def deactivate(self):
if get_active_project(must_exist=False) is self:
activate_project(None)
[docs]
def log(self) -> Log:
"""Returns a log object
allows the user to read the log or clear it"""
return Log(self.project_base_path)
[docs]
def upgrade(self):
"""
Find and apply all applicable migrations.
All database upgrades are applied within a single transaction.
If skipping a specific migration is required, use the ``aequilbrae.project.tools.MigrationManager`` object
directly. Consult it's documentation page for details. Take care when skipping migrations.
"""
global_logger.info("Starting database upgrades")
connections = {
"project_conn": connect_spatialite(self._project_database_path),
"transit_conn": None,
"results_conn": None,
}
targets = [
(MigrationManager(MigrationManager.network_migration_file), "project_conn"),
]
if (self.project_base_path / "public_transport.sqlite").exists():
targets.append((MigrationManager(MigrationManager.transit_migration_file), "transit_conn"))
connections["transit_conn"] = connect_spatialite(self._transit_database_path)
if (self.project_base_path / "results_database.sqlite").exists():
connections["results_conn"] = safe_connect(self._results_database_path)
try:
for mm, main_conn in targets:
with connections[main_conn] as conn:
mm.mark_all_as_seen(conn)
for mm, main_conn in targets:
mm.upgrade(main_conn, connections=connections)
global_logger.info("Completed database upgrades")
finally:
for _, main_conn in targets:
connections[main_conn].close()
def __load_objects(self):
matrix_folder = self.project_base_path / "matrices"
matrix_folder.mkdir(parents=True, exist_ok=True)
self.scenario.network = Network(self)
self.scenario.about = About(self)
self.scenario.matrices = Matrices(self)
self.scenario.results = Results(self)
@property
def project_parameters(self) -> Parameters:
return Parameters(path=self.project_base_path)
@property
def parameters(self) -> dict:
return self.project_parameters.parameters
@property
def run(self):
"""
Load and return the AequilibraE run module with the default arguments from
``parameters.yml`` partially applied.
Refer to ``run/__init__.py`` file within the project folder for documentation.
"""
entry_points = self.parameters["run"]
module = import_file_as_module(
self.root_scenario.base_path / "run" / "__init__.py", "aequilibrae.run", force=True
)
res = []
sentinal = object()
for name, kwargs in entry_points.items():
attr = getattr(module, name)
if attr is sentinal:
raise RuntimeError(f"expected to find callable '{name}' in the run module but didn't")
elif not callable(attr):
raise RuntimeError(f"found symbol '{name}' in the run module but it is not callable")
func = functools.partial(attr, **(kwargs if kwargs is not None else {}))
res.append((name, func))
Run = namedtuple("Run", [k for k, _ in res])
return Run._make([v for _, v in res])
[docs]
def check_file_indices(self) -> None:
"""Makes results_database.sqlite and the matrices folder compatible with project database"""
raise NotImplementedError
@property
def zoning(self):
return Zoning(self.network)
def __create_empty_network(self):
shutil.copyfile(spatialite_database, self.path_to_file)
pth = self.project_base_path / "run"
pth.mkdir(parents=True, exist_ok=True)
shutil.copyfile(demo_init_py, pth / "__init__.py")
# Write parameters to the project folder
p = self.project_parameters
p.parameters["system"]["logging_directory"] = str(self.project_base_path)
p.write_back()
# Create actual tables
with self.db_connection_spatial as conn:
conn.execute("PRAGMA foreign_keys = ON;")
initialize_tables(self.logger, "network", conn=conn)
def __setup_logger(self):
logger = logging.getLogger(f"aequilibrae.{self.project_base_path}")
logger.propagate = False
logger.setLevel(logging.DEBUG)
par = self.parameters or self.project_parameters._default
do_log = par["system"]["logging"]
if do_log:
logger.addHandler(get_log_handler(self.project_base_path / "aequilibrae.log"))
return logger
[docs]
def list_scenarios(self):
"""
Lists the existing scenarios.
:Returns:
**scenarios** (:obj:`pd.DataFrame`): Pandas DataFrame with existing scenarios
"""
with self.db_connection as conn:
return pd.read_sql("SELECT * FROM scenarios", conn)
[docs]
def use_scenario(self, scenario_name: str):
"""
Switch the active scenario.
:Arguments:
**scenario_name** (:obj:`str`): name of the scenario to be activated
"""
with commit_and_close(self.root_scenario.path_to_file, spatial=False) as conn:
if conn.execute("SELECT 1 FROM scenarios where scenario_name=?", (scenario_name,)).fetchone() is None:
raise ValueError(f"scenario '{scenario_name}' does not exist")
if scenario_name == "root":
self.scenario = self.root_scenario
else:
self.scenario = Scenario(
name=scenario_name,
base_path=self.root_scenario.base_path / "scenarios" / scenario_name,
path_to_file=self.root_scenario.base_path / "scenarios" / scenario_name / "project_database.sqlite",
)
self.scenario.logger = self.__setup_logger()
self.__load_objects()
[docs]
def create_empty_scenario(self, scenario_name: str, description: str = ""):
"""
Creates an empty scenario, without any links, nodes, and zones.
:Arguments:
**scenario_name** (:obj:`str`): scenario name
**description** (:obj:`str`): useful scenario description
"""
scenario_path = self.root_scenario.base_path / "scenarios" / scenario_name
current_scenario = self.scenario.name
self.use_scenario("root")
try:
with self.db_connection as conn:
if (
conn.execute("SELECT 1 FROM scenarios where scenario_name=?", (scenario_name,)).fetchone()
is not None
):
raise ValueError("a scenario of that name already exists")
scenario_path.mkdir(parents=True, exist_ok=True)
db = scenario_path / "project_database.sqlite"
shutil.copyfile(spatialite_database, db)
# Write parameters to the project folder
p = Parameters(path=scenario_path)
p.parameters["system"]["logging_directory"] = str(scenario_path)
p.write_back()
# Create actual tables
with commit_and_close(db, spatial=True) as conn:
conn.execute("PRAGMA foreign_keys = ON;")
initialize_tables(self.logger, "network", conn=conn)
conn.execute("DROP TABLE IF EXISTS scenarios")
with self.db_connection as conn:
conn.execute(
"INSERT INTO scenarios (scenario_name, description) VALUES(?,?)", (scenario_name, description)
)
finally:
self.use_scenario(current_scenario)
[docs]
def clone_scenario(self, scenario_name: str, description: str = ""):
"""
Clones the active scenario.
:Arguments:
**scenario_name** (:obj:`str`): scenario name
**description** (:obj:`str`): useful scenario description
"""
scenario_path = self.root_scenario.base_path / "scenarios" / scenario_name
current_scenario = self.scenario.name
matrices_path = self.matrices.fldr
project_db_path = self._project_database_path
transit_db_path = self._transit_database_path
results_db_path = self._results_database_path
parameters_path = self.project_parameters.file
self.use_scenario("root")
try:
with self.db_connection as conn:
if (
conn.execute("SELECT 1 FROM scenarios where scenario_name=?", (scenario_name,)).fetchone()
is not None
):
raise ValueError("a scenario of that name already exists")
shutil.copytree(matrices_path, scenario_path / "matrices")
db = scenario_path / "project_database.sqlite"
shutil.copyfile(project_db_path, db)
try:
shutil.copyfile(transit_db_path, scenario_path / "public_transport.sqlite")
except FileNotFoundError:
pass
try:
shutil.copyfile(results_db_path, scenario_path / "results_database.sqlite")
except FileNotFoundError:
pass
shutil.copy(parameters_path, scenario_path)
with commit_and_close(db, spatial=True) as conn:
conn.execute("DROP TABLE IF EXISTS scenarios")
with self.db_connection as conn:
conn.execute(
"INSERT INTO scenarios (scenario_name, description) VALUES(?,?)", (scenario_name, description)
)
finally:
self.use_scenario(current_scenario)