"""
Algorithms to **calibrate** synthetic gravity models with power and exponential functions
The procedures implemented in this code are some of those suggested in
Modelling Transport, 4th Edition, Ortuzar and Willumsen, Wiley 2011
"""
from time import perf_counter
import numpy as np
import pandas as pd
from aequilibrae.distribution.gravity_application import GravityApplication, SyntheticGravityModel
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae.parameters import Parameters
[docs]
class GravityCalibration:
"""Calibrate a traditional gravity model
Available deterrence function forms are: 'EXPO', 'POWER' or 'GAMMA'.
.. code-block:: python
>>> from aequilibrae.distribution import GravityCalibration
>>> project = create_example(project_path)
# We load the demand matrix
>>> demand = project.matrices.get_matrix("demand_omx")
>>> demand.computational_view()
# We load the skim matrix
>>> skim = project.matrices.get_matrix("skims")
>>> skim.computational_view(["time_final"])
>>> args = {"matrix": demand,
... "impedance": skim,
... "row_field": "productions",
... "function": 'expo',
... "nan_as_zero": True}
>>> gravity = GravityCalibration(**args)
# Solve and save outputs
>>> gravity.calibrate()
>>> gravity.model.save(os.path.join(project_path, 'dist_expo_model.mod'))
"""
[docs]
def __init__(self, project=None, **kwargs):
"""
Instantiates the Gravity calibration problem
:Arguments:
**matrix** (:obj:`AequilibraeMatrix`): Seed/base trip matrix
**impedance** (:obj:`AequilibraeMatrix`): Impedance matrix to be used
**function** (:obj:`str`): Function name to be calibrated. "EXPO" or "POWER"
**project** (:obj:`Project`, *Optional*): The Project to connect to.
By default, uses the currently active project
**parameters** (:obj:`str`, *Optional*): Convergence parameters. Defaults to those in the parameter file
**nan_as_zero** (:obj:`bool`, *Optional*): If Nan values should be treated as zero. Defaults to ``True``
:Results:
**model** (:obj:`SyntheticGravityModel`): Calibrated model
**report** (:obj:`list`): Iteration and convergence report
**error** (:obj:`str`): Error description
"""
self.project = project
self.__required_parameters = ["max trip length", "max iterations", "max error"]
self.parameters = kwargs.get("parameters", self.__get_parameters())
self.nan_as_zero = kwargs.get("nan_as_zero", False)
self.matrix = kwargs.get("matrix") # type: AequilibraeMatrix
self.impedance = kwargs.get("impedance") # type: AequilibraeMatrix
deterrence_function = str(kwargs.get("function", "")).upper()
if self.nan_as_zero:
self.matrix = self.matrix.copy(memory_only=True)
self.impedance = self.impedance.copy(memory_only=True)
self.result_matrix = None
self.vectors: pd.DataFrame = pd.DataFrame([])
self.gap = np.inf
self.error = None
self.gravity = None
self.comput_core = None
self.impedance_core = None
self.itera = 0
self.max_iter = None
self.max_error = None
self.gap = np.inf
self.report = [" ##### GRAVITY CALIBRATION ##### ", ""]
self.report.append("Functional form: " + deterrence_function)
self.model = SyntheticGravityModel()
if deterrence_function not in self.model.valid_functions:
raise ValueError("Function needs to be one of these: " + ", ".join(self.model.valid_functions))
else:
self.model.function = deterrence_function
def __assemble_model(self, b1):
# NEED TO SET PARAMETERS #
if self.model.function == "EXPO":
self.model.beta = float(b1)
elif self.model.function == "POWER":
self.model.alpha = float(b1)
[docs]
def calibrate(self):
"""Calibrate the model
Resulting model is in *output* class member
"""
t = perf_counter()
# initialize auxiliary variables
max_cost = self.parameters["max trip length"]
self.max_iter = self.parameters["max iterations"]
self.max_error = self.parameters["max error"]
# Check the inputs
self.__check_inputs()
if self.model.function in ["EXPO", "POWER"]:
# filtering for all costs over limit
a = 1
if max_cost > 0:
a = (self.impedance.matrix_view[:, :] < max_cost).astype(int)
# weighted average cost
self.report.append("Iteration: 1")
cstar = np.nansum(self.impedance.matrix_view[:, :] * self.result_matrix.gravity[:, :] * a) / np.nansum(
self.result_matrix.gravity[:, :] * a
)
b0 = 1 / cstar
self.__assemble_model(b0)
c0 = self.__apply_gravity()
for i in self.gravity.report:
self.report.append(" " + i)
self.report.append("")
self.report.append("")
bm1 = b0
bm = b0 * c0 / cstar
self.report.append("Iteration: 2")
self.__assemble_model(bm)
cm = self.__apply_gravity()
for i in self.gravity.report:
self.report.append(" " + i)
self.report.append("Error: " + "{:.2E}".format(float(np.nansum(abs((bm / bm1) - 1)))))
self.report.append("")
cm1 = c0
# While the max iterations has not been reached and the error is still too large
self.itera = 2
while self.itera < self.max_iter and self.gap > self.max_error:
self.report.append("Iteration: " + str(self.itera + 1))
aux = bm
bm = ((cstar - cm1) * bm - (cstar - cm) * bm) / (cm - cm1)
bm1 = aux
cm1 = cm
self.__assemble_model(bm1)
cm = self.__apply_gravity()
for i in self.gravity.report:
self.report.append(" " + i)
self.report.append("Error: " + "{:.2E}".format(float(np.nansum(abs((bm / bm1) - 1)))))
self.report.append("")
# compute convergence criteria
self.gap = abs((bm / bm1) - 1)
self.itera += 1
if self.itera == self.max_iter:
self.report.append(
"DID NOT CONVERGE. Stopped in " + str(self.itera) + " with a global error of " + str(self.gap)
)
else:
self.report.append("Converged in " + str(self.itera) + " iterations to a global error of " + str(self.gap))
s = perf_counter() - t
m, s1 = divmod(s, 60)
s -= m * 60
h, m = divmod(m, 60)
t = "%d:%02d:%2.4f" % (h, m, s)
self.report.append("Running time: " + t)
def __check_inputs(self):
if not isinstance(self.impedance, AequilibraeMatrix):
raise TypeError("Impedance matrix needs to be an instance of AequilibraEMatrix")
if not isinstance(self.matrix, AequilibraeMatrix):
raise TypeError("Observed matrix needs to be an instance of AequilibraEMatrix")
# Check data dimensions
if not np.array_equal(self.impedance.index, self.impedance.index):
raise ValueError("Indices from impedance matrix do not match those from seed matrix")
# Check if matrices were set for computation
mats = [(self.matrix, "Observed matrix"), (self.impedance, "Impedance matrix")]
for matrix, title in mats:
if matrix.matrix_view is None:
raise ValueError(f"{title} needs to be set for computation")
if matrix.matrix_view.ndim > 2:
raise ValueError(f"{title} computational view needs to be set for a single matrix core")
if np.nansum(matrix.matrix_view.data) == 0:
raise ValueError(f"{title} has only zero values")
if np.nanmin(matrix.matrix_view.data) < 0:
raise ValueError(f"{title} has negative values")
# Augment parameters if we happen to have only passed one
default_parameters = self.__get_parameters()
for para in self.__required_parameters:
if para not in self.parameters:
self.parameters[para] = default_parameters[para]
# Prepare the data for computation
self.comput_core = self.matrix.view_names[0]
self.result_matrix = self.matrix.copy(cores=[self.comput_core], names=["gravity"], memory_only=True)
self.vectors = pd.DataFrame(
{"rows": self.matrix.rows()[:], "columns": self.matrix.columns()[:]}, index=self.matrix.index[:]
)
self.impedance_core = self.impedance.view_names[0]
def __apply_gravity(self):
args = {
"impedance": self.impedance,
"vectors": self.vectors,
"row_field": "rows",
"column_field": "columns",
"model": self.model,
"parameters": self.parameters,
"nan_as_zero": self.nan_as_zero,
}
self.gravity = GravityApplication(self.project, **args)
self.gravity.apply()
self.result_matrix = self.gravity.output
return np.nansum(self.impedance.matrix_view[:, :] * self.result_matrix.gravity[:, :]) / np.nansum(
self.result_matrix.gravity[:, :]
)
def __get_parameters(self):
par = Parameters().parameters
para = par["distribution"]["ipf"].copy()
para.update(par["distribution"]["gravity"])
return para