Source code for aequilibrae.paths.linear_approximation

import importlib.util as iutil
import logging
import os
from functools import partial
from pathlib import Path
from tempfile import gettempdir
from typing import List, Dict

import numpy as np

from aequilibrae import global_logger
from aequilibrae.paths.all_or_nothing import allOrNothing
from aequilibrae.paths.results import AssignmentResults
from aequilibrae.paths.traffic_class import TrafficClass
from ..utils import WorkerThread

try:
    from aequilibrae.paths.AoN import linear_combination, linear_combination_skims, aggregate_link_costs
    from aequilibrae.paths.AoN import triple_linear_combination, triple_linear_combination_skims
    from aequilibrae.paths.AoN import copy_one_dimension, copy_two_dimensions, copy_three_dimensions
    from aequilibrae.paths.AoN import sum_a_times_b_minus_c, linear_combination_1d
except ImportError as ie:
    global_logger.warning(f"Could not import procedures from the binary. {ie.args}")

import scipy

if int(scipy.__version__.split(".")[1]) >= 3:
    from scipy.optimize import root_scalar

    recent_scipy = True
else:
    from scipy.optimize import root as root_scalar

    recent_scipy = False
    global_logger.warning("Using older version of Scipy. For better performance, use Scipy >= 1.4")

if False:
    from aequilibrae.paths.traffic_assignment import TrafficAssignment

spec = iutil.find_spec("PyQt5")
pyqt = spec is not None
if pyqt:
    from PyQt5.QtCore import pyqtSignal as SIGNAL


[docs] class LinearApproximation(WorkerThread): if pyqt: equilibration = SIGNAL(object) assignment = SIGNAL(object) def __init__(self, assig_spec, algorithm, project=None) -> None: WorkerThread.__init__(self, None) self.logger = project.logger if project else logging.getLogger("aequilibrae") self.project_path = project.project_base_path if project else gettempdir() self.algorithm = algorithm self.rgap_target = assig_spec.rgap_target self.max_iter = assig_spec.max_iter self.cores = assig_spec.cores self.iteration_issue = [] self.convergence_report = {"iteration": [], "rgap": [], "alpha": [], "warnings": []} if algorithm in ["cfw", "bfw"]: self.convergence_report["beta0"] = [] self.convergence_report["beta1"] = [] self.convergence_report["beta2"] = [] self.assig = assig_spec # type: TrafficAssignment if None in [ assig_spec.classes, assig_spec.vdf, assig_spec.capacity_field, assig_spec.time_field, assig_spec.vdf_parameters, ]: all_par = "Traffic classes, VDF, VDF_parameters, capacity field & time_field" raise Exception( "Parameter missing. Setting the algorithm is the last thing to do " f"when assigning. Check if you have all of these: {all_par}" ) self.traffic_classes = assig_spec.classes # type: List[TrafficClass] self.num_classes = len(assig_spec.classes) self.cap_field = assig_spec.capacity_field self.time_field = assig_spec.time_field self.vdf = assig_spec.vdf self.vdf_parameters = assig_spec.vdf_parameters self.procedure_id = assig_spec.procedure_id self.iter = 0 self.rgap = np.inf self.stepsize = 1.0 self.conjugate_stepsize = 0.0 self.fw_class_flow = 0 # rgap can be a bit wiggly, specifying how many times we need to be below target rgap is a quick way to # ensure a better result. We might want to demand that the solution is that many consecutive times below. self.steps_below_needed_to_terminate = assig_spec.steps_below_needed_to_terminate self.steps_below = 0 # if this is one, we do not have a new direction and will get stuck. Make it 1. self.conjugate_direction_max = 0.99999 # if FW stepsize is zero, we set it to the corresponding MSA stepsize and then need to not make # the step direction conjugate to the previous direction. self.do_fw_step = False self.conjugate_failed = False self.do_conjugate_step = False # BFW specific stuff self.betas = np.array([1.0, 0.0, 0.0]) # Instantiates the arrays that we will use over and over self.capacity = assig_spec.capacity self.free_flow_tt = assig_spec.free_flow_tt self.fw_total_flow = assig_spec.total_flow self.congested_time = assig_spec.congested_time self.vdf_der = np.array(assig_spec.congested_time, copy=True) self.congested_value = np.array(assig_spec.congested_time, copy=True) self.step_direction = {} # type: Dict[AssignmentResults] self.previous_step_direction = {} # type: Dict[AssignmentResults] self.temp_step_direction_for_copy = {} # type: Dict[AssignmentResults] self.aons = {} for c in self.traffic_classes: r = AssignmentResults() r.prepare(c.graph, c.matrix) self.step_direction[c._id] = r if self.algorithm in ["cfw", "bfw"]: for c in self.traffic_classes: for d in [self.step_direction, self.previous_step_direction, self.temp_step_direction_for_copy]: r = AssignmentResults() r.prepare(c.graph, c.matrix) r.compact_link_loads = np.zeros([]) r.compact_total_link_loads = np.zeros([]) d[c._id] = r
[docs] def calculate_conjugate_stepsize(self): self.vdf.apply_derivative( self.vdf_der, self.fw_total_flow, self.capacity, self.free_flow_tt, *self.vdf_parameters, self.cores ) numerator = 0.0 denominator = 0.0 prev_dir_minus_current_sol = {} aon_minus_current_sol = {} aon_minus_prev_dir = {} for c in self.traffic_classes: stp_dir = self.step_direction[c._id] prev_dir_minus_current_sol[c._id] = np.sum(stp_dir.link_loads[:, :] - c.results.link_loads[:, :], axis=1) aon_minus_current_sol[c._id] = np.sum(c._aon_results.link_loads[:, :] - c.results.link_loads[:, :], axis=1) aon_minus_prev_dir[c._id] = np.sum(c._aon_results.link_loads[:, :] - stp_dir.link_loads[:, :], axis=1) for c_0 in self.traffic_classes: for c_1 in self.traffic_classes: numerator += prev_dir_minus_current_sol[c_0._id] * aon_minus_current_sol[c_1._id] denominator += prev_dir_minus_current_sol[c_0._id] * aon_minus_prev_dir[c_1._id] numerator = np.sum(numerator * self.vdf_der) denominator = np.sum(denominator * self.vdf_der) alpha = numerator / denominator if alpha < 0.0: self.conjugate_stepsize = 0.0 elif alpha > self.conjugate_direction_max: self.conjugate_stepsize = self.conjugate_direction_max else: self.conjugate_stepsize = alpha # for reporting, we use a different convention, consistent with BFW: beta_0 corresponds to multiplier for AON; # in calculations we follow the conventions of our TRB paper. self.betas[0] = 1.0 - self.conjugate_stepsize self.betas[1] = self.conjugate_stepsize self.betas[2] = 0.0
[docs] def calculate_biconjugate_direction(self): self.vdf.apply_derivative( self.vdf_der, self.fw_total_flow, self.capacity, self.free_flow_tt, *self.vdf_parameters, self.cores ) mu_numerator = 0.0 mu_denominator = 0.0 nu_nom = 0.0 nu_denom = 0.0 w_ = {} x_ = {} y_ = {} z_ = {} for c in self.traffic_classes: sd = self.step_direction[c._id].link_loads[:, :] psd = self.previous_step_direction[c._id].link_loads[:, :] ll = c.results.link_loads[:, :] x_[c._id] = np.sum(sd * self.stepsize + psd * (1.0 - self.stepsize) - ll, axis=1) y_[c._id] = np.sum(c._aon_results.link_loads[:, :] - ll, axis=1) z_[c._id] = np.sum(sd - ll, axis=1) w_[c._id] = np.sum(psd - sd, axis=1) for c_0 in self.traffic_classes: for c_1 in self.traffic_classes: mu_numerator += x_[c_0._id] * y_[c_1._id] mu_denominator += x_[c_0._id] * w_[c_1._id] nu_nom += z_[c_0._id] * y_[c_1._id] nu_denom += z_[c_0._id] * z_[c_1._id] mu_numerator = np.sum(mu_numerator * self.vdf_der) mu_denominator = np.sum(mu_denominator * self.vdf_der) if mu_denominator == 0.0: mu = 0.0 else: mu = -mu_numerator / mu_denominator mu = max(0.0, mu) nu_nom = np.sum(nu_nom * self.vdf_der) nu_denom = np.sum(nu_denom * self.vdf_der) if nu_denom == 0.0: nu = 0.0 else: nu = -(nu_nom / nu_denom) + mu * self.stepsize / (1.0 - self.stepsize) nu = max(0.0, nu) self.betas[0] = 1.0 / (1.0 + nu + mu) self.betas[1] = nu * self.betas[0] self.betas[2] = mu * self.betas[0]
def __calculate_step_direction(self): """Calculates step direction depending on the method""" sd_flows = [] # 2nd iteration is a fw step. if the previous step replaced the aggregated # solution so far, we need to start anew. if self.iter == 2 or self.stepsize == 1.0 or self.do_fw_step or self.algorithm in ["msa", "frank-wolfe"]: self.do_fw_step = False self.do_conjugate_step = True self.conjugate_stepsize = 0.0 for c in self.traffic_classes: aon_res = c._aon_results stp_dir_res = self.step_direction[c._id] copy_two_dimensions(stp_dir_res.link_loads, aon_res.link_loads, self.cores) stp_dir_res.total_flows() if c.results.num_skims > 0: copy_three_dimensions(stp_dir_res.skims.matrix_view, aon_res.skims.matrix_view, self.cores) sd_flows.append(aon_res.total_link_loads) if c._selected_links: aux_res = self.aons[c._id].aux_res for name, idx in c._aon_results._selected_links.items(): copy_two_dimensions( self.sl_step_dir_ll[c._id][name]["sdr"], np.sum(aux_res.temp_sl_link_loading, axis=0)[idx, :, :], self.cores, ) copy_three_dimensions( self.sl_step_dir_od[c._id][name]["sdr"], np.sum(aux_res.temp_sl_od_matrix, axis=0)[idx, :, :, :], self.cores, ) # 3rd iteration is cfw. also, if we had to reset direction search we need a cfw step before bfw elif (self.iter == 3) or (self.do_conjugate_step) or (self.algorithm == "cfw"): self.do_conjugate_step = False self.calculate_conjugate_stepsize() for c in self.traffic_classes: sdr = self.step_direction[c._id] previous = self.previous_step_direction[c._id] copy_two_dimensions(previous.link_loads, sdr.link_loads, self.cores) previous.total_flows() if c.results.num_skims > 0: copy_three_dimensions(previous.skims.matrix_view, sdr.skims.matrix_view, self.cores) linear_combination( sdr.link_loads, sdr.link_loads, c._aon_results.link_loads, self.conjugate_stepsize, self.cores ) if c.results.num_skims > 0: linear_combination_skims( sdr.skims.matrix_view, sdr.skims.matrix_view, c._aon_results.skims.matrix_view, self.conjugate_stepsize, self.cores, ) if c._selected_links: aux_res = self.aons[c._id].aux_res for name, idx in c._aon_results._selected_links.items(): sl_step_dir_ll = self.sl_step_dir_ll[c._id][name] sl_step_dir_od = self.sl_step_dir_od[c._id][name] copy_two_dimensions( sl_step_dir_ll["prev_sdr"], sl_step_dir_ll["sdr"], self.cores, ) copy_three_dimensions( sl_step_dir_od["prev_sdr"], sl_step_dir_od["sdr"], self.cores, ) linear_combination( sl_step_dir_ll["sdr"], sl_step_dir_ll["sdr"], np.sum(aux_res.temp_sl_link_loading, axis=0)[idx, :, :], self.conjugate_stepsize, self.cores, ) linear_combination_skims( sl_step_dir_od["sdr"], sl_step_dir_od["sdr"], np.sum(aux_res.temp_sl_od_matrix, axis=0)[idx, :, :, :], self.conjugate_stepsize, self.cores, ) sdr.total_flows() sd_flows.append(sdr.total_link_loads) # biconjugate else: self.calculate_biconjugate_direction() # deep copy because we overwrite step_direction but need it on next iteration for c in self.traffic_classes: ppst = self.temp_step_direction_for_copy[c._id] # type: AssignmentResults prev_stp_dir = self.previous_step_direction[c._id] # type: AssignmentResults stp_dir = self.step_direction[c._id] # type: AssignmentResults copy_two_dimensions(ppst.link_loads, stp_dir.link_loads, self.cores) ppst.total_flows() if c.results.num_skims > 0: copy_three_dimensions(ppst.skims.matrix_view, stp_dir.skims.matrix_view, self.cores) triple_linear_combination( stp_dir.link_loads, c._aon_results.link_loads, stp_dir.link_loads, prev_stp_dir.link_loads, self.betas, self.cores, ) stp_dir.total_flows() if c.results.num_skims > 0: triple_linear_combination_skims( stp_dir.skims.matrix_view, c._aon_results.skims.matrix_view, stp_dir.skims.matrix_view, prev_stp_dir.skims.matrix_view, self.betas, self.cores, ) if c._selected_links: aux_res = self.aons[c._id].aux_res for name, idx in c._aon_results._selected_links.items(): sl_step_dir_ll = self.sl_step_dir_ll[c._id][name] sl_step_dir_od = self.sl_step_dir_od[c._id][name] copy_two_dimensions( sl_step_dir_ll["temp_prev_sdr"], sl_step_dir_ll["sdr"], self.cores, ) copy_three_dimensions( sl_step_dir_od["temp_prev_sdr"], sl_step_dir_od["sdr"], self.cores, ) triple_linear_combination( sl_step_dir_ll["sdr"], np.sum(aux_res.temp_sl_link_loading, axis=0)[idx, :, :], sl_step_dir_ll["sdr"], sl_step_dir_ll["prev_sdr"], self.betas, self.cores, ) triple_linear_combination_skims( sl_step_dir_od["sdr"], np.sum(aux_res.temp_sl_od_matrix, axis=0)[idx, :, :, :], sl_step_dir_od["sdr"], sl_step_dir_od["prev_sdr"], self.betas, self.cores, ) copy_two_dimensions( sl_step_dir_ll["prev_sdr"], sl_step_dir_ll["temp_prev_sdr"], self.cores, ) copy_three_dimensions( sl_step_dir_od["prev_sdr"], sl_step_dir_od["temp_prev_sdr"], self.cores, ) sd_flows.append(np.sum(stp_dir.link_loads, axis=1)) copy_two_dimensions(prev_stp_dir.link_loads, ppst.link_loads, self.cores) prev_stp_dir.total_flows() if c.results.num_skims > 0: copy_three_dimensions(prev_stp_dir.skims.matrix_view, ppst.skims.matrix_view, self.cores) self.step_direction_flow = np.sum(sd_flows, axis=0) def __maybe_create_path_file_directories(self): path_base_dir = os.path.join(self.project_path, "path_files", self.procedure_id) for c in self.traffic_classes: if c._aon_results.save_path_file: c._aon_results.path_file_dir = os.path.join( path_base_dir, f"iter{self.iter}", f"path_c{c.mode}_{c._id}" ) Path(c._aon_results.path_file_dir).mkdir(parents=True, exist_ok=True) if self.iter == 1: # save simplified graph correspondences, this could change after assignment c.graph.save_compressed_correspondence(path_base_dir, c.mode, c._id)
[docs] def doWork(self): self.execute()
[docs] def execute(self): # noqa: C901 # We build the fixed cost field self.sl_step_dir_ll = {} self.sl_step_dir_od = {} for c in self.traffic_classes: # Copying select link dictionary that maps name to its relevant matrices into the class' results c._aon_results._selected_links = c._selected_links c.results._selected_links = c._selected_links link_loads_step_dir_shape = ( c.graph.compact_num_links, c.results.classes["number"], ) od_step_dir_shape = ( c.graph.num_zones, c.graph.num_zones, c.results.classes["number"], ) self.sl_step_dir_ll[c._id] = {} self.sl_step_dir_od[c._id] = {} for name in c._selected_links.keys(): self.sl_step_dir_ll[c._id][name] = { "sdr": np.zeros(link_loads_step_dir_shape, dtype=c.graph.default_types("float")), "prev_sdr": np.zeros(link_loads_step_dir_shape, dtype=c.graph.default_types("float")), "temp_prev_sdr": np.zeros(link_loads_step_dir_shape, dtype=c.graph.default_types("float")), } self.sl_step_dir_od[c._id][name] = { "sdr": np.zeros(od_step_dir_shape, dtype=c.graph.default_types("float")), "prev_sdr": np.zeros(od_step_dir_shape, dtype=c.graph.default_types("float")), "temp_prev_sdr": np.zeros(od_step_dir_shape, dtype=c.graph.default_types("float")), } # Sizes the temporary objects used for the results c.results.prepare(c.graph, c.matrix) c._aon_results.prepare(c.graph, c.matrix) c.results.reset() # Prepares the fixed cost to be used if c.fixed_cost_field: # divide fixed cost by volume-dependent prefactor (vot) such that we don't have to do it for # each occurence in the objective funtion. TODO: Need to think about cost skims here, we do # not want this there I think v = c.graph.graph[c.fixed_cost_field].values[:] c.fixed_cost[c.graph.graph.__supernet_id__] = v * c.fc_multiplier / c.vot c.fixed_cost[np.isnan(c.fixed_cost)] = 0 # TODO: Review how to eliminate this. It looks unnecessary # Just need to create some arrays for cost c.graph.set_graph(self.time_field) self.aons[c._id] = allOrNothing(c.matrix, c.graph, c._aon_results) self.logger.info(f"{self.algorithm} Assignment STATS") self.logger.info("Iteration, RelativeGap, stepsize") for self.iter in range(1, self.max_iter + 1): # noqa: B020 self.iteration_issue = [] if pyqt: self.equilibration.emit(["rgap", self.rgap]) self.equilibration.emit(["iterations", self.iter]) aon_flows = [] self.__maybe_create_path_file_directories() for c in self.traffic_classes: # type: TrafficClass # cost = c.fixed_cost / c.vot + self.congested_time # now only once cost = c.fixed_cost + self.congested_time aggregate_link_costs(cost, c.graph.compact_cost, c.results.crosswalk) aon = self.aons[c._id] # This is a new object every iteration, with new aux_res if pyqt: aon.assignment.connect(self.signal_handler) aon.execute() c._aon_results.link_loads *= c.pce c._aon_results.total_flows() aon_flows.append(c._aon_results.total_link_loads) self.aon_total_flow = np.sum(aon_flows, axis=0) flows = [] if self.iter == 1: for c in self.traffic_classes: copy_two_dimensions(c.results.link_loads, c._aon_results.link_loads, self.cores) c.results.total_flows() if c.results.num_skims > 0: copy_three_dimensions(c.results.skims.matrix_view, c._aon_results.skims.matrix_view, self.cores) if c._selected_links: for name, idx in c._aon_results._selected_links.items(): # Copy the temporary results into the final od matrix, referenced by link_set name # The temp has an index associated with the link_set name copy_three_dimensions( c.results.select_link_od.matrix[name], # matrix being written into np.sum(self.aons[c._id].aux_res.temp_sl_od_matrix, axis=0)[ idx, :, :, : ], # results after the iteration self.cores, # core count ) copy_two_dimensions( c.results.select_link_loading[name], # ouput matrix np.sum(self.aons[c._id].aux_res.temp_sl_link_loading, axis=0)[idx, :, :], # matrix 1 self.cores, # core count ) flows.append(c.results.total_link_loads) else: self.__calculate_step_direction() self.calculate_stepsize() for c in self.traffic_classes: stp_dir = self.step_direction[c._id] cls_res = c.results linear_combination( cls_res.link_loads, stp_dir.link_loads, cls_res.link_loads, self.stepsize, self.cores ) if cls_res.num_skims > 0: linear_combination_skims( cls_res.skims.matrix_view, stp_dir.skims.matrix_view, cls_res.skims.matrix_view, self.stepsize, self.cores, ) if c._selected_links: for name, idx in c._aon_results._selected_links.items(): # Copy the temporary results into the final od matrix, referenced by link_set name # The temp flows have an index associated with the link_set name linear_combination_skims( cls_res.select_link_od.matrix[name], # output matrix self.sl_step_dir_od[c._id][name]["sdr"], cls_res.select_link_od.matrix[name], # matrix 2 (previous iteration) self.stepsize, # stepsize self.cores, # core count ) linear_combination( cls_res.select_link_loading[name], # output matrix self.sl_step_dir_ll[c._id][name]["sdr"], cls_res.select_link_loading[name], # matrix 2 (previous iteration) self.stepsize, # stepsize self.cores, # core count ) cls_res.total_flows() flows.append(cls_res.total_link_loads) self.fw_total_flow = np.sum(flows, axis=0) if self.algorithm == "all-or-nothing": break # Check convergence # This needs to be done with the current costs, and not the future ones converged = self.check_convergence() if self.iter > 1 else False self.vdf.apply_vdf( self.congested_time, self.fw_total_flow, self.capacity, self.free_flow_tt, *self.vdf_parameters, self.cores, ) for c in self.traffic_classes: if self.time_field in c.graph.skim_fields: k = c.graph.skim_fields.index(self.time_field) aggregate_link_costs(self.congested_time[:], c.graph.compact_skims[:, k], c.results.crosswalk) self.convergence_report["iteration"].append(self.iter) self.convergence_report["rgap"].append(self.rgap) self.convergence_report["warnings"].append("; ".join(self.iteration_issue)) self.convergence_report["alpha"].append(self.stepsize) if self.algorithm in ["cfw", "bfw"]: self.convergence_report["beta0"].append(self.betas[0]) self.convergence_report["beta1"].append(self.betas[1]) self.convergence_report["beta2"].append(self.betas[2]) self.logger.info(f"{self.iter},{self.rgap},{self.stepsize}") if converged: self.steps_below += 1 if self.steps_below >= self.steps_below_needed_to_terminate: break else: self.steps_below = 0 if self.iter < self.max_iter: for c in self.traffic_classes: c._aon_results.reset() if self.time_field not in c.graph.skim_fields: continue idx = c.graph.skim_fields.index(self.time_field) c.graph.skims[:, idx] = self.congested_time[:] for c in self.traffic_classes: c.results.link_loads /= c.pce c.results.total_flows() if (self.rgap > self.rgap_target) and (self.algorithm != "all-or-nothing"): self.logger.error(f"Desired RGap of {self.rgap_target} was NOT reached") self.logger.info(f"{self.algorithm} Assignment finished. {self.iter} iterations and {self.rgap} final gap") if pyqt: self.equilibration.emit(["rgap", self.rgap]) self.equilibration.emit(["iterations", self.iter]) self.equilibration.emit(["finished_threaded_procedure"])
def __derivative_of_objective_stepsize_dependent(self, stepsize, const_term): """The stepsize-dependent part of the derivative of the objective function. If fixed costs are defined, the corresponding contribution needs to be passed in""" x = np.zeros_like(self.fw_total_flow) linear_combination_1d(x, self.step_direction_flow, self.fw_total_flow, stepsize, self.cores) # x = self.fw_total_flow + stepsize * (self.step_direction_flow - self.fw_total_flow) self.vdf.apply_vdf(self.congested_value, x, self.capacity, self.free_flow_tt, *self.vdf_parameters, self.cores) link_cost_term = sum_a_times_b_minus_c( self.congested_value, self.step_direction_flow, self.fw_total_flow, self.cores ) return link_cost_term + const_term def __derivative_of_objective_stepsize_independent(self): """The part of the derivative of the objective function that does not dependent on stepsize. Non-zero only for fixed cost contributions.""" class_specific_term = 0.0 for c in self.traffic_classes: # fixed cost is scaled by vot class_link_costs = sum_a_times_b_minus_c( c.fixed_cost, self.step_direction[c._id].link_loads[:, 0], c.results.link_loads[:, 0], self.cores ) class_specific_term += class_link_costs return class_specific_term
[docs] def calculate_stepsize(self): """Calculate optimal stepsize in descent direction""" if self.algorithm == "msa": self.stepsize = 1.0 / self.iter return class_specific_term = self.__derivative_of_objective_stepsize_independent() derivative_of_objective = partial( self.__derivative_of_objective_stepsize_dependent, const_term=class_specific_term ) x_tol = max(min(1e-6, self.rgap * 1e-5), 1e-12) try: if recent_scipy: min_res = root_scalar(derivative_of_objective, bracket=[0, 1], xtol=x_tol) self.stepsize = min_res.root if not min_res.converged: self.logger.warning("Descent direction stepsize finder has not converged") else: min_res = root_scalar(derivative_of_objective, 1 / self.iter, xtol=x_tol) if not min_res.success: self.logger.warning("Descent direction stepsize finder has not converged") self.stepsize = min_res.x[0] if self.stepsize <= 0.0 or self.stepsize >= 1.0: raise ValueError("wrong root") self.conjugate_failed = False except ValueError as e: # We can have iterations where the objective function is not *strictly* convex, but the scipy method cannot deal # with this. Stepsize is then either given by 1 or 0, depending on where the objective function is smaller. # However, using zero would mean the overall solution would not get updated, and therefore we assert the stepsize # in order to add a small fraction of the AoN. A heuristic value equal to the corresponding MSA step size # seems to work well in practice. if self.algorithm == "bfw": self.betas.fill(-1) if derivative_of_objective(0.0) < derivative_of_objective(1.0): if self.algorithm == "frank-wolfe" or self.conjugate_failed: tiny_step = 1e-2 / self.iter # use a fraction of the MSA stepsize. We observe that using 1e-4 # works well in practice, however for a large number of iterations this might be too much so # use this heuristic instead. self.logger.warning(f"# Alert: Adding {tiny_step} as step size to make it non-zero. {e.args}") self.stepsize = tiny_step else: self.stepsize = 0.0 # need to reset conjugate / bi-conjugate direction search self.do_fw_step = True self.conjugate_failed = True self.iteration_issue.append("Found bad conjugate direction step. Performing FW search. {e.args}") # By doing it recursively, we avoid doing the same AoN again self.__calculate_step_direction() self.calculate_stepsize() else: # Do we want to keep some of the old solution, or just throw away everything? self.stepsize = 1.0 assert 0 <= self.stepsize <= 1.0
[docs] def check_convergence(self): """Calculate relative gap and return True if it is smaller than desired precision""" aon_cost = np.sum(self.congested_time * self.aon_total_flow) current_cost = np.sum(self.congested_time * self.fw_total_flow) self.rgap = abs(current_cost - aon_cost) / current_cost if self.rgap_target >= self.rgap: return True return False
[docs] def signal_handler(self, val): if pyqt: self.assignment.emit(val)