# Source code for aequilibrae.distribution.ipf

import os
from datetime import datetime
from time import perf_counter
from uuid import uuid4

import numpy as np
import pandas as pd
import yaml
from aequilibrae.distribution.ipf_core import ipf_core

from aequilibrae.context import get_active_project
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae.project.data.matrix_record import MatrixRecord


class Ipf:
    """Iterative proportional fitting procedure

    .. code-block:: python

        >>> from aequilibrae.distribution import Ipf
        >>> import pandas as pd
        >>> import numpy as np

        >>> project = create_example(project_path)

        >>> matrix = project.matrices.get_matrix("demand_omx")
        >>> matrix.computational_view()

        >>> vectors = pd.DataFrame({"productions":np.zeros(matrix.zones), "attractions":np.zeros(matrix.zones)}, index=matrix.index)

        >>> vectors["productions"] = matrix.rows()
        >>> vectors["attractions"] = matrix.columns()

        >>> ipf_args = {"matrix": matrix,
        ...             "vectors": vectors,
        ...             "row_field": "productions",
        ...             "column_field": "attractions",
        ...             "nan_as_zero": False}
        >>> fratar = Ipf(**ipf_args)
        >>> fratar.fit()

        # We can get back to our OMX matrix in the end
        >>> fratar.output.export(os.path.join(my_folder_path, "to_omx_output.omx"))
    """

    def __init__(self, project=None, **kwargs):
        """
        Instantiates the IPF problem

        :Arguments:
            **matrix** (:obj:`AequilibraeMatrix`): Seed Matrix

            **vectors** (:obj:`pd.DataFrame`): Dataframe with the vectors to be used for the IPF

            **row_field** (:obj:`str`): Field name that contains the data for the row totals

            **column_field** (:obj:`str`): Field name that contains the data for the column totals

            **parameters** (:obj:`dict`, *Optional*): Convergence parameters. Must contain the keys
            ``"convergence level"``, ``"max iterations"`` and ``"balancing tolerance"``.
            Defaults to those in the parameter file

            **nan_as_zero** (:obj:`bool`, *Optional*): If Nan values should be treated as zero.
            Defaults to ``True``

            **output** (:obj:`str`, *Optional*): Value handed to ``matrix.copy`` when the output
            matrix is created from a non-OMX seed

        :Results:
            **output** (:obj:`AequilibraeMatrix`): Result Matrix

            **report** (:obj:`list`): Iteration and convergence report

            **error** (:obj:`str`): Error description
        """
        self.cpus = 0
        # The parameter file is always read, even when ``parameters`` is supplied,
        # because reading it is also what sets ``self.cpus``.
        default_parameters = self.__get_parameters("ipf")
        self.parameters = kwargs.get("parameters", default_parameters)

        # Seed matrix
        self.matrix = kwargs.get("matrix", None)  # type: AequilibraeMatrix

        # NaN as zero
        self.nan_as_zero = kwargs.get("nan_as_zero", True)

        # Target vectors as plain arrays; filled in by __check_data
        self.__col_vector: np.ndarray = np.zeros([])
        self.__row_vector: np.ndarray = np.zeros([])

        self.vectors = kwargs.get("vectors", None)
        self.rows_ = kwargs.get("row_field", None)
        self.cols_ = kwargs.get("column_field", None)
        self.output_name = kwargs.get("output")

        self.output = AequilibraeMatrix()
        self.error = None
        self.__required_parameters = ["convergence level", "max iterations", "balancing tolerance"]
        self.error_free = True
        self.report = [" ##### IPF computation ##### ", ""]
        self.gap = None
        self.procedure_date = ""
        self.procedure_id = ""

    def __check_data(self):
        """Validates parameters, seed matrix and vectors, and prepares the target arrays.

        Raises ``TypeError`` for wrong container types and ``ValueError`` for wrong
        dtypes, mismatching indices, missing parameters or a matrix without a
        single-core computational view. Vectors whose totals differ by more than
        the balancing tolerance do NOT raise: the condition is recorded in
        ``self.error`` and the column targets are scaled to the row total.
        """
        # Start from a clean slate so that a problem recorded by a previous run
        # (e.g. unbalanced vectors) cannot make this run fail
        self.error = None
        self.__check_parameters()

        # check data types
        if not isinstance(self.vectors, pd.DataFrame):
            raise TypeError("Row vector needs to be a Pandas DataFrame")

        if not isinstance(self.matrix, AequilibraeMatrix):
            raise TypeError("Seed matrix needs to be an instance of AequilibraeMatrix")

        # Check data type
        if not np.issubdtype(self.matrix.dtype, np.floating):
            raise ValueError("Seed matrix need to be a float type")

        row_data = self.vectors[self.rows_]
        col_data = self.vectors[self.cols_]

        if not np.issubdtype(row_data.dtype, np.floating):
            raise ValueError("production/rows vector must be a float type")

        if not np.issubdtype(col_data.dtype, np.floating):
            raise ValueError("Attraction/columns vector must be a float type")

        if not np.array_equal(self.matrix.index, self.vectors.index):
            raise ValueError("Indices from vectors do not match those from seed matrix")

        # Check if matrix was set for computation
        if self.matrix.matrix_view is None:
            raise ValueError("Matrix needs to be set for computation")
        if len(self.matrix.matrix_view.shape[:]) > 2:
            raise ValueError("Matrix' computational view needs to be set for a single matrix core")

        # check balancing:
        sum_rows = np.nansum(row_data)
        sum_cols = np.nansum(col_data)

        self.__row_vector = row_data.to_numpy()
        if sum_cols != 0:
            # Scaling the columns guarantees that both targets add up to exactly the same total
            self.__col_vector = col_data.to_numpy() * (sum_rows / sum_cols)
        else:
            # Nothing to scale by; dividing would only turn the targets into NaN/inf
            self.__col_vector = col_data.to_numpy(copy=True)

        if abs(sum_rows - sum_cols) > self.parameters["balancing tolerance"]:
            self.error = "Vectors are not balanced"

    def __check_parameters(self):
        """Raises ``ValueError`` if any required convergence parameter is missing."""
        required = self.__required_parameters
        if all(key in self.parameters for key in required):
            return

        # Message (including the trailing separator) is kept exactly as it has always been emitted
        self.error = "Parameters error. It needs to be a dictionary with the following keys: " + "".join(
            f"{key}, " for key in required
        )
        raise ValueError(self.error)

    def fit(self):
        """Runs the IPF instance problem to adjust the matrix

        Resulting matrix is the *output* class member
        """
        self.procedure_id = uuid4().hex
        self.procedure_date = str(datetime.today())
        t = perf_counter()
        self.__check_data()
        if not self.error_free:
            return

        max_iter = self.parameters["max iterations"]
        conv_criteria = self.parameters["convergence level"]

        if self.matrix.is_omx():
            self.output = AequilibraeMatrix()
            self.output.create_from_omx(omx_path=self.matrix.file_path, cores=self.matrix.view_names)
            self.output.computational_view()
        else:
            self.output = self.matrix.copy(self.output_name)

        if self.nan_as_zero:
            self.output.matrix_view[:, :] = np.nan_to_num(self.output.matrix_view)[:, :]

        tot_matrix = np.nansum(self.output.matrix_view[:, :])

        # Reporting
        self.report.append("Target convergence criteria: " + str(conv_criteria))
        self.report.append("Maximum iterations: " + str(max_iter))
        self.report.append("")
        self.report.append(f"Rows/columns: {self.vectors.shape[0]}")

        self.report.append(f"Total of seed matrix: {float(tot_matrix):28,.4f}")
        self.report.append(f"Total of target vectors: {float(np.nansum(self.__row_vector)):25,.4f}")
        self.report.append("")
        self.report.append("Iteration, Convergence")

        self.gap = conv_criteria + 1

        # ipf_core works on a private copy; the result is written back afterwards
        seed = np.array(self.output.matrix_view[:, :], copy=True)
        iterations, self.gap = ipf_core(
            seed,
            self.__row_vector,
            self.__col_vector,
            max_iterations=max_iter,
            tolerance=conv_criteria,
            cores=self.cpus,
        )
        self.output.matrix_view[:, :] = seed[:, :]

        self.report.append(f"{iterations} , {float(np.nansum(self.gap)):4,.10f}")

        self.report.append("")
        self.report.append(f"Running time: {perf_counter() - t:4,.3f}s")

    def save_to_project(self, name: str, file_name: str, project=None) -> "MatrixRecord":
        """Saves the matrix output to the project file

        :Arguments:
            **name** (:obj:`str`): Name of the desired matrix record

            **file_name** (:obj:`str`): Name for the matrix file name. AEM and OMX supported

            **project** (:obj:`Project`, *Optional*): Project we want to save the results to.
            Defaults to the active project
        """
        project = project or get_active_project()
        mats = project.matrices
        record = mats.new_record(name, file_name, self.output)
        record.procedure_id = self.procedure_id
        record.timestamp = self.procedure_date
        record.procedure = "Iterative Proportional fitting"
        record.save()
        return record

    def __tot_rows(self, matrix):
        """Returns the NaN-ignoring sum of each row of *matrix*."""
        return np.nansum(matrix, axis=1)

    def __tot_columns(self, matrix):
        """Returns the NaN-ignoring sum of each column of *matrix*."""
        return np.nansum(matrix, axis=0)

    def __factor(self, marginals, targets):
        """Returns ``targets / marginals`` with negative-infinity factors replaced by 1."""
        f = np.divide(targets, marginals)  # We compute the factors
        # ``np.NINF`` was removed in NumPy 2.0; ``-np.inf`` is the same value on every version
        f[f == -np.inf] = 1  # And treat the errors
        return f

    def __get_parameters(self, model):
        """Reads ``parameters.yml`` from the package root.

        Stores the configured number of CPUs in ``self.cpus`` and returns the
        ``distribution`` section for *model*.
        """
        package_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
        with open(os.path.join(package_root, "parameters.yml"), "r", encoding="utf-8") as yml:
            config = yaml.safe_load(yml)

        self.cpus = int(config["system"]["cpus"])
        return config["distribution"][model]