import pickle
import uuid
from abc import ABC
from datetime import datetime
from os.path import join
from typing import List, Tuple, Optional
import dataclasses
import numpy as np
import pandas as pd
from aequilibrae.paths.graph_building import build_compressed_graph, create_compressed_link_network_mapping
from aequilibrae.context import get_logger
@dataclasses.dataclass
class NetworkGraphIndices:
network_ab_idx: np.array
network_ba_idx: np.array
graph_ab_idx: np.array
graph_ba_idx: np.array
def _get_graph_to_network_mapping(lids, direcs):
num_uncompressed_links = int(np.unique(lids).shape[0])
indexing = np.zeros(int(lids.max()) + 1, np.uint64)
indexing[np.unique(lids)[:]] = np.arange(num_uncompressed_links)
graph_ab_idx = direcs > 0
graph_ba_idx = direcs < 0
network_ab_idx = indexing[lids[graph_ab_idx]]
network_ba_idx = indexing[lids[graph_ba_idx]]
return NetworkGraphIndices(network_ab_idx, network_ba_idx, graph_ab_idx, graph_ba_idx)
class GraphBase(ABC): # noqa: B024
"""
Graph class.
AequilibraE graphs implement two forms of compression.
- link contraction, and
- dead end removal.
Link contraction creates a topological equivalent graph by contracting sequences of links between nodes
with degrees of two. This compresses long streams of links, such as along highways or curved roads, into
single links.
Dead end removal attempts to remove dead ends and fish spines from the network. It does this based on the
observation that in a graph with non-negative weights a dead end will only ever appear in the results of a
short(est) path if the origin or destination is present within that dead end.
Dead end removal is applied before link contraction and does not create a strictly topological equivalent graph,
however, all centroids are preserved.
The compressed graph is used internally.
"""
def __init__(self, logger=None):
self.logger = logger or get_logger()
self.__integer_type = np.int64
self.__float_type = np.float64
self.required_default_fields = ["link_id", "a_node", "b_node", "direction", "id"]
self.__required_default_types = [
self.__integer_type,
self.__integer_type,
self.__integer_type,
np.int8,
self.__integer_type,
]
self.other_fields = ""
self.mode = ""
self.date = str(datetime.now())
self.description = "No description added so far"
self.num_links = -1
self.num_nodes = -1
self.num_zones = -1
self.compact_num_links = -1
self.compact_num_nodes = -1
self.network = pd.DataFrame([]) # This method will hold ALL information on the network
self.graph = pd.DataFrame([]) # This method will hold an array with ALL fields in the graph.
self.compact_graph = pd.DataFrame([]) # This method will hold an array with ALL fields in the graph.
# These are the fields actually used in computing paths
self.all_nodes = np.array(0) # Holds an array with all nodes in the original network
self.nodes_to_indices = np.array(0, np.int64) # Holds the reverse of the all_nodes
self.fs = np.array([]) # This method will hold the forward star for the graph
self.cost = np.array([]) # This array holds the values being used in the shortest path routine
self.skims = None
self.lonlat_index = pd.DataFrame([]) # Holds a node_id to lon/lat coord index for nodes within this graph
self.compact_all_nodes = np.array(0) # Holds an array with all nodes in the original network
self.compact_nodes_to_indices = np.array(0) # Holds the reverse of the all_nodes
self.compact_fs = np.array([]) # This method will hold the forward star for the graph
self.compact_cost = np.array([]) # This array holds the values being used in the shortest path routine
self.compact_skims = None
self.capacity = np.array([]) # Array holds the capacity for links
self.free_flow_time = np.array([]) # Array holds the free flow travel time by link
# sake of the Cython code
self.skim_fields = [] # List of skim fields to be used in computation
self.cost_field = False # Name of the cost field
self.block_centroid_flows = True
self.penalty_through_centroids = np.inf
self.centroids = None # NumPy array of centroid IDs
self.g_link_crosswalk = np.array([]) # 4 a link ID in the BIG graph, a corresponding link in the compressed 1
self.dead_end_links = np.array([])
self.compressed_link_network_mapping_idx = None
self.compressed_link_network_mapping_data = None
# Randomly generate a unique Graph ID randomly
self._id = uuid.uuid4().hex
def default_types(self, tp: str):
"""
Returns the default integer and float types used for computation
:Arguments:
**tp** (:obj:`str`): data type. 'int' or 'float'
"""
if tp == "int":
return self.__integer_type
elif tp == "float":
return self.__float_type
else:
raise ValueError("It must be either a int or a float")
def prepare_graph(self, centroids: Optional[np.ndarray] = None) -> None:
"""
Prepares the graph for a computation for a certain set of centroids
Under the hood, if sets all centroids to have IDs from 1 through **n**,
which should correspond to the index of the matrix being assigned.
This is what enables having any node IDs as centroids, and it relies on
the inference that all links connected to these nodes are centroid
connectors.
:Arguments:
**centroids** (:obj:`np.ndarray`): Array with centroid IDs. Mandatory type Int64, unique and positive
"""
self.__network_error_checking__()
# Creates the centroids
if centroids is not None:
if not np.issubdtype(centroids.dtype, np.integer):
raise ValueError("Centroids need to be a NumPy array of integers 64 bits")
if centroids.shape[0] == 0:
raise ValueError("You need at least one centroid")
if centroids.min() <= 0:
raise ValueError("Centroid IDs need to be positive")
if centroids.shape[0] != np.unique(centroids).shape[0]:
raise ValueError("Centroid IDs are not unique")
self.centroids = np.array(centroids, np.uint32)
else:
self.centroids = np.array([], np.uint32)
self.network = self.network.astype(
{
"direction": np.int8,
"a_node": self.__integer_type,
"b_node": self.__integer_type,
"link_id": self.__integer_type,
}
)
properties = self._build_directed_graph(self.network, self.centroids)
self.all_nodes, self.num_nodes, self.nodes_to_indices, self.fs, self.graph = properties
# We generate IDs that we KNOW will be constant across modes
self.graph.sort_values(by=["link_id", "direction"], inplace=True)
self.graph["__supernet_id__"] = np.arange(self.graph.shape[0]).astype(self.__integer_type)
self.graph.sort_values(by=["a_node", "b_node"], inplace=True)
self.num_links = self.graph.shape[0]
self.__build_derived_properties()
if self.centroids.shape[0]:
self.__build_compressed_graph()
self.compact_num_links = self.compact_graph.shape[0]
# The cache property should be recalculated when the graph has been re-prepared
self.compressed_link_network_mapping_idx = None
self.compressed_link_network_mapping_data = None
self.network_compressed_node_mapping = None
def __build_compressed_graph(self):
build_compressed_graph(self)
# We build a groupby to save time later
self.__graph_groupby = self.graph.groupby(["__compressed_id__"])
def _build_directed_graph(self, network: pd.DataFrame, centroids: np.ndarray):
all_titles = list(network.columns)
not_pos = network.loc[network.direction != 1, :]
not_negs = network.loc[network.direction != -1, :]
names, types = self.__build_column_names(all_titles)
neg_names = []
for name in names:
if name in not_pos.columns:
neg_names.append(name)
elif name + "_ba" in not_pos.columns:
neg_names.append(name + "_ba")
not_pos = pd.DataFrame(not_pos, copy=True)[neg_names]
not_pos.columns = names
# Swap the a and b nodes of these edges. Direction is used for mapping the graph.graph back to the network. It does not indicate the direction of the link.
not_pos.loc[:, "direction"] = -1
aux = np.array(not_pos.a_node.values, copy=True)
not_pos.loc[:, "a_node"] = not_pos.loc[:, "b_node"]
not_pos.loc[:, "b_node"] = aux[:]
del aux
pos_names = []
for name in names:
if name in not_negs.columns:
pos_names.append(name)
elif name + "_ab" in not_negs.columns:
pos_names.append(name + "_ab")
not_negs = pd.DataFrame(not_negs, copy=True)[pos_names]
not_negs.columns = names
not_negs.loc[:, "direction"] = 1
df = pd.concat([not_negs, not_pos])
# Now we take care of centroids
nodes = np.unique(np.hstack((df.a_node.values, df.b_node.values))).astype(self.__integer_type)
nodes = np.setdiff1d(nodes, centroids, assume_unique=True)
all_nodes = np.hstack((centroids, nodes)).astype(self.__integer_type)
num_nodes = all_nodes.shape[0]
nodes_to_indices = np.full(int(all_nodes.max()) + 1, -1, dtype=np.int64)
nlist = np.arange(num_nodes)
nodes_to_indices[all_nodes] = nlist
df.a_node = nodes_to_indices[df.a_node.values]
df.b_node = nodes_to_indices[df.b_node.values]
df = df.sort_values(by=["a_node", "b_node"])
df.index = np.arange(df.shape[0])
df["id"] = np.arange(df.shape[0])
fs = np.empty(num_nodes + 1, dtype=self.__integer_type)
fs.fill(-1)
y, x, _ = np.intersect1d(df.a_node.values, nlist, assume_unique=False, return_indices=True)
fs[y] = x[:]
fs[-1] = df.shape[0]
for i in range(num_nodes, 1, -1):
if fs[i - 1] == -1:
fs[i - 1] = fs[i]
nans = ", ".join([i for i in df.columns if df[i].isnull().any().any()])
if nans:
self.logger.warning(f"Field(s) {nans} has(ve) at least one NaN value. Check your computations")
df["link_id"] = df["link_id"].astype(self.__integer_type)
df["b_node"] = df.b_node.values.astype(self.__integer_type)
df["id"] = df.id.values.astype(self.__integer_type)
df["direction"] = df.direction.values.astype(np.int8)
return all_nodes, num_nodes, nodes_to_indices, fs, df
def exclude_links(self, links: list) -> None:
"""
Excludes a list of links from a graph by setting their B node equal to their A node
:Arguments:
**links** (:obj:`list`): List of link IDs to be excluded from the graph
"""
filter = self.network.link_id.isin(links)
# We check is the list makes sense in order to warn the user
if filter.sum() != len(set(links)):
self.logger.warning("At least one link does not exist in the network and therefore cannot be excluded")
self.network.loc[filter, "b_node"] = self.network.loc[filter, "a_node"]
if self.centroids is not None:
self.prepare_graph(self.centroids)
self.set_blocked_centroid_flows(self.block_centroid_flows)
self._id = uuid.uuid4().hex
def __build_column_names(self, all_titles: List[str]) -> Tuple[list, list]:
fields = list(self.required_default_fields)
types = list(self.__required_default_types)
for column in all_titles:
if column not in self.required_default_fields and column[0:-3] not in self.required_default_fields:
if column[-3:] == "_ab":
if column[:-3] + "_ba" in all_titles:
fields.append(column[:-3])
types.append(self.network[column].dtype)
else:
raise ValueError("Field {} exists for ab direction but does not exist for ba".format(column))
elif column[-3:] == "_ba":
if column[:-3] + "_ab" not in all_titles:
raise ValueError("Field {} exists for ba direction but does not exist for ab".format(column))
else:
fields.append(column)
types.append(self.network[column].dtype)
return fields, types
def __build_dtype(self, all_titles) -> list:
dtype = [
("link_id", self.__integer_type),
("a_node", self.__integer_type),
("b_node", self.__integer_type),
("direction", np.int8),
("id", self.__integer_type),
]
for i in all_titles:
if i not in self.required_default_fields and i[0:-3] not in self.required_default_fields:
if i[-3:] == "_ab":
if i[:-3] + "_ba" in all_titles:
dtype.append((i[:-3], self.network[i].dtype))
else:
raise ValueError("Field {} exists for ab direction but does not exist for ba".format(i))
elif i[-3:] == "_ba":
if i[:-3] + "_ab" not in all_titles:
raise ValueError("Field {} exists for ba direction but does not exist for ab".format(i))
else:
dtype.append((i, self.network[i].dtype))
return dtype
def set_graph(self, cost_field) -> None:
"""
Sets the field to be used for path computation
:Arguments:
**cost_field** (:obj:`str`): Field name. Must be numeric
"""
if cost_field not in self.graph.columns:
raise ValueError("cost_field not available in the graph:" + str(self.graph.columns))
self.cost_field = cost_field
# We only have a compact graph if we have added centroids, as that's used for skimming and assignment
if not self.compact_graph.empty:
self.compact_cost = np.zeros(self.compact_graph.id.max() + 2, self.__float_type)
df = self.__graph_groupby.sum(numeric_only=True)[[cost_field]].reset_index()
self.compact_cost[df.index.values] = df[cost_field].values
if self.graph[cost_field].dtype == self.__float_type:
self.cost = np.array(self.graph[cost_field].values, copy=True)
else:
self.cost = np.array(self.graph[cost_field].values, dtype=self.__float_type)
self.logger.warning("Cost field with wrong type. Converting to float64")
self.__build_derived_properties()
def set_skimming(self, skim_fields: list) -> None:
"""
Sets the list of skims to be computed
Skimming with A* may produce results that differ from traditional Dijkstra's due to its use a heuristic.
:Arguments:
**skim_fields** (:obj:`list`): Fields must be numeric
"""
if not skim_fields:
self.skim_fields = []
self.skims = np.array([])
if isinstance(skim_fields, str):
skim_fields = [skim_fields]
elif not isinstance(skim_fields, list):
raise ValueError("You need to provide a list of skims or the same of a single field")
# Check if list of fields make sense
k = [x for x in skim_fields if x not in self.graph.columns]
if k:
raise ValueError("At least one of the skim fields does not exist in the graph: {}".format(",".join(k)))
self.compact_skims = np.zeros((self.compact_num_links + 1, len(skim_fields) + 1), self.__float_type)
df = self.__graph_groupby.sum(numeric_only=True)[skim_fields].reset_index()
for i, skm in enumerate(skim_fields):
self.compact_skims[df.index.values, i] = df[skm].values.astype(self.__float_type)
self.skims = np.zeros((self.num_links, len(skim_fields) + 1), self.__float_type)
t = [x for x in skim_fields if self.graph[x].dtype != self.__float_type]
if t:
Warning("Some skim field with wrong type. Converting to float64")
for i, j in enumerate(skim_fields):
self.skims[:, i] = self.graph[j].astype(self.__float_type).values[:]
else:
for i, j in enumerate(skim_fields):
self.skims[:, i] = self.graph[j].values[:]
self.skim_fields = skim_fields
def set_blocked_centroid_flows(self, block_centroid_flows) -> None:
"""
Chooses whether we want to block paths to go through centroids or not.
Default value is ``True``
:Arguments:
**block_centroid_flows** (:obj:`bool`): Blocking or not
"""
if not isinstance(block_centroid_flows, bool):
raise TypeError("Blocking flows through centroids needs to be boolean")
if self.num_zones == 0:
self.logger.warning("No centroids in the model. Nothing to block")
return
self.block_centroid_flows = block_centroid_flows
# Procedure to pickle graph and save to disk
def save_to_disk(self, filename: str) -> None:
"""
Saves graph to disk
:Arguments:
**filename** (:obj:`str`): Path to file. Usual file extension is 'aeg'
"""
mygraph = {}
mygraph["description"] = self.description
mygraph["num_links"] = self.num_links
mygraph["num_nodes"] = self.num_nodes
mygraph["network"] = self.network
mygraph["graph"] = self.graph
mygraph["all_nodes"] = self.all_nodes
mygraph["nodes_to_indices"] = self.nodes_to_indices
mygraph["num_nodes"] = self.num_nodes
mygraph["fs"] = self.fs
mygraph["cost"] = self.cost
mygraph["cost_field"] = self.cost_field
mygraph["skims"] = self.skims
mygraph["skim_fields"] = self.skim_fields
mygraph["block_centroid_flows"] = self.block_centroid_flows
mygraph["centroids"] = self.centroids
mygraph["graph_id"] = self._id
mygraph["mode"] = self.mode
with open(filename, "wb") as f:
pickle.dump(mygraph, f)
def load_from_disk(self, filename: str) -> None:
"""
Loads graph from disk
:Arguments:
**filename** (:obj:`str`): Path to file
"""
with open(filename, "rb") as f:
mygraph = pickle.load(f)
self.description = mygraph["description"]
self.num_links = mygraph["num_links"]
self.num_nodes = mygraph["num_nodes"]
self.network = mygraph["network"]
self.graph = mygraph["graph"]
self.all_nodes = mygraph["all_nodes"]
self.nodes_to_indices = mygraph["nodes_to_indices"]
self.num_nodes = mygraph["num_nodes"]
self.fs = mygraph["fs"]
self.cost = mygraph["cost"]
self.cost_field = mygraph["cost_field"]
self.skims = mygraph["skims"]
self.skim_fields = mygraph["skim_fields"]
self.block_centroid_flows = mygraph["block_centroid_flows"]
self.centroids = mygraph["centroids"]
self._id = mygraph["graph_id"]
self.mode = mygraph["mode"]
self.__build_derived_properties()
def __build_derived_properties(self):
if self.centroids is None:
return
self.num_zones = self.centroids.shape[0] if self.centroids.shape else 0
def available_skims(self) -> List[str]:
"""
Returns graph fields that are available to be set as skims
:Returns:
**list** (:obj:`str`): Field names
"""
return [x for x in self.graph.columns if x not in ["link_id", "a_node", "b_node", "direction", "id"]]
# We check if all minimum fields are there
def __network_error_checking__(self):
# Checking field names
has_fields = self.network.columns
must_fields = ["link_id", "a_node", "b_node", "direction"]
for field in must_fields:
if field not in has_fields:
raise ValueError(f"could not find field {field} in the network array")
# Uniqueness of the id
link_ids = self.network["link_id"].astype(int)
if link_ids.shape[0] != np.unique(link_ids).shape[0]:
raise ValueError('"link_id" field not unique')
# Direction values
if np.max(self.network["direction"]) > 1 or np.min(self.network["direction"]) < -1:
raise ValueError('"direction" field not limited to (-1,0,1) values')
if "id" not in self.network.columns:
self.network = self.network.assign(id=np.nan)
def __determine_types__(self, new_type, current_type):
if new_type.isdigit():
new_type = int(new_type)
else:
try:
new_type = float(new_type)
except ValueError as verr:
self.logger.warning("Could not convert {} - {}".format(new_type, verr.__str__()))
if isinstance(new_type, int):
def_type = int
if current_type is float:
def_type = float
elif current_type is str:
def_type = str
elif isinstance(new_type, float):
def_type = float
if current_type is str:
def_type = str
elif isinstance(new_type, str):
def_type = str
else:
raise ValueError("WRONG TYPE OR NULL VALUE")
return def_type
def save_compressed_correspondence(self, path, mode_name, mode_id):
"""Save graph and nodes_to_indices to disk"""
graph_path = join(path, f"correspondence_c{mode_name}_{mode_id}.feather")
self.graph.to_feather(graph_path)
node_path = join(path, f"nodes_to_indices_c{mode_name}_{mode_id}.feather")
pd.DataFrame(self.nodes_to_indices, columns=["node_index"]).to_feather(node_path)
def create_compressed_link_network_mapping(self):
"""
Create three arrays providing a mapping of compressed ID to link ID.
Uses sparse compression. Index 'idx' by the by compressed ID and compressed ID + 1, the
network IDs are then in the range ``idx[id]:idx[id + 1]``.
Links not in the compressed graph are not contained within the 'data' array.
.. code-block:: python
>>> project = create_example(project_path)
>>> project.network.build_graphs()
>>> graph = project.network.graphs['c']
>>> graph.prepare_graph(np.arange(1,25))
>>> idx, data, node_mapping = graph.create_compressed_link_network_mapping()
:Returns:
**idx** (:obj:`np.array`): index array for ``data``
**data** (:obj:`np.array`): array of link ids
**node_mapping**: (:obj:`np.array`): array of node_mapping ids
"""
return create_compressed_link_network_mapping(self)
[docs]
class Graph(GraphBase):
[docs]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
[docs]
class TransitGraph(GraphBase):
[docs]
def __init__(self, config: dict = None, od_node_mapping: pd.DataFrame = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self._config = config
self.od_node_mapping = od_node_mapping
self.mode = "t"