# Source code for aequilibrae.project.data.matrices
import os
from os.path import isfile, join
import pandas as pd
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae.project.data.matrix_record import MatrixRecord
from aequilibrae.project.table_loader import TableLoader
from aequilibrae.utils.db_utils import commit_and_close
[docs]
class Matrices:
    """Gateway into the matrices available/recorded in the model"""
[docs]
    def __init__(self, project):
        """Loads the matrix records stored in the project database

        Only records whose file is found inside the project's *matrices* folder are
        kept in memory. Records are indexed by their lower-cased name.

        :Arguments:
            **project**: Project object this gateway works on. It must expose *logger*,
            *project_base_path* and *connect()*
        """
        self.project = project
        self.logger = project.logger
        self.__items = {}
        self.__fields = []
        # Defined up-front so the attribute exists even when the matrices table is empty
        self.__properties = []
        self.fldr = os.path.join(project.project_base_path, "matrices")

        tl = TableLoader()
        with commit_and_close(self.project.connect()) as conn:
            matrices_list = tl.load_table(conn, "matrices")
        self.__fields = list(tl.fields)

        if matrices_list:
            self.__properties = list(matrices_list[0].keys())

        for lt in matrices_list:
            # Items are stored under the lower-cased name, so every lookup uses that same key
            key = lt["name"].lower()

            # A record flagged as no longer existing is dropped so it can be loaded again
            if key in self.__items and not self.__items[key]._exists:
                del self.__items[key]

            if key not in self.__items and isfile(join(self.fldr, lt["file_name"])):
                lt["fldr"] = self.fldr
                self.__items[key] = MatrixRecord(lt, project)
[docs]
    def reload(self):
        """Drops every matrix record held in memory and rebuilds them from the project database"""
        current_project = self.project
        self.__items.clear()
        # Re-running the initializer reads the matrices table again
        self.__init__(current_project)
[docs]
    def clear_database(self) -> None:
        """Deletes from the matrices table every record whose file is not in the matrices folder"""
        with commit_and_close(self.project.connect()) as conn:
            records = conn.execute("Select name, file_name from matrices;").fetchall()

            # Collect the names of the records that point to a file that is not on disk
            missing = []
            for rec_name, rec_file in records:
                if isfile(join(self.fldr, rec_file)):
                    continue
                missing.append(rec_name)

            if missing:
                self.logger.warning(f'Matrix records not found in disk cleaned from database: {",".join(missing)}')
                # executemany expects one parameter sequence per statement
                parameters = [[rec_name] for rec_name in missing]
                conn.executemany("DELETE from matrices where name=?;", parameters)
[docs]
    def update_database(self) -> None:
        """Adds records to the matrices database for matrix files found on disk

        Every *.omx* or *.aem* file in the matrices folder that is not the file of a record
        held in memory gets a new record. The record name is the lower-cased name stored in
        the matrix (non-OMX files only) or, when that is empty, the lower-cased file name.
        Dots and spaces are replaced by underscores, and a numeric suffix is appended when
        the name is already in use.
        """
        # Set membership keeps the filter linear in the number of files on disk
        known_files = {mat.file_name for mat in self.__items.values()}
        new_files = [
            x
            for x in os.listdir(self.fldr)
            if x not in known_files and os.path.splitext(x.lower())[1] in (".omx", ".aem")
        ]

        if new_files:
            self.logger.warning(f'New matrix found on disk. Added to the database: {",".join(new_files)}')

        for fl in new_files:
            mat = AequilibraeMatrix()
            mat.load(join(self.fldr, fl))
            try:
                name = None
                if not mat.is_omx():
                    name = str(mat.name).lower()
            finally:
                # The matrix is only opened to read its name, so release the file right away
                mat.close()

            if not name:
                name = fl.lower()

            name = name.replace(".", "_").replace(" ", "_")

            if name in self.__items:
                # Find the first numeric suffix that yields an unused name
                i = 0
                while f"{name}_{i}" in self.__items:
                    i += 1
                name = f"{name}_{i}"
            rec = self.new_record(name, fl)
            rec.save()
[docs]
    def list(self) -> pd.DataFrame:
        """Lists every matrix recorded in the project database

        :Returns:
            **df** (:obj:`pd.DataFrame`): One row per record of the matrices table, plus a *status*
            column that reads "file missing" when the record's file is not in the matrices folder
            and is empty otherwise
        """

        def file_status(file_name):
            found = os.path.isfile(os.path.join(self.fldr, file_name))
            return "" if found else "file missing"

        with commit_and_close(self.project.connect()) as conn:
            df = pd.read_sql_query("Select * from matrices;", conn).assign(status="")
            df.status = df.file_name.apply(file_status)
        return df
[docs]
    def get_matrix(self, matrix_name: str) -> AequilibraeMatrix:
        """Returns an AequilibraE matrix available in the project
        Raises an error if matrix does not exist
        :Arguments:
            **matrix_name** (:obj:`str`): Name of the matrix to be loaded
        :Returns:
            **matrix** (:obj:`AequilibraeMatrix`): Matrix object
        """
        return self.get_record(matrix_name).get_data() 
[docs]
    def get_record(self, matrix_name: str) -> MatrixRecord:
        """Returns a model Matrix Record for manipulation in memory"""
        if matrix_name.lower() not in self.__items:
            raise Exception("There is no matrix record with that name")
        if not self.__items[matrix_name.lower()]._exists:
            raise Exception("This matrix was deleted during this session")
        return self.__items[matrix_name.lower()] 
[docs]
    def check_exists(self, name: str) -> bool:
        """Checks whether a matrix with a given name exists
        :Returns:
            **exists** (:obj:`bool`): Does the matrix exist?
        """
        return name.lower() in self.__items 
[docs]
    def delete_record(self, matrix_name: str) -> None:
        """Deletes a Matrix Record from the model and attempts to remove from disk"""
        mr = self.get_record(matrix_name)
        mr.delete() 
[docs]
    def new_record(self, name: str, file_name: str, matrix=None) -> MatrixRecord:
        """Creates a new record for a matrix in disk, but does not save it
        If the matrix file is not already on disk, it will fail
        :Arguments:
            **name** (:obj:`str`): Name of the matrix
            **file_name** (:obj:`str`): Name of the file on disk
        :Returns:
            **matrix_record** (:obj:`MatrixRecord`): A matrix record that can be manipulated in memory before saving
        """
        matrix = matrix or AequilibraeMatrix()
        if name in self.__items:
            raise ValueError(f"There is already a matrix of name ({name}). It must be unique.")
        for mat in self.__items.values():
            if mat.file_name == file_name:
                raise ValueError(f"There is already a matrix record for file name ({file_name}). It must be unique.")
        if matrix.cores > 0:
            if isfile(join(self.fldr, file_name)):
                raise FileExistsError(f"{file_name} already exists. Choose a different name or matrix format")
            mat_format = file_name.split(".")[-1].lower()
            if mat_format not in ["omx", "aem"]:
                raise ValueError("Matrix needs to be either OMX or native AequilibraE")
            matrix.export(join(self.fldr, file_name))
            cores = matrix.cores
        else:
            if not isfile(join(self.fldr, file_name)):
                raise FileExistsError(f"{file_name} does not exist. Cannot create this matrix record")
            mat = AequilibraeMatrix()
            mat.load(join(self.fldr, file_name))
            cores = mat.cores
            mat.close()
            del mat
        tp = {key: None for key in self.__fields}
        tp["name"] = name
        tp["file_name"] = file_name
        tp["cores"] = cores
        mr = MatrixRecord(tp, self.project)
        mr.save()
        self.__items[name.lower()] = mr
        self.logger.warning("Matrix Record has been saved to the database")
        return mr 
    def _clear(self):
        """Eliminates records from memory. For internal use only"""
        self.__items.clear()