#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# stats.py
"""Statistical calculations and class objects"""
#
# Copyright (c) 2020 Dan Cutright
# Copyright (c) 2020 Arka Roy
# This file is part of DVHA-Stats, released under a MIT license.
# See the file LICENSE included with this distribution, also
# available at https://github.com/cutright/DVHA-Stats
import numpy as np
from scipy import stats as scipy_stats
from sklearn import linear_model
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.decomposition import PCA as sklearn_PCA
from regressors import stats as regressors_stats
from prettytable import PrettyTable
[docs]class Histogram:
"""Basic histogram plot using matplotlib histogram calculation
Parameters
----------
y: array-like
Input array.
bins : int, list, str, optional
If bins is an int, it defines the number of equal-width bins in
the given range (10, by default). If bins is a sequence, it
defines a monotonically increasing array of bin edges, including
the rightmost edge, allowing for non-uniform bin widths.
If bins is a string, it defines the method used to calculate the
optimal bin width, as defined by histogram_bin_edges.
‘auto’ - Maximum of the ‘sturges’ and ‘fd’ estimators.
Provides good all around performance.
‘fd’ - (Freedman Diaconis Estimator) Robust (resilient to
outliers) estimator that takes into account data variability and
data size.
‘doane’ - An improved version of Sturges’ estimator that works
better with non-normal datasets.
‘scott’ - Less robust estimator that that takes into account data
variability and data size.
‘stone’ - Estimator based on leave-one-out cross-validation
estimate of the integrated squared error. Can be regarded as a
generalization of Scott’s rule.
‘rice’ - Estimator does not take variability into account, only
data size. Commonly overestimates number of bins required.
‘sturges’ - R’s default method, only accounts for data size. Only
optimal for gaussian data and underestimates number of bins for
large non-gaussian datasets.
‘sqrt’ - Square root (of data size) estimator, used by Excel and
other programs for its speed and simplicity.
nan_policy : str
Value must be one of the following: ‘propagate’, ‘raise’, ‘omit’
Defines how to handle when input contains nan. The following
options are available (default is ‘omit’):
‘propagate’: returns nan
‘raise’: throws an error
‘omit’: performs the calculations ignoring nan values"""
def __init__(self, y, bins, nan_policy="omit"):
"""Initialization of Histogram class object"""
self.y = process_nan_policy(y, nan_policy)
self.bins = bins
self.nan_policy = nan_policy
@property
def mean(self):
"""The mean value of the input array
Returns
----------
np.ndarray
Mean value of y with np.mean()
"""
return np.mean(self.y)
@property
def median(self):
"""The median value of the input array
Returns
----------
np.ndarray
Median value of y with np.median()
"""
return np.median(self.y)
@property
def std(self):
"""The standard deviation of the input array
Returns
----------
np.ndarray
Standard deviation of y with np.std()
"""
return np.std(self.y)
@property
def normality(self):
"""The normality and normality p-value of the input array
Returns
----------
statistic : float
Normality calculated with scipy.stats.normaltest
p-value : float
A 2-sided chi squared probability for the hypothesis test.
"""
return scipy_stats.normaltest(self.y, nan_policy=self.nan_policy)
@property
def hist_data(self):
"""Get the histogram data
Returns
----------
hist : np.ndarray
The values of the histogram
center : np.ndarray
The centers of the bins
"""
hist, bin_edges = np.histogram(self.y, bins=self.bins)
center = (bin_edges[:-1] + bin_edges[1:]) / 2.0
return hist, center
@property
def chart_data(self):
"""JSON compatible dict for chart generation
Returns
----------
dict
Data used for Histogram visuals. Keys include 'x', 'y', 'mean',
'median', 'std', 'normality', 'normality_p'
"""
hist, center = self.hist_data
norm, norm_p = self.normality
return {
"x": center.tolist(),
"y": hist.tolist(),
"mean": float(self.mean),
"median": float(self.median),
"std": float(self.std),
"normality": float(norm),
"normality_p": float(norm_p),
}
[docs]class MultiVariableRegression:
"""Multi-variable regression using scikit-learn
Parameters
----------
X : array-like
Independent data
y : array-like
Dependent data
saved_reg : MultiVariableRegression, optional
Optionally provide a previously calculated regression
var_names : list, optional
Optionally provide names of the variables
y_var_name : int, str, optional
Optionally provide name of the dependent variable
back_elim : bool
Automatically perform backward elimination if True
back_elim_p : float
p-value threshold for backward elimination
"""
def __init__(
self,
X,
y,
saved_reg=None,
var_names=None,
y_var_name=None,
back_elim=False,
back_elim_p=0.05,
):
"""Initialization of a MultiVariableRegression"""
self.X = np.array(X)
self.y = np.array(y)
self.y_var_name = y_var_name
self.var_names = (
list(range(self.X.shape[1])) if var_names is None else var_names
)
self._do_fit(saved_reg=saved_reg)
if back_elim:
self.backward_elimination(back_elim_p)
def __str__(self):
"""String representation of MultiVariableRegression object"""
table = PrettyTable()
table.field_names = ["", "Coef", "Std. Err.", "t-value", "p-value"]
for c in table.field_names[1:]:
table.align[c] = "r"
data = {
"": ["y-int"] + self.var_names,
"Coef": ["%0.3E" % v for v in self.coef],
"Std. Err.": ["%0.3E" % v for v in self.std_err],
"t-value": ["%0.3f" % v for v in self.t],
"p-value": ["%0.3f" % v for v in self.p],
}
for r in range(0, len(data[""])):
table.add_row([data[c][r] for c in table.field_names])
msg = [
"Multi-Variable Regression results/model\n",
"R²: %0.3f" % self.r_sq,
"MSE: %0.3f" % self.mse,
"f-stat: %0.3f" % self.f_stat,
"f-stat p-value: %0.3f" % self.f_p_value,
str(table),
]
return "\n".join(msg)
def __repr__(self):
"""Return the string representation"""
return str(self)
def _do_fit(self, saved_reg=None):
"""Perform linear regression
Parameters
----------
saved_reg : MultiVariableRegression, optional
Optionally provide a previously calculated regression
"""
if saved_reg is None:
self.reg = linear_model.LinearRegression()
self.ols = self.reg.fit(self.X, self.y)
else:
self.reg = saved_reg.reg
self.ols = saved_reg.ols
self.predictions = self.reg.predict(self.X)
self.p, self.std_err, self.t = get_lin_reg_p_values(
self.X, self.y, self.predictions, self.y_intercept, self.slope
)
@property
def y_intercept(self):
"""The y-intercept of the linear regression
Returns
----------
float
Independent term in the linear model.
"""
return self.reg.intercept_ if self.reg is not None else None
@property
def slope(self):
"""The slope of the linear regression
Returns
----------
np.ndarray
Estimated coefficients for the linear regression problem. If
multiple targets are passed during the fit (y 2D), this is a 2D
array of shape (n_targets, n_features), while if only one target
is passed, this is a 1D array of length n_features.
"""
return self.reg.coef_ if self.reg is not None else None
@property
def r_sq(self):
"""R^2 (coefficient of determination) regression score function.
Returns
----------
float
The R^2 score
"""
return r2_score(self.y, self.predictions)
@property
def mse(self):
"""Mean squared error of the linear regression
Returns
----------
float, nd.array
A non-negative floating point value (the best value is 0.0), or
an array of floating point values, one for each individual target.
"""
return mean_squared_error(self.y, self.predictions)
@property
def residuals(self):
"""Residuals of the prediction and sample data
Returns
----------
np.ndarray
y - predictions
"""
return np.subtract(self.y, self.predictions)
@property
def f_stat(self):
"""The F-statistic of the regression
Returns
----------
float
F-statistic of beta coefficients using regressors.stats
"""
return regressors_stats.f_stat(self.ols, self.X, self.y)
@property
def df_error(self):
"""Error degrees of freedom
Returns
----------
int
Degrees of freedom for the error
"""
return len(self.X[:, 0]) - len(self.X[0, :]) - 1
@property
def df_model(self):
"""Model degrees of freedom
Returns
----------
int
Degrees of freedom for the model
"""
return len(self.X[0, :])
@property
def f_p_value(self):
"""p-value of the f-statistic
Returns
----------
float
p-value of the F-statistic of beta coefficients using scipy
"""
return scipy_stats.f.cdf(self.f_stat, self.df_model, self.df_error)
@property
def coef(self):
"""Coefficients for the regression
Returns
----------
np.ndarray
An array of regression coefficients (i.e., y_intercept,
1st var slope, 2nd var slope, etc.)
"""
return np.concatenate(([self.y_intercept], self.slope))
[docs] def backward_elimination(self, p_value=0.05):
"""Remove insignificant variables from regression
p_value : float
Iteratively remove the least significant variable until all
variables have p-values less than p_value or only one variable
remains.
"""
finished = False
while not finished:
index = self.p[1:].argmax() # ignore y-int
if len(self.var_names) > 1 and self.p[index + 1] > p_value:
self.X = np.delete(self.X, index, axis=1)
self.var_names.pop(index)
self._do_fit()
else:
finished = True
@property
def chart_data(self):
"""JSON compatible dict for chart generation
Returns
----------
dict
Data used for residual visuals. Keys include 'x', 'y', 'pred',
'resid', 'coef', 'r_sq', 'mse', 'std_err', 't_value', 'p_value'
"""
return {
"y": self.y.tolist(),
"pred": self.predictions.tolist(),
"resid": self.residuals.tolist(),
"coef": self.coef.tolist(),
"r_sq": float(self.r_sq),
"mse": float(self.mse),
"std_err": self.std_err.tolist(),
"t_value": self.t.tolist(),
"p_value": self.p.tolist(),
}
@property
def prob_plot(self):
"""
Calculate quantiles for a probability plot
Returns
----------
dict
Data for generating a probablity plot. Keys include:
'x', 'y', 'y_intercept', 'slope', 'x_trend', 'y_trend'
"""
norm_prob_plot = scipy_stats.probplot(
self.residuals, dist="norm", fit=False, plot=None, rvalue=False
)
reg_prob = linear_model.LinearRegression()
reg_prob.fit([[val] for val in norm_prob_plot[0]], norm_prob_plot[1])
x_trend = [
float(np.min(norm_prob_plot[0])),
float(np.max(norm_prob_plot[0])),
]
y_trend = np.add(
np.multiply(x_trend, reg_prob.coef_),
reg_prob.intercept_,
)
return {
"x": norm_prob_plot[0].tolist(),
"y": norm_prob_plot[1].tolist(),
"y_intercept": float(reg_prob.intercept_),
"slope": float(reg_prob.coef_),
"x_trend": x_trend,
"y_trend": y_trend.tolist(),
}
[docs]class ControlChart:
"""Calculate control limits for a standard univariate Control Chart"
Parameters
----------
y : list, np.ndarray
Input data (1-D)
std : int, float, optional
Number of standard deviations used to calculate if a y-value is
out-of-control.
ucl_limit : float, optional
Limit the upper control limit to this value
lcl_limit : float, optional
Limit the lower control limit to this value
"""
def __init__(
self,
y,
std=3,
ucl_limit=None,
lcl_limit=None,
x=None,
):
"""Initialization of a ControlChart"""
self.y = np.array(y) if isinstance(y, list) else y
self.x = x if x else np.linspace(1, len(self.y), len(self.y))
self.std = std
self.ucl_limit = ucl_limit
self.lcl_limit = lcl_limit
# since moving range is calculated based on 2 consecutive points
self.scalar_d = 1.128
def __str__(self):
"""String representation of ControlChartData object"""
msg = [
"center_line: %0.3f" % self.center_line,
"control_limits: %0.3f, %0.3f" % self.control_limits,
"out_of_control: %s" % self.out_of_control,
]
return "\n".join(msg)
def __repr__(self):
"""Return the string representation"""
return str(self)
@property
def center_line(self):
"""Center line of charting data (i.e., mean value)
Returns
----------
np.ndarray, np.nan
Mean value of y with np.mean() or np.nan if y is empty
"""
data = remove_nan(self.y)
if len(data):
return np.mean(data)
return np.nan
@property
def avg_moving_range(self):
"""Avg moving range based on 2 consecutive points
Returns
----------
np.ndarray, np.nan
Average moving range. Returns NaN if arr is empty.
"""
return avg_moving_range(self.y, nan_policy="omit")
@property
def sigma(self):
"""UCL/LCL = center_line +/- sigma * std
Returns
----------
np.ndarray, np.nan
sigma or np.nan if arr is empty
"""
return self.avg_moving_range / self.scalar_d
@property
def control_limits(self):
"""Calculate the lower and upper control limits
Returns
----------
lcl : float
Lower Control Limit (LCL)
ucl : float
Upper Control Limit (UCL)
"""
cl = self.center_line
sigma = self.sigma
ucl = cl + self.std * sigma
lcl = cl - self.std * sigma
if self.ucl_limit is not None and ucl > self.ucl_limit:
ucl = self.ucl_limit
if self.lcl_limit is not None and lcl < self.lcl_limit:
lcl = self.lcl_limit
return lcl, ucl
@property
def out_of_control(self):
"""Get the indices of out-of-control observations
Returns
----------
np.ndarray
An array of indices that are not between the lower and upper
control limits
"""
lcl, ucl = self.control_limits
high = np.argwhere(self.y > ucl)
low = np.argwhere(self.y < lcl)
return np.unique(np.concatenate([high, low]))
@property
def out_of_control_high(self):
"""Get the indices of observations > ucl
Returns
----------
np.ndarray
An array of indices that are greater than the upper control limit
"""
_, ucl = self.control_limits
return np.argwhere(self.y > ucl)
@property
def out_of_control_low(self):
"""Get the indices of observations < lcl
Returns
----------
np.ndarray
An array of indices that are less than the lower control limit
"""
lcl, _ = self.control_limits
return np.argwhere(self.y < lcl)
@property
def chart_data(self):
"""JSON compatible dict for chart generation
Returns
----------
dict
Data used for Histogram visuals. Keys include 'x', 'y',
'out_of_control', 'center_line', 'lcl', 'ucl'
"""
lcl, ucl = self.control_limits
return {
"x": self.x.tolist(),
"y": self.y.tolist(),
"out_of_control": self.out_of_control.tolist(),
"center_line": float(self.center_line),
"lcl": float(lcl),
"ucl": float(ucl),
}
[docs]class HotellingT2:
"""Hotelling's t-squared statistic for multivariate hypothesis testing
Parameters
----------
data : np.ndarray
A 2-D array of data to perform multivariate analysis.
(e.g., DVHAStats.data)
alpha : float
The significance level used to calculate the
upper control limit (UCL)
const_policy : str
{‘raise’, 'omit'}
Defines how to handle when data is constant. The following
options are available (default is ‘raise’):
‘raise’: throws an error
'omit': exclude constant variables from calculation
"""
def __init__(self, data, alpha=0.05, const_policy="raise"):
"""Initialize the Hotelling T^2 class"""
self.data = (
data if const_policy == "raise" else remove_const_column(data)
)
self.alpha = alpha
self.lcl = 0
def __str__(self):
"""String representation of HotellingT2 object"""
msg = [
"Q: %s" % self.Q,
"center_line: %0.3f" % self.center_line,
"control_limits: %d, %0.3f" % (self.lcl, self.ucl),
"out_of_control: %s" % self.out_of_control,
]
return "\n".join(msg)
def __repr__(self):
"""Return the string representation"""
return str(self)
@property
def observations(self):
"""Number of observations in data
Returns
----------
int
Number of rows in data
"""
return self.data.shape[0]
@property
def variable_count(self):
"""Number of variables in data
Returns
----------
int
Number of columns in data"""
return self.data.shape[1]
@property
def Q(self):
"""Calculate Hotelling T^2 statistic (Q) from a 2-D numpy array
Returns
-------
np.ndarray
A numpy array of Hotelling T^2 (1-D of length N)
"""
Q = np.zeros(np.size(self.data, 0))
D_bar = np.mean(self.data, axis=0)
S = np.cov(self.data.T)
S_inv = np.linalg.inv(S)
observations = np.size(self.data, 0)
for i in range(observations):
spread = self.data[i, :] - D_bar
Q[i] = np.matmul(np.matmul(spread, S_inv), spread)
return Q
@property
def center_line(self):
"""Center line for the control chart
Returns
----------
float
Median value of beta distribution.
"""
return self.get_control_limit(0.5)
@property
def control_limits(self):
"""Lower and Upper control limits
Returns
----------
lcl : float
Lower Control Limit (LCL). This is fixed to 0 for Hotelling T2
ucl : float
Upper Control Limit (UCL)
"""
return self.lcl, self.ucl
@property
def ucl(self):
"""Upper control limit
Returns
----------
ucl : float
Upper Control Limit (UCL)"""
return self.get_control_limit(1 - self.alpha / 2)
@property
def out_of_control(self):
"""Indices of out-of-control observations
Returns
----------
np.ndarray
An array of indices that are greater than the upper
control limit. (NOTE: Q is never negative)
"""
return np.argwhere(self.Q > self.ucl).T[0]
[docs] def get_control_limit(self, x):
"""Calculate a Hotelling T^2 control limit using a beta distribution
Parameters
----------
x : float
Value where the beta function is evaluated
Returns
-------
float
The control limit for a beta distribution
"""
N = self.observations
a = self.variable_count / 2
b = (N - self.variable_count - 1) / 2
return ((N - 1) ** 2 / N) * scipy_stats.beta.ppf(x, a, b)
@property
def chart_data(self):
"""JSON compatible dict for chart generation
Returns
----------
dict
Data used for Histogram visuals. Keys include 'x', 'y',
'out_of_control', 'center_line', 'lcl', 'ucl'
"""
return {
"x": list(range(1, self.observations + 1)),
"y": self.Q.tolist(),
"out_of_control": self.out_of_control.tolist(),
"center_line": float(self.center_line),
"lcl": float(self.lcl),
"ucl": float(self.ucl),
}
[docs]class RiskAdjustedControlChart(ControlChart):
"""Calculate a risk-adjusted univariate Control Chart (with linear MVR)
Parameters
----------
X : array-like
Independent data
y : list, np.ndarray
Input data (1-D)
std : int, float, optional
Number of standard deviations used to calculate if a y-value is
out-of-control.
ucl_limit : float, optional
Limit the upper control limit to this value
lcl_limit : float, optional
Limit the lower control limit to this value
saved_reg : MultiVariableRegression, optional
Optionally provide a previously calculated regression
var_names : list, optional
Optionally provide names of the variables
back_elim : bool
Automatically perform backward elimination if True
back_elim_p : float
p-value threshold for backward elimination
"""
def __init__(
self,
X,
y,
std=3,
ucl_limit=None,
lcl_limit=None,
x=None,
saved_reg=None,
var_names=None,
back_elim=False,
back_elim_p=0.05,
):
"""Calculate control limits for a standard univariate Control Chart"""
self.y_raw = y
self.mvr = MultiVariableRegression(
X,
y,
saved_reg=saved_reg,
var_names=var_names,
back_elim=back_elim,
back_elim_p=back_elim_p,
)
ControlChart.__init__(
self,
self.mvr.residuals,
std=std,
ucl_limit=ucl_limit,
lcl_limit=lcl_limit,
x=x,
)
[docs]class PCA(sklearn_PCA):
"""Principal Component Analysis with sklearn.decomposition.PCA
Parameters
----------
X : np.ndarray
Training data (2-D), where n_samples is the number of samples and
n_features is the number of features. shape (n_samples, n_features)
var_names : list, optional
Optionally provide names of the features
n_components : int, float, None or str
Number of components to keep. if n_components is not set all
components are kept:
n_components == min(n_samples, n_features)
If n_components == 'mle' and svd_solver == 'full', Minka’s MLE
is used to guess the dimension. Use of n_components == 'mle'
will interpret svd_solver == 'auto' as svd_solver == 'full'.
If 0 < n_components < 1 and svd_solver == 'full', select the
number of components such that the amount of variance that
needs to be explained is greater than the percentage specified
by n_components.
If svd_solver == 'arpack', the number of components must be
strictly less than the minimum of n_features and n_samples.
transform : bool
Fit the model and apply the dimensionality reduction
kwargs : any
Provide any keyword arguments for sklearn.decomposition.PCA:
https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html
"""
def __init__(
self, X, var_names=None, n_components=0.95, transform=True, **kwargs
):
"""Initialize PCA and perform fit."""
self.X = X
self.var_names = (
list(range(self.X.shape[1])) if var_names is None else var_names
)
sklearn_PCA.__init__(self, n_components=n_components, **kwargs)
if transform:
self.fit_transform(self.X)
else:
self.fit(self.X)
@property
def feature_map_data(self):
"""Used for feature analysis heat map
Returns
----------
np.ndarray Shape (n_components, n_features)
Principal axes in feature space, representing the directions of
maximum variance in the data. The components are sorted by
explained_variance.
"""
return self.components_
@property
def component_labels(self):
"""Get component names
Returns
----------
list
Labels for plotting. (1st Comp, 2nd Comp, 3rd Comp, etc.)
"""
return [
"%s Comp" % (get_ordinal(n + 1))
for n in range(self.components_.shape[0])
]
[docs]class CorrelationMatrix:
"""Pearson-R correlation matrix
Parameters
----------
X : np.ndarray
Input data (2-D) with N rows of observations and
p columns of variables.
corr_type : str
Either "Pearson" or "Spearman"
"""
def __init__(self, X, corr_type="Pearson"):
"""Initialization of CorrelationMatrix object"""
self.X = X
self.corr_type = corr_type.lower()
if self.corr_type not in {"pearson", "spearman"}:
msg = "Invalid corr_type: must be either 'Pearson' or 'Spearman'"
raise NotImplementedError(msg)
func_map = {
"pearson": pearson_correlation_matrix,
"spearman": spearman_correlation_matrix,
}
if self.corr_type in list(func_map):
self.corr, self.p = func_map[self.corr_type](self.X)
@property
def normality(self):
"""The normality and normality p-value of the input array
Returns
----------
statistic : np.ndarray
Normality calculated with scipy.stats.normaltest
p-value : np.ndarray
A 2-sided chi squared probability for the hypothesis test.
"""
return scipy_stats.normaltest(self.X, nan_policy="omit")
@property
def chart_data(self):
"""JSON compatible dict for chart generation
Returns
----------
dict
Data used for Histogram visuals. Keys include 'corr', 'p',
'norm', 'norm_p'
"""
norm, norm_p = self.normality
return {
"corr": self.corr.tolist(),
"p": self.p.tolist(),
"norm": norm.tolist(),
"norm_p": norm_p.tolist(),
}
#########################################################
# Pure stats functions
#########################################################
[docs]def get_lin_reg_p_values(X, y, predictions, y_intercept, slope):
"""
Get p-values of a linear regression using sklearn
based on https://stackoverflow.com/questions/27928275/find-p-value-significance-in-scikit-learn-linearregression
Parameters
----------
X : np.ndarray
Independent data
y : np.ndarray, list
Dependent data
predictions : np.ndarray, list
Predictions using the linear regression.
(Output from linear_model.LinearRegression.predict)
y_intercept : float, np.ndarray
The y-intercept of the linear regression
slope : float, np.ndarray
The slope of the linear regression
Returns
----------
p_value : np.ndarray
p-value of the linear regression coefficients
std_errs : np.ndarray
standard errors of the linear regression coefficients
t_value : np.ndarray
t-values of the linear regression coefficients
"""
newX = np.append(np.ones((len(X), 1)), X, axis=1)
mse = (sum((y - predictions) ** 2)) / (len(newX) - len(newX[0]))
variance = mse * (np.linalg.inv(np.dot(newX.T, newX)).diagonal())
std_errs = np.sqrt(variance)
params = np.append(y_intercept, slope)
t_values = np.array(params) / std_errs
p_value = np.array(
[
2 * (1 - scipy_stats.t.cdf(np.abs(i), (len(newX) - 1)))
for i in t_values
]
)
return p_value, std_errs, t_values
[docs]def pearson_correlation_matrix(X):
"""Calculate a correlation matrix of Pearson-R values
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where n_samples is the number of samples and
n_features is the number of features.
Returns
----------
r : np.ndarray
Array (2-D) of Pearson-R correlations between the row indexed and
column indexed variables
p : np.ndarray
Array (2-D) of p-values associated with r
"""
r = np.ones([X.shape[1], X.shape[1]]) # Pearson R
p = np.zeros([X.shape[1], X.shape[1]]) # p-value of Pearson R
for x in range(X.shape[1]):
for y in range(X.shape[1]):
if x > y:
# Pearson r requires that both sets of data be of the same
# length. Remove index if NaN in either variable.
valid_x = ~np.isnan(X[:, x])
valid_y = ~np.isnan(X[:, y])
include = np.full(len(X[:, x]), True)
for i in range(len(valid_x)):
include[i] = valid_x[i] and valid_y[i]
x_data = X[include, x]
y_data = X[include, y]
r[x, y], p[x, y] = scipy_stats.pearsonr(x_data, y_data)
# These matrices are symmetric
r[y, x] = r[x, y]
p[y, x] = p[x, y]
return r, p
[docs]def spearman_correlation_matrix(X, nan_policy="omit"):
"""Calculate a Spearman correlation matrix
Parameters
----------
X : array-like, shape (n_samples, n_features)
Training data, where n_samples is the number of samples and
n_features is the number of features.
nan_policy : str
Value must be one of the following: ‘propagate’, ‘raise’, ‘omit’
Defines how to handle when input contains nan. The following options
are available (default is ‘omit’):
‘propagate’: returns nan
‘raise’: throws an error
‘omit’: performs the calculations ignoring nan values
Returns
----------
correlation : float or ndarray (2-D square)
Spearman correlation matrix or correlation coefficient (if only 2
variables are given as parameters. Correlation matrix is square with
length equal to total number of variables (columns or rows) in a and
b combined.
p-value : float
The two-sided p-value for a hypothesis test whose null hypothesis is
that two sets of data are uncorrelated, has same dimension as rho.
"""
return scipy_stats.spearmanr(X, nan_policy=nan_policy)
[docs]def moving_avg(y, avg_len, x=None, weight=None):
"""Calculate the moving (rolling) average of a set of data
Parameters
----------
y : array-like
data (1-D) to be averaged
avg_len : int
Data is averaged over this many points (current value and avg_len - 1
prior points)
x : np.ndarray, list, optional
Optionally specify the x-axis values. Otherwise index+1 is used.
weight : np.ndarray, list, optional
A weighted moving average is calculated based on the provided weights.
weight must be of same length as y. Weights of one are assumed by
default.
Returns
----------
x : np.ndarray
Resulting x-values for the moving average
moving_avg : np.ndarray
moving average values
"""
x = np.linspace(1, len(y), len(y)) if x is None else x
weight = np.ones_like(y) if weight is None else weight
cumsum, moving_aves, x_final = [0], [], []
for i, yi in enumerate(y, 1):
cumsum.append(cumsum[i - 1] + yi / weight[i - 1])
if i >= avg_len:
moving_ave = (cumsum[i] - cumsum[i - avg_len]) / avg_len
moving_aves.append(moving_ave)
x_final = [x[i] for i in range(avg_len - 1, len(x))]
return np.array(x_final), np.array(moving_aves)
[docs]def box_cox(arr, alpha=None, lmbda=None, const_policy="propagate"):
"""
Parameters
----------
arr : np.ndarray
Input array. Must be positive 1-dimensional.
lmbda : None, scalar, optional
If lmbda is not None, do the transformation for that value.
If lmbda is None, find the lambda that maximizes the
log-likelihood function and return it as the second output
argument.
alpha : None, float, optional
If alpha is not None, return the 100 * (1-alpha)% confidence
interval for lmbda as the third output argument. Must be between
0.0 and 1.0.
const_policy : str
{‘propagate’, ‘raise’, 'omit'}
Defines how to handle when data is constant. The following
options are available (default is ‘propagate’):
‘propagate’: returns nan
‘raise’: throws an error
Returns
----------
box_cox : np.ndarray
Box-Cox power transformed array
"""
try:
box_cox_data, _ = scipy_stats.boxcox(arr, alpha=alpha, lmbda=lmbda)
except ValueError as e:
if const_policy == "propagate":
return np.array([np.nan] * arr.shape[0])
raise e
return box_cox_data
[docs]def avg_moving_range(arr, nan_policy="omit"):
"""Calculate the average moving range (over 2-consecutive point1)
Parameters
----------
arr : array-like (1-D)
Input array. Must be positive 1-dimensional.
nan_policy : str, optional
Value must be one of the following: {‘propagate’, ‘raise’, ‘omit’}
Defines how to handle when input contains nan. The following options
are available (default is ‘omit’):
‘propagate’: returns nan
‘raise’: throws an error
‘omit’: performs the calculations ignoring nan values
Returns
----------
np.ndarray, np.nan
Average moving range. Returns NaN if arr is empty
"""
arr = process_nan_policy(arr, nan_policy)
if len(arr) == 0:
return np.nan
return np.mean(np.absolute(np.diff(arr)))
###################################################
# Stats related utilities
###################################################
[docs]def remove_nan(arr):
"""Remove indices from 1-D array with values of np.nan
Parameters
----------
arr : np.ndarray (1-D)
Input array. Must be positive 1-dimensional.
Returns
----------
np.ndarray
arr with NaN values deleted
"""
return arr[~np.isnan(arr)]
[docs]def remove_const_column(arr):
"""Remove all columns with zero variance
Parameters
----------
arr : np.ndarray
Input array (2-D)
Returns
----------
np.ndarray
Input array with columns of a constant value removed
"""
return arr[:, ~np.all(np.isnan(arr), axis=0)]
[docs]def is_nan_arr(arr):
"""Check if array has only NaN elements
Parameters
----------
arr : np.ndarray
Input array
Returns
----------
bool
True if all elements are np.nan
"""
return np.all(np.isnan(arr))
[docs]def is_arr_constant(arr):
"""Determine if data by var_name is constant
Parameters
----------
arr : array-like
Input array or object that can be converted to an array
Returns
----------
bool
True if all values the same (i.e., no variation)
"""
return np.all(arr == arr[0])
[docs]def process_nan_policy(arr, nan_policy):
"""Calculate the average moving range (over 2-consecutive point1)
Parameters
----------
arr : array-like (1-D)
Input array. Must be positive 1-dimensional.
nan_policy : str
Value must be one of the following: {‘propagate’, ‘raise’, ‘omit’}
Defines how to handle when input contains nan. The following options
are available (default is ‘omit’):
‘propagate’: returns nan
‘raise’: throws an error
‘omit’: performs the calculations ignoring nan values
Returns
----------
np.ndarray, np.nan
Input array evaluated per nan_policy
"""
arr_no_nan = remove_nan(arr)
if len(arr_no_nan) != len(arr):
if nan_policy == "raise":
msg = "NaN values are not supported for avg_moving_range"
raise NotImplementedError(msg)
if nan_policy == "propagate":
return np.nan
if nan_policy == "omit":
return arr_no_nan
return arr
[docs]def get_ordinal(n):
"""Convert number to its ordinal (e.g., 1 to 1st)
Parameters
----------
n : int
Number to be converted to ordinal
Returns
----------
str
the ordinal of n
"""
return "%d%s" % (
n,
"tsnrhtdd"[(n // 10 % 10 != 1) * (n % 10 < 4) * n % 10 :: 4],
)