Source code for aequilibrae.transit.parse_csv
from io import TextIOWrapper
import numpy as np
import csv
import copy
from numpy.lib.recfunctions import append_fields
[docs]
def parse_csv(file_name: str, column_order=[]): # noqa B006
tot = []
if isinstance(file_name, str):
csvfile = open(file_name, encoding="utf-8-sig")
else:
csvfile = TextIOWrapper(file_name, encoding="utf-8-sig")
contents = csv.reader(csvfile, delimiter=",", quotechar='"')
numcols = 0
for row in contents:
if not len("".join(row).strip()):
continue
broken = [x.encode("ascii", errors="ignore").decode().strip() for x in row]
if not numcols:
numcols = len(broken)
else:
if numcols < len(broken):
broken.extend([""] * (numcols - len(broken)))
tot.append(broken)
titles = tot.pop(0)
csvfile.close()
if tot:
data = np.core.records.fromrecords(tot, names=[x.lower() for x in titles])
else:
return empty()
missing_cols_names = [x for x in column_order.keys() if x not in data.dtype.names]
for col in missing_cols_names:
data = append_fields(data, col, np.array([""] * len(tot)))
if column_order:
col_names = [x for x in column_order.keys() if x in data.dtype.names]
data = data[col_names]
# Define sizes for the string variables
column_order = copy.deepcopy(column_order)
for c in col_names:
if column_order[c] is str:
column_order[c] = object
else:
if data[c].dtype.char.upper() in ["U", "S"]:
data[c][data[c] == ""] = "0"
new_data_dt = [(f, column_order[f]) for f in col_names]
if int(data.shape.__len__()) > 0:
# handle the case of int data given as a float string
for j, (_, dtype) in enumerate(new_data_dt):
if dtype == int:
for item in data:
item[j] = item[j].split(".")[0]
return np.array(data, dtype=new_data_dt)
else:
return data
else:
return data
[docs]
class empty:
shape = [0]