Source code for aequilibrae.matrix.aequilibrae_data

import os
import sqlite3
import tempfile
import uuid

import numpy as np
from numpy.lib.format import open_memmap

# Storage-mode flags stored in ``AequilibraeData.memory_mode``:
# DISK  -> the dataset is backed by a memory-mapped file
# MEMORY -> the dataset is held in an in-memory record array
DISK, MEMORY = 0, 1

# Names of export formats (presumably the ones handled by
# ``AequilibraeData.export`` -- the list is not referenced in this module).
data_export_types = [
    "aed",
    "csv",
    "sqlite",
]


class AequilibraeData(object):
    """AequilibraE dataset

    Wraps a NumPy structured array that always carries an ``index`` column plus
    one column per user-defined field. The array is either an in-memory
    ``np.recarray`` or a memory-mapped file in NumPy's ``.npy`` format. Columns
    can be read as attributes (``dataset.index``, ``dataset.<field_name>``).
    """

    def __init__(self):
        self.data = None
        self.file_path = None
        self.entries = None
        self.fields = None
        self.num_fields = None
        self.data_types = None
        self.aeq_index_type = None
        self.memory_mode = None

    @classmethod
    def empty(cls, *args, **kwargs):
        """Builds a new instance and forwards all arguments to :meth:`create_empty`"""
        instance = cls()
        instance.create_empty(*args, **kwargs)
        return instance

    def create_empty(
        self, file_path=None, entries=1, field_names=None, data_types=None, memory_mode=False, fill=None, index=None
    ):
        """
        Creates a new empty dataset

        :Arguments:
            **file_path** (:obj:`str`, *Optional*): Full path for the output data file. If *memory_mode* is 'false'
            and path is missing, then the file is created in the temp folder

            **entries** (:obj:`int`, *Optional*): Number of records in the dataset. Default is 1

            **field_names** (:obj:`list`, *Optional*): List of field names for this dataset. If no list is provided,
            the field 'data' will be created

            **data_types** (:obj:`np.dtype`, *Optional*): List of data types for the dataset. Types need to be NumPy
            data types (e.g. ``np.int16``, ``np.float64``). If no list of types are provided, type will be
            ``np.float64``

            **memory_mode** (:obj:`bool`, *Optional*): If ``True``, dataset will be kept in memory. If ``False``,
            the dataset will be a memory-mapped numpy array

            **fill** (*Optional*): Value written to every record of every data field (the index is not filled)

            **index** (*Optional*): Sequence or array copied into the ``index`` column

        :Raises:
            ``ValueError`` if *field_names* or *data_types* is not a list, or if their lengths differ.
            ``TypeError`` if a field name is not a string. ``Exception`` if a field name is not a valid
            identifier or is a reserved name.

        .. code-block:: python

            >>> from aequilibrae.matrix import AequilibraeData, AequilibraeMatrix

            >>> mat = AequilibraeMatrix()
            >>> mat.load('/tmp/test_project/matrices/demand.omx')
            >>> mat.computational_view()

            >>> vectors = "/tmp/test_project/vectors.aed"

            >>> args = {
            ...      "file_path": vectors,
            ...      "entries": mat.zones,
            ...      "field_names": ["origins", "destinations"],
            ...      "data_types": [np.float64, np.float64]
            ... }

            >>> dataset = AequilibraeData()
            >>> dataset.create_empty(**args)
        """
        if field_names is None:
            field_names = ["data"]

        # Validate everything before touching the instance, so a failed call
        # does not leave it half-configured.
        if not isinstance(field_names, list):
            raise ValueError('Titles for fields, "field_names", needs to be a list')

        if data_types is None:
            data_types = [np.float64] * len(field_names)

        if not isinstance(data_types, list):
            raise ValueError('Data types, "data_types", needs to be a list')

        if len(data_types) != len(field_names):
            # zip() below would otherwise silently drop the unmatched entries
            raise ValueError(
                f'"field_names" has {len(field_names)} entries but "data_types" has {len(data_types)}'
            )

        for field in field_names:
            if not isinstance(field, str):
                raise TypeError(f"{field} is not a string. You cannot use it as a field name")
            if not field.isidentifier():
                raise Exception(field + " is a not a valid identifier name. You cannot use it as a field name")
            if field in object.__dict__:
                raise Exception(field + " is a reserved name. You cannot use it as a field name")

        self.entries = entries
        self.fields = field_names
        self.data_types = data_types
        self.num_fields = len(field_names)
        self.aeq_index_type = np.uint64
        self.memory_mode = MEMORY if memory_mode else DISK

        # A disk-backed dataset without an explicit path goes to the temp folder
        if file_path is None and self.memory_mode == DISK:
            file_path = self.random_name()
        self.file_path = file_path

        dtype = [("index", self.aeq_index_type), *zip(self.fields, self.data_types)]

        if self.memory_mode == MEMORY:
            self.data = np.recarray((self.entries,), dtype=dtype)
        else:
            self.data = open_memmap(self.file_path, mode="w+", dtype=dtype, shape=(self.entries,))

        if fill is not None:
            for field in self.fields:
                self.data[field].fill(fill)

        if index is not None:
            self.data["index"][:] = index[:]

    def __getattr__(self, field_name):
        """Exposes the ``index`` column and every data field as an attribute

        Python only calls this hook after regular attribute lookup has failed.
        Instance state is therefore read through ``self.__dict__``: going
        through ``self.data`` on an instance that has no ``data`` attribute yet
        (e.g. while being copied or unpickled) would re-enter this method
        endlessly.
        """
        state = self.__dict__
        data = state.get("data")
        if data is None:
            raise AttributeError("Data container is empty")

        fields = state.get("fields") or ()
        if field_name in fields or field_name == "index":
            return data[field_name]

        raise AttributeError("No such method or data field! --> " + str(field_name))

    def load(self, file_path):
        """
        Loads dataset from file

        :Arguments:
            **file_path** (:obj:`str`): Full file path to the AequilibraeData to be loaded

        :Raises:
            ``ValueError`` if the file does not hold a structured (multi-field) array

        .. code-block:: python

            >>> from aequilibrae.matrix import AequilibraeData

            >>> dataset = AequilibraeData()
            >>> dataset.load("/tmp/test_project/vectors.aed")
        """
        # Opening the file first surfaces a missing/unreadable path as the usual OSError
        with open(file_path, "rb") as handle:
            real_path = os.path.realpath(handle.name)

        # Map in memory and load data names plus dimensions
        mapped = open_memmap(real_path, mode="r+")
        names = mapped.dtype.names
        if names is None:
            raise ValueError(f"{real_path} does not contain a structured array and cannot be loaded")

        self.file_path = real_path
        self.data = mapped
        self.entries = mapped.shape[0]
        self.fields = [name for name in names if name != "index"]
        self.num_fields = len(self.fields)
        self.data_types = [mapped.dtype[name].type for name in self.fields]
        self.memory_mode = DISK
        if "index" in names:
            # Keep the index type of the file so later exports do not change it
            self.aeq_index_type = mapped.dtype["index"].type

    def export(self, file_name, table_name="aequilibrae_table"):
        """
        Exports the dataset to another format. Supports AED, CSV and SQLite

        :Arguments:
            **file_name** (:obj:`str`): File name with PATH and extension (aed, csv, or sqlite3, sqlite or db)

            **table_name** (:obj:`str`): It only applies if you are saving to an SQLite table. Otherwise ignored

        :Raises:
            ``ValueError`` if the file extension is not one of the supported formats

        .. code-block:: python

            >>> from aequilibrae.matrix import AequilibraeData

            >>> dataset = AequilibraeData()
            >>> dataset.load("/tmp/test_project/vectors.aed")
            >>> dataset.export("/tmp/test_project/vectors.csv")
        """
        extension = os.path.splitext(file_name)[1].lower()

        if extension == ".aed":
            self._export_aed(file_name)
        elif extension == ".csv":
            self._export_csv(file_name)
        elif extension in (".sqlite", ".sqlite3", ".db"):
            self._export_sqlite(file_name, table_name)
        else:
            raise ValueError(
                f"Unsupported export format '{extension}'. Use .aed, .csv, .sqlite, .sqlite3 or .db"
            )

    def _export_aed(self, file_name):
        """Writes a copy of the dataset to a new memory-mapped file"""
        # The layout is taken from the array itself, so it is right even when
        # the instance attributes describing it were never populated.
        source_dtype = self.data.dtype
        dtype = np.dtype([(name, source_dtype[name]) for name in source_dtype.names])

        target = open_memmap(file_name, mode="w+", dtype=dtype, shape=self.data.shape)
        for name in dtype.names:
            target[name] = self.data[name]
        target.flush()
        del target

    def _export_csv(self, file_name):
        """Writes the dataset as comma-separated text with a header row"""
        formats = ["%d"]  # index column
        for data_type in self.data_types:
            if np.issubdtype(data_type, np.floating):
                formats.append("%f")
            elif np.issubdtype(data_type, np.integer):
                formats.append("%d")
            else:
                # Without a fallback the format would have fewer entries than columns
                formats.append("%s")

        # Work on a copy so replacing NaN/inf does not alter the dataset itself
        data = np.array(self.data, copy=True)
        for name in data.dtype.names:
            np.nan_to_num(data[name], copy=False)

        headers = ["index", *self.fields]
        np.savetxt(file_name, data, delimiter=",", fmt=",".join(formats), header=",".join(headers), comments="")

    def _export_sqlite(self, file_name, table_name):
        """Writes the dataset to a table of an SQLite database, replacing the table if it exists"""
        table = self._quote_identifier(table_name)
        columns = ["link_id INTEGER PRIMARY KEY"]
        columns.extend(self._quote_identifier(field) + " REAL" for field in self.fields)
        placeholders = ", ".join("?" * len(columns))

        conn = sqlite3.connect(file_name)
        try:
            cursor = conn.cursor()
            cursor.execute("DROP TABLE IF EXISTS " + table)
            cursor.execute("CREATE TABLE " + table + " (" + ", ".join(columns) + ")")
            cursor.executemany("INSERT INTO " + table + " VALUES (" + placeholders + ")", self._iter_rows())
            conn.commit()
        finally:
            conn.close()

    def _iter_rows(self, chunk_size=65536):
        """Yields records as tuples of plain Python values, converting one chunk at a time"""
        # sqlite3 binds Python int/float; NumPy scalars are converted with tolist()
        for start in range(0, len(self.data), chunk_size):
            yield from self.data[start : start + chunk_size].tolist()

    @staticmethod
    def _quote_identifier(name):
        """Returns *name* as a double-quoted SQL identifier"""
        # Quoting keeps field/table names that are SQL keywords (e.g. "order") usable
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def random_name():
        """
        Returns a random name for a dataset with root in the temp directory of the user

        .. code-block:: python

            >>> from aequilibrae.matrix import AequilibraeData

            >>> name = AequilibraeData().random_name() # doctest: +ELLIPSIS

            # This is an example of output
            # '/tmp/Aequilibrae_data_5werr5f36-b123-asdf-4587-adfglkjhqwe.aed'
        """
        return os.path.join(tempfile.gettempdir(), f"Aequilibrae_data_{uuid.uuid4()}.aed")