import multiprocessing as mp
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
from aequilibrae.paths.AoN import sum_axis1, assign_link_loads
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae.parameters import Parameters
from aequilibrae.paths.graph import Graph, TransitGraph, GraphBase, _get_graph_to_network_mapping
"""
TO-DO:
1. Create a file type for memory-mapped path files
Same idea as the AequilibraEData container, but using the format.memmap from NumPy
2. Make the writing to SQL faster by disabling all checks before the actual writing
"""
class AssignmentResultsBase(ABC):
    """Assignment results base class for traffic and transit assignments.

    Holds the state shared by every results container (link loads, network
    dimensions and the number of cores) and declares the ``prepare``/``reset``
    interface that concrete containers must implement.
    """

    def __init__(self):
        self.link_loads = np.array([])  # The actual results for assignment
        self.no_path = None  # No-path array; stays None until a subclass allocates it
        self.num_skims = 0  # number of skims that will be computed. Depends on the setting of the graph provided

        # A non-integer setting falls back to 0, which set_cores reads as "use every core"
        p = Parameters().parameters["system"]["cpus"]
        if not isinstance(p, int):
            p = 0
        self.set_cores(p)

        self.nodes = -1
        self.zones = -1
        self.links = -1
        self.lids = None

    @abstractmethod
    def prepare(self, graph: GraphBase, matrix: AequilibraeMatrix) -> None:
        """Dimension the results object for the given graph and matrix."""

    @abstractmethod
    def reset(self) -> None:
        """Return the results object to its prepared, pre-computation state."""

    def set_cores(self, cores: int) -> None:
        """
        Sets number of cores (threads) to be used in computation

        Value of zero sets number of threads to all available in the system, while negative values indicate the number
        of threads to be left out of the computational effort.

        Resulting number of cores will be adjusted to a minimum of one or the maximum available in the system if the
        inputs result in values outside those limits

        If link loads have already been allocated, the result arrays are re-dimensioned afterwards.

        :Arguments:
            **cores** (:obj:`int`): Number of cores to be used in computation

        :Raises:
            ``ValueError`` if *cores* is not an integer
        """
        if not isinstance(cores, int):
            raise ValueError("Number of cores needs to be an integer")

        available = mp.cpu_count()
        if cores < 0:
            self.cores = max(1, available + cores)
        elif cores == 0:
            self.cores = available
        else:
            # Assign unconditionally: comparing against self.cores first fails on the
            # very first call (from __init__), when the attribute does not exist yet
            self.cores = min(available, cores)

        if self.link_loads.shape[0]:
            self._redimension()

    def _redimension(self) -> None:
        """Invoke the private ``__redim`` method of the concrete class, if it defines one.

        Writing ``self.__redim()`` here would be name-mangled to
        ``_AssignmentResultsBase__redim``, which never matches a ``__redim`` defined in a
        subclass. The mangled name is therefore resolved explicitly for each class in the MRO,
        and classes without such a method are left untouched.
        """
        for klass in type(self).__mro__:
            hook = getattr(self, f"_{klass.__name__.lstrip('_')}__redim", None)
            if callable(hook):
                hook()
                return
class AssignmentResults(AssignmentResultsBase):
    """
    Assignment result holder for a single :obj:`TrafficClass` with multiple user classes
    """

    def __init__(self):
        super().__init__()
        self.compact_link_loads = np.array([])  # Results for assignment on simplified graph
        self.compact_total_link_loads = np.array([])  # Results for all user classes summed on simplified graph
        self.crosswalk = np.array([])  # crosswalk between compact graph link IDs and actual link IDs
        self.skims = AequilibraeMatrix()  # The array of skims
        self.total_link_loads = np.array([])  # The result of the assignment for all user classes summed
        self.compact_links = -1
        self.compact_nodes = -1
        self.direcs = None
        self.classes = {"number": 1, "names": ["flow"]}

        self._selected_links = {}
        self.select_link_od = None
        self.select_link_loading = {}

        self._graph_id = None
        self.__float_type = None
        self.__integer_type = None

        # save path files. Need extra metadata for file paths
        self.save_path_file = False
        self.path_file_dir = None
        self.write_feather = True  # we use feather as default, parquet is slower but with better compression

    # In case we want to do by hand, we can prepare each method individually
    def prepare(self, graph: Graph, matrix: AequilibraeMatrix) -> None:
        """
        Prepares the object with dimensions corresponding to the assignment matrix and graph objects

        :Arguments:
            **graph** (:obj:`Graph`): Needs to have been set with number of centroids and list of skims (if any)

            **matrix** (:obj:`AequilibraeMatrix`): Matrix properly set for computation with
            ``matrix.computational_view(:obj:`list`)``

        :Raises:
            ``ValueError`` if no graph is provided or the matrix has no computational view set
        """
        # Must run before the first use of *graph*; checked any later it could never trigger
        if graph is None:
            raise ValueError("Please provide a graph")

        float_type = graph.default_types("float")
        integer_type = graph.default_types("int")
        self.__float_type = float_type
        self.__integer_type = integer_type

        if matrix.view_names is None:
            raise ValueError("Please set the matrix_procedures computational view")

        # A third axis on the computational view holds one slice per user class
        self.classes["number"] = 1
        if len(matrix.matrix_view.shape) > 2:
            self.classes["number"] = matrix.matrix_view.shape[2]
        self.classes["names"] = matrix.view_names

        self.compact_nodes = graph.compact_num_nodes
        self.compact_links = graph.compact_num_links

        self.nodes = graph.num_nodes
        self.zones = graph.num_zones
        self.centroids = graph.centroids
        self.links = graph.num_links
        self.num_skims = len(graph.skim_fields)
        self.skim_names = list(graph.skim_fields)
        self.lids = graph.graph.link_id.values
        self.direcs = graph.graph.direction.values
        self.crosswalk = np.zeros(graph.graph.shape[0], integer_type)
        self.crosswalk[graph.graph.__supernet_id__.values] = graph.graph.__compressed_id__.values
        self._graph_ids = graph.graph.__supernet_id__.values
        self._graph_compressed_ids = graph.graph.__compressed_id__.values
        self.__redim()
        self._graph_id = graph._id

        if self._selected_links:
            self.select_link_od = AequilibraeMatrix()
            self.select_link_od.create_empty(
                memory_only=True,
                zones=matrix.zones,
                matrix_names=list(self._selected_links.keys()),
                index_names=matrix.index_names,
            )

            self.select_link_loading = {}
            # Combine each set of selected links into one large matrix that can be parsed into Cython
            # Each row corresponds a link set, and the equivalent rows in temp_sl_od_matrix and temp_sl_link_loading
            # Correspond to that set
            widest_set = max(len(x) for x in self._selected_links.values())
            self.select_links = np.full((len(self._selected_links), widest_set), -1, dtype=integer_type)

            sl_idx = {}
            for i, (name, arr) in enumerate(self._selected_links.items()):
                sl_idx[name] = i
                # Filling select_links array with linksets. Note the default value is -1, which is used as a placeholder
                # It also denotes when the given row has no more selected links, since Cython cannot handle
                # Multidimensional arrays where each row has different lengths
                self.select_links[i][: len(arr)] = arr
                # Correctly sets the dimensions for the final output matrices
                self.select_link_od.matrix[name] = np.zeros(
                    (graph.num_zones, graph.num_zones, self.classes["number"]),
                    dtype=float_type,
                )
                self.select_link_loading[name] = np.zeros(
                    (graph.compact_num_links, self.classes["number"]),
                    dtype=float_type,
                )

            # Overwrites previous arrays on assignment results level with the index to access that array in Cython
            # NOTE(review): after this the values are row indices, not link sets, so the link sets
            # must be set again before this object can be prepared a second time
            self._selected_links = sl_idx

    def reset(self) -> None:
        """
        Resets object to prepared and pre-computation state

        :Raises:
            ``ValueError`` if the object has not been prepared yet
        """
        # no_path is only allocated while preparing, so None means "not prepared"
        if self.link_loads is None or self.no_path is None:
            raise ValueError("Exception: Assignment results object was not yet prepared/initialized")

        if self.num_skims > 0:
            self.skims.matrices.fill(0)

        self.no_path.fill(0)
        self.link_loads.fill(0)
        self.total_link_loads.fill(0)
        self.compact_link_loads.fill(0)
        self.compact_total_link_loads.fill(0)

    def __redim(self):
        """Allocates every result array for the current dimensions and zeroes it through ``reset``."""
        classes = self.classes["number"]
        # The per-class compact array is allocated with one row more than the number of compact links
        self.compact_link_loads = np.zeros((self.compact_links + 1, classes), self.__float_type)
        self.compact_total_link_loads = np.zeros(self.compact_links, self.__float_type)

        self.link_loads = np.zeros((self.links, classes), self.__float_type)
        self.total_link_loads = np.zeros(self.links, self.__float_type)
        self.no_path = np.zeros((self.zones, self.zones), dtype=self.__integer_type)

        if self.num_skims > 0:
            self.skims = AequilibraeMatrix()

            self.skims.create_empty(file_name=self.skims.random_name(), zones=self.zones, matrix_names=self.skim_names)
            self.skims.index[:] = self.centroids[:]
            self.skims.computational_view()
            # Keep the view three-dimensional even when there is a single skim
            if len(self.skims.matrix_view.shape[:]) == 2:
                self.skims.matrix_view = self.skims.matrix_view.reshape((self.zones, self.zones, 1))
        else:
            self.skims = AequilibraeMatrix()
            # NOTE(review): this is the 1-D array [1, 1, 1], not an array of shape (1, 1, 1) -
            # confirm that this is the intended placeholder
            self.skims.matrix_view = np.array((1, 1, 1))

        self.reset()

    def total_flows(self) -> None:
        """
        Totals all link flows for this class into a single link load

        Results are placed into *total_link_loads* class member
        """
        sum_axis1(self.total_link_loads, self.link_loads, self.cores)

    def get_graph_to_network_mapping(self):
        """Returns the graph-to-network mapping built from this object's link IDs and directions."""
        return _get_graph_to_network_mapping(self.lids, self.direcs)

    def get_load_results(self) -> pd.DataFrame:
        """
        Translates the assignment results from the graph format into the network format

        :Returns:
            **dataset** (:obj:`pd.DataFrame`): Pandas DataFrame data with the traffic class assignment results
        """
        # Get a mapping from the compressed graph to/from the network graph
        m = self.get_graph_to_network_mapping()
        link_ids = np.unique(self.lids)
        recs = link_ids.shape[0]

        # Link flows
        link_flows = self.link_loads[self._graph_ids, :]
        aux = {}
        for i, n in enumerate(self.classes["names"]):
            # Directional Flows
            ab_flow = np.zeros(recs, self.__float_type)
            ab_flow[m.network_ab_idx] = np.nan_to_num(link_flows[m.graph_ab_idx, i])
            ba_flow = np.zeros(recs, self.__float_type)
            ba_flow[m.network_ba_idx] = np.nan_to_num(link_flows[m.graph_ba_idx, i])

            aux[n + "_ab"] = ab_flow
            aux[n + "_ba"] = ba_flow
            # Tot Flow
            aux[n + "_tot"] = np.nan_to_num(ab_flow) + np.nan_to_num(ba_flow)

        return pd.DataFrame(aux, index=link_ids)

    def get_sl_results(self) -> pd.DataFrame:
        """
        Translates the select link loads from the graph format into the network format

        Each set of select links gets an ``_ab``, ``_ba`` and ``_tot`` column for every subclass contained in the
        TrafficClass, named ``{link set}_{class}_{direction}``. Directional entries the mapping does not fill stay
        NaN, while the totals treat NaN as zero.

        :Returns:
            **dataset** (:obj:`pd.DataFrame`): Pandas DataFrame indexed by link ID with the select link flows
        """
        link_ids = np.unique(self.lids)
        recs = link_ids.shape[0]
        m = self.get_graph_to_network_mapping()

        # Columns are built as plain arrays and turned into a DataFrame once at the end.
        # Writing into ``DataFrame[col].values`` only works while pandas returns a writable view
        columns = {}
        for name in self._selected_links:
            # Link flows initialised
            link_flows = np.full((self.links, self.classes["number"]), np.nan)
            # maps link flows from the compressed graph to the uncompressed graph
            assign_link_loads(link_flows, self.select_link_loading[name], self._graph_compressed_ids, self.cores)
            for i, n in enumerate(self.classes["names"]):
                # Directional Flows
                ab_flow = np.full(recs, np.nan)
                ab_flow[m.network_ab_idx] = link_flows[m.graph_ab_idx, i]
                ba_flow = np.full(recs, np.nan)
                ba_flow[m.network_ba_idx] = link_flows[m.graph_ba_idx, i]

                columns[f"{name}_{n}_ab"] = ab_flow
                columns[f"{name}_{n}_ba"] = ba_flow
                # Tot Flow
                columns[f"{name}_{n}_tot"] = np.nansum(np.column_stack((ab_flow, ba_flow)), axis=1)

        return pd.DataFrame(columns, index=link_ids)
class TransitAssignmentResults(AssignmentResultsBase):
    """
    Assignment result holder for a single :obj:`Transit`
    """

    def __init__(self):
        super().__init__()
        # Start from an empty load vector; this class allocates no skim matrix
        self.link_loads = np.array([])

    def prepare(self, graph: TransitGraph, matrix: AequilibraeMatrix) -> None:
        """
        Prepares the object with dimensions corresponding to the assignment matrix and graph objects

        The current link loads are zeroed first, then the dimensions and link IDs are copied from the graph.
        The matrix is not read by this method.

        :Arguments:
            **graph** (:obj:`TransitGraph`): Needs to have been set with number of centroids

            **matrix** (:obj:`AequilibraeMatrix`): Matrix properly set for computation with
            ``matrix.computational_view(:obj:`list`)``
        """
        self.reset()

        # Dimensions of the network being assigned
        self.nodes = graph.num_nodes
        self.zones = graph.num_zones
        self.centroids = graph.centroids
        self.links = graph.num_links

        # Link IDs, used to index the load results
        self.lids = graph.graph.link_id.values

    def reset(self) -> None:
        """
        Resets object to prepared and pre-computation state
        """
        # Since all memory for the assignment is managed by the HyperpathGenerating
        # object we don't need to do much here
        self.link_loads.fill(0)

    def get_load_results(self) -> pd.DataFrame:
        """
        Translates the assignment results from the graph format into the network format

        :Returns:
            **dataset** (:obj:`pd.DataFrame`): DataFrame data with the transit class assignment results

        :Raises:
            ``ValueError`` if there are no link loads yet
        """
        has_loads = self.link_loads.shape[0] > 0
        if has_loads:
            volumes = {"volume": self.link_loads}
            return pd.DataFrame(volumes, index=self.lids)

        raise ValueError("Transit assignment has not been executed yet")