Source code for aequilibrae.paths.all_or_nothing

import importlib.util as iutil
import threading
from multiprocessing.dummy import Pool as ThreadPool
import numpy as np

from .multi_threaded_aon import MultiThreadedAoN
from ..utils import WorkerThread
from aequilibrae.matrix import AequilibraeMatrix
from aequilibrae import global_logger

try:
    from aequilibrae.paths.AoN import one_to_all, assign_link_loads
except ImportError as ie:
    global_logger.warning(f"Could not import procedures from the binary. {ie.args}")

spec = iutil.find_spec("PyQt5")
pyqt = spec is not None
if pyqt:
    from PyQt5.QtCore import pyqtSignal as SIGNAL

if False:
    from .results import AssignmentResults
    from .graph import Graph


[docs] class allOrNothing(WorkerThread): if pyqt: assignment = SIGNAL(object) def __init__(self, matrix, graph, results): # type: (AequilibraeMatrix, Graph, AssignmentResults)->None WorkerThread.__init__(self, None) self.matrix = matrix self.graph = graph self.results = results self.aux_res = MultiThreadedAoN() if results._graph_id != graph._id: raise ValueError("Results object not prepared. Use --> results.prepare(graph)") elif matrix.matrix_view is None: raise ValueError( "Matrix was not prepared for assignment. " "Please create a matrix_procedures view with all classes you want to assign" ) elif not np.array_equal(matrix.index, graph.centroids): raise ValueError("Matrix and graph do not have compatible sets of centroids.")
[docs] def doWork(self): self.execute()
[docs] def execute(self): self.report = [] self.cumulative = 0 if pyqt: self.assignment.emit(["zones finalized", 0]) self.aux_res.prepare(self.graph, self.results) self.matrix.matrix_view = self.matrix.matrix_view.reshape( (self.graph.num_zones, self.graph.num_zones, self.results.classes["number"]) ) mat = self.matrix.matrix_view pool = ThreadPool(self.results.cores) all_threads = {"count": 0} for orig in self.matrix.index: i = int(self.graph.nodes_to_indices[orig]) if np.nansum(mat[i, :, :]) > 0 or self.results.num_skims > 0: if self.graph.fs[i] == self.graph.fs[i + 1]: self.report.append("Centroid " + str(orig) + " is not connected") else: pool.apply_async(self.func_assig_thread, args=(orig, all_threads)) pool.close() pool.join() # TODO: Multi-thread this sum self.results.compact_link_loads = np.sum(self.aux_res.temp_link_loads, axis=0) assign_link_loads( self.results.link_loads, self.results.compact_link_loads, self.results.crosswalk, self.results.cores ) if pyqt: self.assignment.emit(["finished_threaded_procedure", None])
[docs] def func_assig_thread(self, origin, all_threads): thread_id = threading.get_ident() th = all_threads.get(thread_id, all_threads["count"]) if th == all_threads["count"]: all_threads[thread_id] = all_threads["count"] all_threads["count"] += 1 x = one_to_all(origin, self.matrix, self.graph, self.results, self.aux_res, th) self.cumulative += 1 if x != origin: self.report.append(x) if pyqt: self.assignment.emit(["zones finalized", self.cumulative]) self.assignment.emit(["text AoN", f"{self.cumulative:,}/{self.matrix.zones:,}"])