Source code for aequilibrae.paths.network_skimming
import multiprocessing as mp
import sys
import threading
from datetime import datetime
from multiprocessing.dummy import Pool as ThreadPool
from uuid import uuid4

from aequilibrae.paths.AoN import skimming_single_origin
from aequilibrae.context import get_active_project
from aequilibrae.paths.multi_threaded_skimming import MultiThreadedNetworkSkimming
from aequilibrae.paths.results.skim_results import SkimResults
from aequilibrae.utils.core_setter import set_cores
from aequilibrae.utils.aeq_signal import SIGNAL
from aequilibrae.utils.interface.worker_thread import WorkerThread

sys.dont_write_bytecode = True
class NetworkSkimming(WorkerThread):
"""
.. code-block:: python

    >>> import os
    >>> from aequilibrae.utils.create_example import create_example
    >>> from aequilibrae.paths.network_skimming import NetworkSkimming

    >>> project = create_example(project_path)

    >>> network = project.network
    >>> network.build_graphs()

    >>> graph = network.graphs['c']
    >>> graph.set_graph(cost_field="distance")
    >>> graph.set_skimming("distance")

    >>> skm = NetworkSkimming(graph)
    >>> skm.execute()

    # The skim report (with any errors generated) is available here
    >>> skm.report
    []

    # To access the skim matrix directly from its temporary file
    >>> matrix = skm.results.skims

    # Or you can save the results to disk
    >>> skm.save_to_project(os.path.join(project_path, 'matrices/skimming_result.omx'))

    # Or specify AequilibraE's own matrix file format
    >>> skm.save_to_project(os.path.join(project_path, 'matrices/skimming_result.aem'), 'aem')

    >>> project.close()
"""

signal = SIGNAL(object)
def __init__(self, graph, origins=None, project=None):
WorkerThread.__init__(self, None)
self.project = project
self.origins = origins
self.graph = graph
self.cores = mp.cpu_count()
self.results = SkimResults()
self.aux_res = MultiThreadedNetworkSkimming()
self.report = []
self.procedure_id = ""
self.procedure_date = ""
self.cumulative = 0
def doWork(self):
self.execute()
def execute(self):
"""Runs the skimming process as specified in the graph"""
self.signal.emit(["start", self.graph.num_zones, ""])
self.results.cores = self.cores
self.results.prepare(self.graph)
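# Prepare the per-thread auxiliary structures used by the skimming workers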
self.aux_res = MultiThreadedNetworkSkimming()
self.aux_res.prepare(self.graph, self.results.cores, self.results.nodes, self.results.num_skims)
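# One worker per core; each valid centroid is dispatched to the pool as an independent job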
pool = ThreadPool(self.results.cores)
all_threads = {"count": 0}
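# Centroids outside the graph's node range or with no outgoing links are reported and skipped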
for orig in list(self.graph.centroids):
i = int(self.graph.nodes_to_indices[orig])
if i >= self.graph.nodes_to_indices.shape[0]:
self.report.append(f"Centroid {orig} is beyond the domain of the graph")
elif self.graph.fs[int(i)] == self.graph.fs[int(i) + 1]:
self.report.append(f"Centroid {orig} does not exist in the graph")
else:
pool.apply_async(self.__func_skim_thread, args=(orig, all_threads))
pool.close()
pool.join()
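# Release the auxiliary buffers and stamp this run with a unique id and timestamp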
self.aux_res = None
self.procedure_id = uuid4().hex
self.procedure_date = str(datetime.today())
self.signal.emit(["set_text", "Saving Outputs"])
self.signal.emit(["finished"])
def set_cores(self, cores: int) -> None:
"""
Sets number of cores (threads) to be used in computation
Value of zero sets number of threads to all available in the system, while negative values indicate the number
of threads to be left out of the computational effort.
Resulting number of cores will be adjusted to a minimum of zero or the maximum available in the system if the
inputs result in values outside those limits
:Arguments:
**cores** (:obj:`int`): Number of cores to be used in computation
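
A minimal illustration (assuming ``skm`` is a ``NetworkSkimming`` instance, as in the class example above):

.. code-block:: python

    skm.set_cores(0)   # use every thread available in the system
    skm.set_cores(-1)  # leave one thread out of the computation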
"""
self.cores = set_cores(cores)
def save_to_project(self, name: str, format="omx", project=None) -> None:
"""Saves skim results to the project folder and creates record in the database
:Arguments:
**name** (:obj:`str`): Name of the matrix. Same value for matrix record name and file (plus extension)
**format** (:obj:`str`, *Optional*): File format ('aem' or 'omx'). Default is 'omx'
**project** (:obj:`Project`, *Optional*): Project we want to save the results to.
Defaults to the active project
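
A minimal illustration (assuming ``skm`` holds finished skimming results, as in the class example above; the matrix names are hypothetical):

.. code-block:: python

    skm.save_to_project('skims_by_distance')        # saved as 'skims_by_distance.omx'
    skm.save_to_project('skims_aem_copy', 'aem')    # or in AequilibraE's 'aem' format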
"""
file_name = f"{name}.{format.lower()}"
if not project:
project = self.project or get_active_project()
mats = project.matrices
record = mats.new_record(name, file_name, self.results.skims)
record.procedure_id = self.procedure_id
record.timestamp = self.procedure_date
record.procedure = "Network skimming"
record.save()
def __func_skim_thread(self, origin, all_threads):
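"""Computes the skims for a single origin.

Each worker thread is lazily assigned a slot index the first time it appears in ``all_threads``;
that slot index is passed to ``skimming_single_origin`` so each thread uses its own buffers in
``self.aux_res``. If ``skimming_single_origin`` returns anything other than the origin itself,
the returned value is appended to ``self.report``.
"""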
if threading.get_ident() in all_threads:
th = all_threads[threading.get_ident()]
else:
all_threads[threading.get_ident()] = all_threads["count"]
th = all_threads["count"]
all_threads["count"] += 1
x = skimming_single_origin(origin, self.graph, self.results, self.aux_res, th)
self.cumulative += 1
if x != origin:
self.report.append(x)
self.signal.emit(["update", self.cumulative, f"{self.cumulative}/{self.graph.num_zones}"])