#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# utilities.py
"""Common functions for the DVHA-Stats."""
#
# Copyright (c) 2020 Dan Cutright
# This file is part of DVHA-Stats, released under a MIT license.
# See the file LICENSE included with this distribution, also
# available at https://github.com/cutright/DVHA-Stats
import numpy as np
from os.path import isfile, splitext
from dateutil.parser import parse as date_parser
import csv
def apply_dtype(value, dtype):
    """Convert value with the provided data type

    Parameters
    ----------
    value : any
        Value to be converted
    dtype : function, None
        python reserved types, e.g., int, float, str, etc. However, dtype
        could be any callable that raises a ValueError or TypeError on
        failure. If None, ``value`` is returned untouched.

    Returns
    -------
    any
        The return of dtype(value), or numpy.nan if the conversion raises
        a ValueError or TypeError
    """
    if dtype is None:
        return value
    try:
        return dtype(value)
    except (TypeError, ValueError):
        # ValueError covers unparsable strings such as float("abc");
        # TypeError covers unsupported inputs such as float(None).
        return np.nan
def csv_to_dict(csv_file_path, delimiter=",", dtype=None, header_row=True):
    """Read in a csv file, return data as a dictionary

    Parameters
    ----------
    csv_file_path : str
        File path to the CSV file to be processed.
    delimiter : str
        Specify the delimiter used in the csv file (default = ',')
    dtype : callable, type, optional
        Optionally force values to a type (e.g., float, int, str, etc.).
    header_row : bool, optional
        If True, the first row is interpreted as column keys, otherwise
        column indices (0, 1, 2, ...) taken from the width of the first
        row will be used

    Returns
    -------
    dict
        CSV data as a dict of column lists, keyed by the stripped first row
        values (or by column index if ``header_row`` is False). An empty
        file yields an empty dict.
    """
    # newline="" lets the csv module do its own line-ending handling, which
    # is required for quoted fields that contain embedded newlines.
    with open(csv_file_path, "r", newline="") as fp:
        rows = list(csv.reader(fp, delimiter=delimiter))

    # Nothing to read: no header to take keys from, no first row to measure
    if not rows:
        return {}

    if header_row:
        keys = [key.strip() for key in rows[0]]
        rows = rows[1:]
    else:
        keys = list(range(len(rows[0])))

    data_dict = {key: [] for key in keys}
    for row in rows:
        # Index into keys (rather than zip) so a row wider than the key
        # list still raises instead of silently dropping values.
        for c, value in enumerate(row):
            data_dict[keys[c]].append(apply_dtype(value, dtype))

    return data_dict
def dict_to_array(data, key_order=None):
    """Stack the columns of a data dictionary into a 2-D numpy array

    Parameters
    ----------
    data : dict
        Dictionary of column data; each value becomes one column of the
        output array.
    key_order : None, list of str
        Optionally the order of columns. Defaults to the key order of
        ``data``.

    Returns
    -------
    dict
        A dictionary with keys of 'data' and 'var_names', pointing to a
        numpy array (one column per variable) and the list of column
        names, respectively
    """
    if key_order is None:
        columns = list(data.keys())
    else:
        columns = key_order

    # Rows of `stacked` are variables; transpose so variables are columns
    stacked = np.asarray([data[name] for name in columns])
    return {"data": stacked.transpose(), "var_names": columns}
def import_data(data, var_names=None):
    """Normalize supported inputs into a data array and variable names

    Parameters
    ----------
    data : numpy.array, dict, str
        Input data (2-D) with N rows of observations and
        p columns of variables. A str is treated as a path to a CSV file,
        which must have a header row for column names.
    var_names : list of str, optional
        If data is a numpy array, optionally provide the column names.
        Defaults to the column indices. Ignored for dict and CSV input.

    Returns
    -------
    np.ndarray, list
        A tuple: data as an array and variable names as a list

    Raises
    ------
    NotImplementedError
        If data is not a numpy array, a dict, or a path to an existing
        file with a ``.csv`` extension
    """
    # Arrays pass straight through; only the names may need a default
    if isinstance(data, np.ndarray):
        if var_names is None:
            var_names = list(range(data.shape[1]))
        return data, var_names

    converted = None
    if isinstance(data, dict):
        converted = dict_to_array(data)
    elif (
        isinstance(data, str)
        and isfile(data)
        and splitext(data)[1] == ".csv"
    ):
        # CSV values are read as floats
        converted = dict_to_array(csv_to_dict(data, dtype=float))

    if converted is None:
        raise NotImplementedError(
            "Invalid data provided - must be a numpy array, dict, or .csv file"
        )
    return converted["data"], converted["var_names"]
def widen_data(
    data_dict,
    uid_columns,
    x_data_cols,
    y_data_col,
    date_col=None,
    sort_by_date=True,
    remove_partial_columns=False,
    multi_val_policy="first",
    dtype=None,
    date_parser_kwargs=None,
):
    """Convert a narrow data dictionary into wide format (i.e., from one row
    per dependent value to one row per observation)

    Parameters
    ----------
    data_dict : dict
        Data to be converted. The length of each array must be uniform.
    uid_columns : list
        Keys of data_dict used to create an observation uid
    x_data_cols : list
        Keys of columns representing independent data
    y_data_col : int, str
        Key of data_dict representing dependent data
    date_col : int, str, optional
        Key of date column
    sort_by_date : bool, optional
        Sort output by date (date_col required)
    remove_partial_columns : bool, optional
        If true, any columns that have a blank row will be removed
    multi_val_policy : str
        Either 'first', 'last', 'min', 'max'. If multiple values are found for
        a particular combination of x_data_cols, one value will be selected
        based on this policy.
    dtype : function
        python reserved types, e.g., int, float, str, etc. However, dtype
        could be any callable that raises a ValueError on failure.
    date_parser_kwargs : dict, optional
        Keyword arguments to be passed into dateutil.parser.parse

    Returns
    -------
    dict
        data_dict reformatted to one row per UID (and date, if date_col is
        provided). Keys are 'uid', 'date' (only if date_col is provided),
        and one key per unique combination of x_data_cols values.

    Raises
    ------
    NotImplementedError
        If the columns of data_dict are not all the same length, or
        multi_val_policy is not one of the supported options
    """
    data_lengths = [len(col) for col in data_dict.values()]
    if len(set(data_lengths)) != 1:
        msg = "Each column of data_dict must be of the same length"
        raise NotImplementedError(msg)

    if multi_val_policy not in {"first", "last", "min", "max"}:
        msg = "multi_val_policy must be in 'first', 'last', 'min', or 'max'"
        raise NotImplementedError(msg)

    # Group dependent values: data[uid][date][params] -> list of y-values.
    # Dict membership (via setdefault) keeps this linear in the row count.
    data = {}
    for row, y_value in enumerate(data_dict[y_data_col]):
        uid = " && ".join([str(data_dict[col][row]) for col in uid_columns])

        # Numeric-looking values are normalized through float so that,
        # e.g., "1" and "1.0" map onto the same output column.
        vals = [data_dict[col][row] for col in x_data_cols]
        vals = [float(v) if is_numeric(v) else v for v in vals]
        params = " && ".join([str(v) for v in vals])

        # Without a date column every row of a uid shares one dummy date
        date = 0 if date_col is None else data_dict[date_col][row]

        by_date = data.setdefault(uid, {})
        by_param = by_date.setdefault(date, {})
        by_param.setdefault(params, []).append(y_value)

    # Collect every parameter combination seen; these become the columns.
    # 'uid' and 'date' are reserved for the identifying columns.
    x_variables = set()
    for results in data.values():
        for date_results in results.values():
            for param in date_results:
                if param not in {"uid", "date"}:
                    x_variables.add(param)
    x_variables = sorted(x_variables)

    keys = ["uid", "date"] + x_variables
    wide_data = {key: [] for key in keys}
    partial_cols = set()
    policy_funcs = {"min": min, "max": max}
    for uid, date_data in data.items():
        for date, param_data in date_data.items():
            wide_data["uid"].append(uid)
            wide_data["date"].append(date)
            for x in x_variables:
                values = param_data.get(x)
                if values is None:
                    # This observation has no value for column x
                    if remove_partial_columns:
                        partial_cols.add(x)
                    values = [""]

                if dtype is not None:
                    values = [apply_dtype(v, dtype) for v in values]

                value = values[0]
                if len(values) > 1:
                    print(
                        "WARNING: Multiple values found for uid: %s, date: "
                        "%s, param: %s. Only the %s value is included in "
                        "widen_data output." % (uid, date, x, multi_val_policy)
                    )
                    if multi_val_policy == "last":
                        value = values[-1]
                    elif multi_val_policy in policy_funcs:
                        value = policy_funcs[multi_val_policy](values)
                wide_data[x].append(value)

    if remove_partial_columns:
        for col in partial_cols:
            wide_data.pop(col)
            x_variables.remove(col)

    if date_col is None:
        wide_data.pop("date")
        return wide_data

    if not sort_by_date:
        return wide_data

    kwargs = {} if date_parser_kwargs is None else date_parser_kwargs
    dates = str_arr_to_date_arr(wide_data["date"], kwargs)
    sorted_indices = get_sorted_indices(dates)

    # Reorder every remaining column by the date ordering. Iterating the
    # sorted indices directly (instead of sizing the loop from the first
    # x-variable column) keeps this valid when no x-variable columns remain.
    return {
        key: [column[i] for i in sorted_indices]
        for key, column in wide_data.items()
    }
def get_sorted_indices(list_data):
    """Return the original positions of the items of a list in sorted order

    Parameters
    ----------
    list_data : list
        Any python sortable list

    Returns
    -------
    list
        Indices into list_data such that reading list_data in this order
        yields sorted(list_data). Equal items keep their original relative
        order.
    """
    # Pair each item with its position, then order the pairs by item only
    indexed = list(enumerate(list_data))
    indexed.sort(key=lambda pair: pair[1])
    return [position for position, _ in indexed]
def sort_2d_array(arr, index, mode="col"):
    """Sort a 2-D numpy array

    Parameters
    ----------
    arr : np.ndarray
        Input 2-D array to be sorted
    index : int, list
        Index of column or row to sort arr. If list, will sort by each index
        in the order provided (the first index is the primary sort key).
    mode : str
        Either 'col' or 'row'. With 'col', the rows of arr are reordered so
        that the indexed column(s) are sorted. With 'row', the columns of
        arr are reordered so that the indexed row(s) are sorted.

    Returns
    -------
    np.ndarray
        A sorted copy of arr (arr itself if index is an empty list)

    Raises
    ------
    NotImplementedError
        If mode is neither 'col' nor 'row'
    """
    if not isinstance(index, list):
        index = [index]

    if mode not in {"col", "row"}:
        msg = (
            "Unsupported sort_2d_array mode, "
            "must be either 'col' or 'row' - got %s" % mode
        )
        raise NotImplementedError(msg)

    # Apply the keys from least to most significant. Each pass is stable,
    # so the ordering produced by earlier (less significant) passes
    # survives among ties of the later ones.
    for i in reversed(index):
        if mode == "col":
            order = arr[:, i].argsort(kind="mergesort")
            arr = arr[order]
        else:
            # The ordering comes from a row, so it must permute columns
            order = arr[i, :].argsort(kind="mergesort")
            arr = arr[:, order]

    return arr
def str_arr_to_date_arr(arr, date_parser_kwargs=None, force=False):
    """Parse each datetime string of an array with dateutil.parser.parse

    Parameters
    ----------
    arr : array-like
        Array of datetime strings compatible with dateutil.parser.parse
    date_parser_kwargs : dict, optional
        Keyword arguments to be passed into dateutil.parser.parse
    force : bool
        If true, a value that fails to parse is kept as-is in the output.
        If false, the parser's error is raised on the first failure.

    Returns
    -------
    list
        list of datetime objects (plus any unparsed original values if
        force is true)
    """
    parser_options = {} if date_parser_kwargs is None else date_parser_kwargs

    parsed = []
    for raw in arr:
        try:
            item = date_parser(raw, **parser_options)
        except Exception:
            if not force:
                raise
            # Best-effort mode: keep the original, unparsed value
            item = raw
        parsed.append(item)
    return parsed
def is_numeric(val):
    """Check if value is numeric (float or int)

    Parameters
    ----------
    val : any
        Any value

    Returns
    -------
    bool
        Returns true if float(val) succeeds; false if it raises a
        ValueError (e.g., a non-numeric string) or a TypeError (e.g.,
        None or a list)
    """
    try:
        float(val)
    except (TypeError, ValueError):
        # TypeError: float() rejects the type outright (None, list, ...)
        return False
    return True