"""
hhpy.ds.py
~~~~~~~~~~~~~~~~
Contains DataScience functions extending on pandas and sklearn
"""
# standard imports
import numpy as np
import pandas as pd
import warnings
# third party imports
from copy import deepcopy
from scipy import stats, signal
from scipy.spatial import distance
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error, median_absolute_error
from sklearn.preprocessing import StandardScaler
from typing import Mapping, Sequence, Callable, Union, List, Optional
# local imports
from hhpy.main import export, force_list, tprint, progressbar, qformat, list_intersection, round_signif, is_list_like, \
dict_list, append_to_dict_list, concat_cols
# --- functions
[docs]@export
def optimize_pd(df: pd.DataFrame, c_int: bool = True, c_float: bool = True, c_cat: bool = True,
                cat_frac: float = .5) -> pd.DataFrame:
    """
    optimize memory usage of a pandas df, automatically downcast all var types and converts objects to categories

    :param df: pandas DataFrame to be optimized. Other objects are implicitly cast to DataFrame
    :param c_int: whether to downcast integers. Columns without negative values are downcast to the smallest
        unsigned type, all other integer columns to the smallest signed type
    :param c_float: whether to downcast floats
    :param c_cat: whether to cast objects to categories. Uses cat_frac as condition
    :param cat_frac: if c_cat is True, an object column is cast to category when its share of unique values
        (number of unique values / number of rows) is below 1 - cat_frac
    :return: the optimized pandas DataFrame (the input is not modified)
    """
    _df = pd.DataFrame(df).copy()
    del df

    # check for duplicate columns, only the first occurrence of each name is kept
    _is_duplicate = _df.columns.duplicated()
    if _is_duplicate.any():
        warnings.warn('duplicate columns found: {}'.format(_df.columns[_is_duplicate]))
        _df = _df.loc[:, ~_is_duplicate].copy()

    if c_int:
        for _col in _df.select_dtypes(include=['int']).columns:
            # unsigned types can hold every non-negative value (including 0),
            # anything containing negatives is still downcast, but to a signed type
            _downcast = 'unsigned' if (_df[_col] >= 0).all() else 'integer'
            _df[_col] = pd.to_numeric(_df[_col], downcast=_downcast)

    if c_float:
        for _col in _df.select_dtypes(include=['float']).columns:
            _df[_col] = pd.to_numeric(_df[_col], downcast='float')

    if c_cat:
        for _col in _df.select_dtypes(include=['object']).columns:
            _num_total_values = len(_df[_col])
            # an empty column has no meaningful unique ratio (and would divide by zero)
            if _num_total_values == 0:
                continue
            _num_unique_values = len(_df[_col].unique())
            if _num_unique_values / _num_total_values < (1 - cat_frac):
                _df[_col] = _df[_col].astype('category')

    return _df
[docs]@export
def get_df_corr(df: pd.DataFrame, target: str = None, groupby: Union[str, list] = None) -> pd.DataFrame:
    """
    returns a pandas DataFrame containing all pearson correlations in a melted format

    :param df: input pandas DataFrame
    :param target: if target is specified: returns only correlations that involve the target column
    :param groupby: if groupby is specified: returns correlations for each level of the group
    :return: pandas DataFrame with one row per column pair (col_0, col_1), the correlation (corr) and its
        absolute value (corr_abs), sorted by corr_abs descending
    """
    # avoid inplace operations
    _df = df.copy()
    del df

    # a column called index is renamed to '__index__' (kept from the original implementation)
    _df = _df.rename({'index': '__index__'}, axis=1)

    # add dummy if no group by
    if groupby is None:
        groupby = ['_dummy']
        _df['_dummy'] = 1

    # only numeric columns can be correlated
    _cols = _df.select_dtypes(include=np.number).columns

    _df_corr = []
    for _index, _df_i in _df.groupby(groupby):
        # restrict to the numeric columns explicitly: DataFrame.corr no longer drops non numeric columns
        # silently, and this keeps the matrix aligned with _cols
        _corr_i = _df_i[_cols].corr()
        # keep only the strict lower triangle so that each pair appears once and the diagonal is dropped
        _is_lower = np.tril(np.ones(_corr_i.shape, dtype=bool), k=-1)
        _corr_i = _corr_i.where(_is_lower)
        # gather / melt
        _df_corr_i = _corr_i.rename_axis('col_0').reset_index()
        _df_corr_i = pd.melt(_df_corr_i, id_vars=['col_0'], var_name='col_1', value_name='corr').dropna()
        # drop self correlation
        _df_corr_i = _df_corr_i[_df_corr_i['col_0'] != _df_corr_i['col_1']].copy()
        # get identifier
        for _groupby in force_list(groupby):
            _df_corr_i[_groupby] = _df_i[_groupby].iloc[0]
        _df_corr.append(_df_corr_i)

    _df_corr = df_merge(_df_corr)
    _df_corr = col_to_front(_df_corr, groupby)

    if '_dummy' in _df_corr.columns:
        _df_corr = _df_corr.drop('_dummy', axis=1)

    # reorder and keep only columns involving the target (if applicable)
    if target is not None:
        # if the target is col_1: switch it to col_0
        _target_is_col_1 = (_df_corr['col_1'] == target)
        _df_corr['col_1'] = np.where(_target_is_col_1, _df_corr['col_0'], _df_corr['col_1'])
        _df_corr['col_0'] = np.where(_target_is_col_1, target, _df_corr['col_0'])
        # keep only target in col_0 (copy so that the assignment below does not write to a slice)
        _df_corr = _df_corr[_df_corr['col_0'] == target].copy()

    # get absolute correlation
    _df_corr['corr_abs'] = np.abs(_df_corr['corr'])
    # sort descending
    _df_corr = _df_corr.sort_values(['corr_abs'], ascending=False).reset_index(drop=True)

    return _df_corr
[docs]@export
def drop_zero_cols(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop columns with all 0 or None Values from DataFrame. Useful after applying one hot encoding.

    A column is kept if at least one of its values is neither 0 nor missing. Columns are selected by position,
    so duplicate column names are handled correctly.

    :param df: pandas DataFrame
    :return: pandas DataFrame without 0 columns.
    """
    # missing values compare as "not equal to 0", so they have to be excluded explicitly
    _has_content = ((df != 0) & df.notna()).any(axis=0)
    return df.loc[:, _has_content.to_numpy()]
[docs]@export
def get_duplicate_indices(df: pd.DataFrame) -> Sequence:
    """
    Collect the index labels of a pandas DataFrame that occur more than once

    :param df: pandas DataFrame
    :return: the repeated index labels (every occurrence after the first one)
    """
    _index = df.index
    _is_repeated = _index.duplicated()
    return _index[_is_repeated]
[docs]@export
def get_duplicate_cols(df: pd.DataFrame) -> Sequence:
    """
    Collect the column names of a pandas DataFrame that occur more than once

    :param df: pandas DataFrame
    :return: the repeated column names (every occurrence after the first one)
    """
    _columns = df.columns
    _is_repeated = _columns.duplicated()
    return _columns[_is_repeated]
[docs]@export
def drop_duplicate_indices(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop duplicate indices from pandas DataFrame

    :param df: pandas DataFrame
    :return: pandas DataFrame without the duplicate indices, the first row of each index label is kept
    """
    # the row labels live in df.index (a DataFrame has no attribute 'indices')
    return df.loc[~df.index.duplicated(), :]
[docs]@export
def drop_duplicate_cols(df: pd.DataFrame) -> pd.DataFrame:
    """
    Remove repeated column names from a pandas DataFrame, keeping the first occurrence of each name

    :param df: pandas DataFrame
    :return: pandas DataFrame restricted to the first occurrence of every column name
    """
    _is_first_occurrence = ~df.columns.duplicated()
    return df.loc[:, _is_first_occurrence]
[docs]@export
def change_span(s: pd.Series, steps: int = 5) -> pd.Series:
    """
    flag the rows around a changepoint of a stepwise series, e.g. to filter them out of a pandas df.
    The series must be properly sorted!

    :param s: pandas Series or similar
    :param steps: number of steps around the changepoint to flag as true
    :return: pandas Series of dtype Boolean, True where the value steps ahead differs from the value steps behind
    """
    # value `steps` rows ahead, the tail is padded with the last known value
    _ahead = s.shift(-steps).ffill()
    # value `steps` rows behind, the head is padded with the first known value
    _behind = s.shift(steps).bfill()
    return pd.Series(_ahead != _behind)
[docs]@export
def outlier_to_nan(df: pd.DataFrame, col: str, groupby: Union[list, str] = None, std_cutoff: np.number = 3,
                   reps: int = 1, do_print: bool = False) -> pd.DataFrame:
    """
    this algorithm cuts off all points whose DELTA (avg diff to the prev and next point) is outside of the n std range

    The per group mean and standard deviation of DELTA are broadcast back onto the rows, so the row order and
    the index of the input are preserved.

    :param df: pandas DataFrame
    :param col: column to be filtered
    :param groupby: if provided: applies std filter by group
    :param std_cutoff: the number of standard deviations outside of which to set values to None
    :param reps: how many times to repeat the algorithm
    :param do_print: whether to print steps to console
    :return: copy of the DataFrame with outliers in col set to nan
    """
    _df = df.copy()
    del df

    if groupby is None:
        _df['__groupby'] = 1
        groupby = '__groupby'

    for _rep in range(reps):

        if do_print:
            tprint('rep = ' + str(_rep + 1) + ' of ' + str(reps))

        # use interpolation (within each group) to treat missing values
        _df['_dummy'] = _df.groupby(groupby)[col].transform(lambda _s: _s.interpolate())

        # calculate delta (mean of abs diff to previous and next value), the group edges are padded
        _grouped = _df.groupby(groupby)['_dummy']
        _df['_dummy_delta'] = .5 * (
            np.abs(_df['_dummy'] - _grouped.shift(1).bfill()) +
            np.abs(_df['_dummy'] - _grouped.shift(-1).ffill())
        )

        # per group mean / std of delta, aligned to the original rows
        _grouped_delta = _df.groupby(groupby)['_dummy_delta']
        _delta_mean = _grouped_delta.transform('mean')
        _delta_std = _grouped_delta.transform('std')

        _df[col] = np.where(
            np.abs(_df['_dummy_delta'] - _delta_mean) <= (std_cutoff * _delta_std),
            _df[col], np.nan)

        _df = _df.drop(['_dummy', '_dummy_delta'], axis=1)

    if '__groupby' in _df.columns:
        _df = _df.drop('__groupby', axis=1)

    return _df
[docs]@export
def butter_pass_filter(data: pd.Series, cutoff: int, fs: int, order: int, btype: str = None, shift: bool = False):
    """
    Implementation of a highpass / lowpass filter using scipy.signal.butter

    :param data: pandas Series or 1d numpy Array
    :param cutoff: cutoff frequency, in the same units as fs
    :param fs: sampling frequency of the data, the cutoff is normalized by the nyquist frequency fs / 2
    :param order: order of the filter
    :param btype: The type of filter. Passed to scipy.signal.butter. Default is 'lowpass'.
        One of {'lowpass', 'highpass', 'bandpass', 'bandstop'}
    :param shift: whether to shift the data to start at 0 before filtering (the shift is added back afterwards)
    :return: 1d numpy array containing the filtered data
    """
    # None is documented as 'lowpass' but is not a valid band type for scipy, so resolve it here
    if btype is None:
        btype = 'lowpass'

    # work on floats and never modify the caller's data
    _data = np.asarray(data, dtype=float)
    _shift = _data[0] if (shift and _data.size > 0) else 0.
    _data = _data - _shift

    # normalize the cutoff by the nyquist frequency
    _normal_cutoff = cutoff / (0.5 * fs)
    # noinspection PyTupleAssignmentBalance
    _b, _a = signal.butter(order, _normal_cutoff, btype=btype, analog=False, output='ba')

    _y = signal.lfilter(_b, _a, _data)
    _y = _y + _shift

    return _y
[docs]@export
def pass_by_group(df: pd.DataFrame, col: str, groupby: Union[str, list], btype: str, shift: bool = False,
                  cutoff: int = 1, fs: int = 20, order: int = 5):
    """
    allows applying a butter_pass filter by group

    The filtered values are written back to the rows they were computed from, so the DataFrame does not need
    to be sorted by group.

    :param df: pandas DataFrame
    :param col: column to filter
    :param groupby: columns to groupby
    :param btype: The type of filter. Passed to scipy.signal.butter.
        One of {'lowpass', 'highpass', 'bandpass', 'bandstop'}
    :param shift: whether to shift the data of each group to start at 0
    :param cutoff: cutoff frequency, passed to butter_pass_filter
    :param fs: sampling frequency, passed to butter_pass_filter
    :param order: order of the filter
    :return: copy of the DataFrame with the filtered column and a reset index
    """
    _df = df.copy()
    del df

    # transform aligns each group's result with the group's own rows
    # (concatenating the per group results would silently assume the frame is sorted by group)
    _df[col] = _df.groupby(groupby)[col].transform(
        lambda _s: butter_pass_filter(_s, cutoff=cutoff, fs=fs, order=order, btype=btype, shift=shift))

    _df = _df.reset_index(drop=True)

    return _df
[docs]@export
def lfit(x: Union[pd.Series, str], y: Union[pd.Series, str] = None, w: Union[pd.Series, str] = None,
         df: pd.DataFrame = None, groupby: Union[list, str] = None, do_print: bool = True,
         catch_error: bool = False, return_df: bool = False, extrapolate: Optional[int] = None):
    """
    quick linear fit with numpy

    :param x: names of x variables in df or vector data, if y is None treated as target and fit against the index
    :param y: names of y variables in df or vector data [optional]
    :param w: names of weight variables in df or vector data [optional]
    :param df: pandas DataFrame containing x,y,w data [optional]
    :param groupby: If specified the linear fit is applied by group [optional]
    :param do_print: whether to print steps to console
    :param catch_error: whether to keep going in case of error, a warning is issued and the unfitted target
        values are returned for the affected group [optional]
    :param return_df: whether to return a DataFrame or Series [optional]
    :param extrapolate: how many steps to extrapolate past the largest x value of each group, using the mean
        x spacing as step size. The appended rows only carry the group keys, x and the fit [optional]
    :return: if return_df is True: pandas DataFrame, else: pandas Series. The fit is stored in a column named
        '<target>_fit' where target is y (or x if y is None)
    """

    def _name_of(_vector, _default):
        # use the vector's own name if it has one, otherwise the default
        _name = getattr(_vector, 'name', None)
        return _default if _name is None else _name

    if df is None:
        _vectors = [(x, 'x'), (y, 'y'), (w, 'w')]
        _names = [None if _vector is None else _name_of(_vector, _default) for _vector, _default in _vectors]
        _used_names = [_name for _name in _names if _name is not None]
        # equal names would overwrite each other in the DataFrame, fall back to the distinct defaults
        if len(set(_used_names)) < len(_used_names):
            _names = [None if _vector is None else _default for _vector, _default in _vectors]
        _x_name, _y_name, _w_name = _names
        _df = pd.DataFrame({
            _name: _vector for _name, (_vector, _default) in zip(_names, _vectors) if _vector is not None
        })
    else:
        _df = df.copy()
        _x_name = x
        _y_name = y
        _w_name = w
    del df

    # if y is None the x column is the target and the index takes the role of x
    _target_name = _x_name if y is None else _y_name
    _y_name_fit = '{}_fit'.format(_target_name)

    if groupby is None:
        groupby = '__groupby'
        _df[groupby] = 1
    _group_keys = force_list(groupby)

    _n_extrapolate = int(extrapolate) if extrapolate else 0

    _it_max = _df[groupby].drop_duplicates().shape[0]

    _df_fit = []
    for _it, (_index, _df_i) in enumerate(_df.groupby(groupby)):

        if do_print and _it_max > 1:
            progressbar(_it, _it_max, print_prefix=qformat(_index))

        # work on a copy so that the columns added below do not write to a slice of _df
        _df_i = _df_i.copy()

        if y is None:
            _x = np.asarray(_df_i.index, dtype=float)
        else:
            _x = _df_i[_x_name].to_numpy(dtype=float)
        _y = _df_i[_target_name].to_numpy(dtype=float)

        # only finite points take part in the fit
        _idx = np.isfinite(_x) & np.isfinite(_y)
        if w is not None:
            _w_idx = _df_i[_w_name].to_numpy(dtype=float)[_idx]
        else:
            _w_idx = None

        try:
            _fit = np.poly1d(np.polyfit(x=_x[_idx], y=_y[_idx], deg=1, w=_w_idx))
        except Exception as _exc:
            if not catch_error:
                raise
            warnings.warn('handled exception: {}'.format(_exc))
            _fit = None

        _x = list(_x)
        _y = list(_y)

        if _fit is None:
            _y_fit = _y
        else:
            if _n_extrapolate > 0:
                _x_diff = pd.Series(_x).diff().mean()
                for _ext in range(_n_extrapolate):
                    _x.append(np.max(_x) + _x_diff)
                    _y.append(np.nan)
                # append one row per extrapolated step so that the extended vectors fit into the frame
                _df_pad = pd.DataFrame({_key: [_df_i[_key].iloc[0]] * _n_extrapolate for _key in _group_keys})
                if y is None:
                    # x is the index in this case, so the new rows are labeled with the extrapolated x values
                    _df_pad.index = _x[-_n_extrapolate:]
                _df_i = pd.concat([_df_i, _df_pad], sort=False)
            _y_fit = _fit(_x)

        # when fitting against the index there is no x column to write back
        if y is not None:
            _df_i[_x_name] = _x
        _df_i[_target_name] = _y
        _df_i[_y_name_fit] = _y_fit
        _df_fit.append(_df_i)

    _df_fit = df_merge(_df_fit)

    if do_print and _it_max > 1:
        progressbar()

    if return_df:
        return _df_fit
    else:
        return _df_fit[_y_name_fit]
[docs]@export
def qf(df: pd.DataFrame, fltr: Union[pd.DataFrame, pd.Series, Mapping], remove_unused_categories: bool = True,
       reset_index: bool = False):
    """
    quickly filter a DataFrame based on equal criteria. All columns of fltr present in df are filtered
    to be equal to the first entry in fltr. Columns of fltr that are not part of df are ignored.

    :param df: pandas DataFrame to be filtered
    :param fltr: filter condition as DataFrame or Mapping or Series
    :param remove_unused_categories: whether to remove unused categories from categorical dtype after filtering
    :param reset_index: whether to reset index after filtering
    :return: filtered pandas DataFrame (copy)
    """
    _df = df.copy()
    del df

    # a Mapping of scalars becomes a one row DataFrame
    if isinstance(fltr, Mapping):
        _filter_df = pd.DataFrame(fltr, index=[0])
    # a Series becomes a one row DataFrame with the Series index as columns
    elif isinstance(fltr, pd.Series):
        _filter_df = pd.DataFrame(fltr).T
    # assume it to be a DataFrame
    else:
        _filter_df = fltr.copy()
    del fltr

    # only the first row of the filter is used
    _filter_iloc = _filter_df.iloc[0]

    # start with all rows selected (built positionally: comparing the index with itself is False for nan labels)
    _filter_condition = np.ones(len(_df), dtype=bool)

    # logical and filter for all filter columns that exist in df
    for _col in _filter_df.columns:
        if _col not in _df.columns:
            continue
        _is_equal = _df[_col] == _filter_iloc[_col]
        _filter_condition &= _is_equal.to_numpy(dtype=bool, na_value=False)

    # create filtered df (copy so that the category cleanup below does not write to a slice)
    _df = _df[_filter_condition].copy()

    # remove_unused_categories
    if remove_unused_categories:
        for _cat in _df.select_dtypes(include='category').columns:
            _df[_cat] = _df[_cat].cat.remove_unused_categories()

    if reset_index:
        _df = _df.reset_index(drop=True)

    return _df
[docs]@export
def quantile_split(s: pd.Series, n: int, signif: int = 2, na_to_med: bool = False):
    """
    splits a numerical column into n quantiles. Useful for mapping numerical columns to categorical columns

    The bins are [q_i, q_i+1) for the quantiles q_i = i / n, the last bin also includes the maximum.
    Missing and non finite values stay missing in the result unless na_to_med is set.

    :param s: pandas Series to be split
    :param n: number of quantiles to split into
    :param signif: number of significant digits to round to, None to skip rounding
    :param na_to_med: whether to fill na values with median values
    :return: pandas Series of dtype category with the index and name of s. If s has at most n unique values
        it is returned unchanged
    """
    if len(s.unique()) <= n:
        return s

    _s = pd.Series(s).astype(float)
    # treat +-inf like missing values
    _s = pd.Series(np.where(np.isfinite(_s), _s, np.nan))

    if na_to_med:
        _s = _s.fillna(_s.median())
    if signif is not None:
        _s = round_signif(_s, signif)
        if not isinstance(_s, pd.Series):
            _s = pd.Series(_s)

    def _f_label(_value):
        # the labels show the rounded bin edges
        return _value if signif is None else round_signif(_value, signif)

    _values = _s.to_numpy(dtype=float)
    # n + 1 evenly spaced quantile edges: the step is 1 / n (not a fixed .1, which only fits n = 10)
    _edges = np.quantile(_values[~np.isnan(_values)], np.linspace(0, 1, n + 1))

    # object array so that unassigned entries stay real missing values instead of the string 'nan'
    _out = np.full(len(_values), np.nan, dtype=object)

    for _i in range(n):
        _q_min = _edges[_i]
        _q_max = _edges[_i + 1]
        if _i == n - 1:
            # the last bin is closed on the right so that the maximum is included
            _q_max_adj = np.inf
            _right_equal_sign = '<='
        else:
            _q_max_adj = _q_max
            _right_equal_sign = '<'
        _q_name = 'q{}: {}<=_{}{}'.format(_i, _f_label(_q_min), _right_equal_sign, _f_label(_q_max))
        _out[(_values >= _q_min) & (_values < _q_max_adj)] = _q_name

    # get back the old properties of the series (or you'll screw the index)
    _s_out = pd.Series(_out, index=s.index, name=s.name)

    # convert to cat
    return _s_out.astype('category')
[docs]@export
def acc(y_true: Union[pd.Series, str], y_pred: Union[pd.Series, str], df: pd.DataFrame = None) -> float:
    """
    calculate accuracy for a categorical label

    Values are compared by position, so any vector data (pandas Series, numpy array, list) is supported.

    :param y_true: true values as name of df or vector data
    :param y_pred: predicted values as name of df or vector data
    :param df: pandas DataFrame containing true and predicted values [optional]
    :return: share of predictions that equal the true value (between 0 and 1), nan for empty input
    :raises ValueError: if y_true and y_pred differ in length
    """
    if df is None:
        _y_true = y_true
        _y_pred = y_pred
    else:
        _y_true = df[y_true]
        _y_pred = df[y_pred]

    # compare as arrays: plain python lists would compare as a whole and yield a single bool
    _y_true = np.asarray(_y_true)
    _y_pred = np.asarray(_y_pred)
    if _y_true.shape != _y_pred.shape:
        raise ValueError('y_true and y_pred must have the same length, got {} and {}'.format(
            _y_true.shape, _y_pred.shape))
    if _y_true.size == 0:
        return np.nan

    _acc = np.sum(_y_true == _y_pred) / len(_y_true)

    return _acc
[docs]@export
def rel_acc(y_true: Union[pd.Series, str], y_pred: Union[pd.Series, str], df: pd.DataFrame = None,
            target_class: str = None):
    """
    relative accuracy of the prediction in comparison to predicting everything as the most common group

    :param y_true: true values as name of df or vector data
    :param y_pred: predicted values as name of df or vector data
    :param df: pandas DataFrame containing true and predicted values [optional]
    :param target_class: name of the target class, by default the most common one is used [optional]
    :return: accuracy of the prediction minus the share of the reference class (both between 0 and 1)
    """
    if df is None:
        _y_true = 'y_true'
        _y_pred = 'y_pred'
        _df = pd.DataFrame({
            _y_true: y_true,
            _y_pred: y_pred
        })
    else:
        _df = df.copy()
        _y_true = y_true
        _y_pred = y_pred
    del df, y_true, y_pred

    if target_class is None:
        # get acc of pred
        _acc = acc(_y_true, _y_pred, df=_df)
        # share of the most common value: value_counts is sorted descending, so take the first entry
        # by position (a label lookup with [0] would look for a class called 0)
        _acc_mc = _df[_y_true].value_counts().iloc[0] / _df.shape[0]
    else:
        # boolean mask instead of a query string, works for any class type and column name
        _df_target_class = _df[_df[_y_true] == target_class]
        # get acc of pred for target class
        _acc = acc(_y_true, _y_pred, df=_df_target_class)
        # get percentage of target class
        _acc_mc = _df_target_class.shape[0] / _df.shape[0]

    # rel acc is diff of both
    return _acc - _acc_mc
[docs]@export
def cm(y_true: Union[pd.Series, str], y_pred: Union[pd.Series, str], df: pd.DataFrame = None) -> pd.DataFrame:
    """
    confusion matrix from pandas df

    :param y_true: true values as name of df or vector data
    :param y_pred: predicted values as name of df or vector data
    :param df: pandas DataFrame containing true and predicted values [optional]
    :return: Confusion matrix as pandas DataFrame of integer counts with the true values as index and the
        predicted values as columns
    """
    if df is None:
        # name the vectors after themselves if possible, otherwise use the defaults
        _name_true = getattr(y_true, 'name', None)
        _name_pred = getattr(y_pred, 'name', None)
        if _name_true is None:
            _name_true = 'y_true'
        if _name_pred is None:
            _name_pred = 'y_pred'
        # equal names would collapse into a single column
        if _name_true == _name_pred:
            _name_true, _name_pred = 'y_true', 'y_pred'
        df = pd.DataFrame({
            _name_true: y_true,
            _name_pred: y_pred
        })
        y_true = _name_true
        y_pred = _name_pred

    # count each (true, pred) combination and spread the predictions into columns
    _cm = df.groupby([y_true, y_pred]).size().unstack(fill_value=0)
    _cm = _cm.fillna(0).astype(int)

    return _cm
[docs]@export
def f1_pr(y_true: Union[pd.Series, str], y_pred: Union[pd.Series, str], df: pd.DataFrame = None, target: str = None,
          factor: int = 100) -> pd.DataFrame:
    """
    get the count, f1 score, precision and recall for each level of a categorical label

    :param y_true: true values as name of df or vector data
    :param y_pred: predicted values as name of df or vector data
    :param df: pandas DataFrame containing true and predicted values [optional]
    :param target: level (or list of levels) for which to return the scores, by default all true levels are
        returned [optional]
    :param factor: factor by which to scale F1, precision and recall, default 100 (percent) [optional]
    :return: pandas DataFrame indexed by level with the columns count (number of true occurrences), F1,
        precision and recall. Scores that cannot be calculated (division by zero) are nan
    """
    if df is None:
        # name the vectors after themselves if possible, otherwise use the defaults
        _name_true = getattr(y_true, 'name', None)
        _name_pred = getattr(y_pred, 'name', None)
        if _name_true is None:
            _name_true = 'y_true'
        if _name_pred is None:
            _name_pred = 'y_pred'
        # equal names would collapse into a single column
        if _name_true == _name_pred:
            _name_true, _name_pred = 'y_true', 'y_pred'
        df = pd.DataFrame({
            _name_true: y_true,
            _name_pred: y_pred
        })
        y_true = _name_true
        y_pred = _name_pred

    _cm = cm(y_true=y_true, y_pred=y_pred, df=df)

    if target is None:
        target = _cm.index.tolist()
    elif not is_list_like(target):
        target = [target]

    # make the matrix square over all known levels so that levels that were never true or never predicted
    # count as 0 instead of raising a KeyError
    _labels = list(_cm.index)
    for _label in list(_cm.columns) + list(target):
        if _label not in _labels:
            _labels.append(_label)
    _cm = _cm.reindex(index=_labels, columns=_labels, fill_value=0)

    _f1_pr = []
    for _target in target:

        # number of true / predicted occurrences of the level
        _count_true = _cm.loc[_target].sum()
        _count_pred = _cm[_target].sum()
        # true positive: predicted as target and actually target
        _tp_i = _cm.loc[_target, _target]

        # precision: out of everything predicted as target, how much is actually target
        _precision = _tp_i / _count_pred * factor if _count_pred > 0 else np.nan
        # recall: out of everything that is actually target, how much was predicted as target
        _recall = _tp_i / _count_true * factor if _count_true > 0 else np.nan

        # F1 is the harmonic mean of precision and recall (and therefore on the same scale)
        if np.isnan(_precision) or np.isnan(_recall) or (_precision + _recall) == 0:
            _f1 = np.nan
        else:
            _f1 = 2 * _precision * _recall / (_precision + _recall)

        _f1_pr.append({
            y_true: _target, 'count': _count_true, 'F1': _f1, 'precision': _precision, 'recall': _recall
        })

    _f1_pr = pd.DataFrame(_f1_pr, columns=[y_true, 'count', 'F1', 'precision', 'recall']).set_index(y_true)

    return _f1_pr
[docs]@export
def f_score(y_true: Union[pd.Series, str], y_pred: Union[pd.Series, str], df: pd.DataFrame = None, dropna: bool = False,
            f: Callable = r2_score, groupby: Union[list, str] = None, f_name: str = None) -> Union[pd.DataFrame, float]:
    """
    generic scoring function based on pandas DataFrame.

    :param y_true: true values as name of df or vector data
    :param y_pred: predicted values as name of df or vector data
    :param df: pandas DataFrame containing true and predicted values [optional]
    :param dropna: whether to drop rows with missing true, predicted or groupby values [optional]
    :param f: scoring function to apply, called as f(y_true, y_pred), default is sklearn.metrics.r2_score,
        should return a scalar value. [optional]
    :param groupby: if supplied then the result is returned for each group level [optional]
    :param f_name: name of the score column if groupby is supplied, by default uses the .__name__ property of
        the function [optional]
    :return: if groupby is supplied: pandas DataFrame with the group columns and one score column,
        else: scalar value. nan if there is no data to score
    """
    if df is None:
        _df = pd.DataFrame()
        _y_true = 'y_true'
        _y_pred = 'y_pred'
        _df[_y_true] = y_true
        _df[_y_pred] = y_pred
    else:
        _y_true = y_true
        _y_pred = y_pred
        _df = df.copy()
    del df

    # groupby may be a single column name, the subset selections below need a list
    _groupby = None if groupby is None else force_list(groupby)

    if dropna:
        _df = _df.dropna(subset=[_y_true, _y_pred])
        if _groupby is not None:
            _df = _df.dropna(subset=_groupby)

    if _df.shape[0] == 0:
        return np.nan

    if _groupby is None:
        return f(_df[_y_true], _df[_y_pred])

    if f_name is None:
        f_name = f.__name__

    _df_out = []
    for _i, _df_group in _df.groupby(groupby):
        # one row per group carrying the group keys (copy so that the score is not written to a slice)
        _df_i = _df_group[_groupby].head(1).copy()
        _df_i[f_name] = f(_df_group[_y_true], _df_group[_y_pred])
        _df_out.append(_df_i)

    _df_out = df_merge(_df_out)

    return _df_out
# shorthand r2
[docs]@export
def r2(*args, **kwargs) -> Union[pd.DataFrame, float]:
    """
    shorthand for f_score with sklearn.metrics.r2_score as scoring function

    :param args: positional arguments, forwarded to f_score
    :param kwargs: keyword arguments, forwarded to f_score
    :return: if groupby is supplied: pandas DataFrame, else: scalar value
    """
    _scorer = r2_score
    return f_score(*args, f=_scorer, **kwargs)
[docs]@export
def rmse(*args, **kwargs) -> Union[pd.DataFrame, float]:
    """
    shorthand for f_score with the root of sklearn.metrics.mean_squared_error as scoring function

    :param args: positional arguments, forwarded to f_score
    :param kwargs: keyword arguments, forwarded to f_score
    :return: if groupby is supplied: pandas DataFrame, else: scalar value
    """

    def _f_rmse(x, y):
        # root of the mean squared error
        _mse = mean_squared_error(x, y)
        return np.sqrt(_mse)

    return f_score(*args, f=_f_rmse, **kwargs)
[docs]@export
def mae(*args, **kwargs) -> Union[pd.DataFrame, float]:
    """
    shorthand for f_score with sklearn.metrics.mean_absolute_error as scoring function

    :param args: positional arguments, forwarded to f_score
    :param kwargs: keyword arguments, forwarded to f_score
    :return: if groupby is supplied: pandas DataFrame, else: scalar value
    """
    _scorer = mean_absolute_error
    return f_score(*args, f=_scorer, **kwargs)
[docs]@export
def stdae(*args, **kwargs) -> Union[pd.DataFrame, float]:
    """
    shorthand for f_score with the standard deviation of the absolute error as scoring function

    :param args: positional arguments, forwarded to f_score
    :param kwargs: keyword arguments, forwarded to f_score
    :return: if groupby is supplied: pandas DataFrame, else: scalar value
    """

    def _f_stdae(x, y):
        # spread of the absolute deviations between both vectors
        _abs_error = np.abs(x - y)
        return np.std(_abs_error)

    return f_score(*args, f=_f_stdae, **kwargs)
[docs]@export
def medae(*args, **kwargs) -> Union[pd.DataFrame, float]:
    """
    shorthand for f_score with sklearn.metrics.median_absolute_error as scoring function

    :param args: positional arguments, forwarded to f_score
    :param kwargs: keyword arguments, forwarded to f_score
    :return: if groupby is supplied: pandas DataFrame, else: scalar value
    """
    _scorer = median_absolute_error
    return f_score(*args, f=_scorer, **kwargs)
[docs]@export
def corr(*args, **kwargs) -> Union[pd.DataFrame, float]:
    """
    shorthand for f_score with pandas.Series.corr as scoring function

    :param args: positional arguments, forwarded to f_score
    :param kwargs: keyword arguments, forwarded to f_score
    :return: if groupby is supplied: pandas DataFrame, else: scalar value
    """

    def _f_corr(x, y):
        # cast both vectors to Series so that Series.corr is available
        _left = pd.Series(x)
        _right = pd.Series(y)
        return _left.corr(other=_right)

    return f_score(*args, f=_f_corr, **kwargs)
[docs]@export
def df_score(df: pd.DataFrame, y_true: Union[List[str], str], pred_suffix: list = None, scores: List[Callable] = None,
             pivot: bool = True, scale: Union[dict, list, int] = None,
             groupby: Union[list, str] = None) -> pd.DataFrame:
    """
    builds a DataFrame of scores for one or more true columns and their predictions

    The prediction of a true column <y> for the suffix <s> is expected in the column '<y>_<s>'.

    :param df: pandas DataFrame containing the true, pred data
    :param y_true: name (or names) of the true variable inside df
    :param pred_suffix: suffixes of the prediction columns. Supports multiple predictions.
        By default assumed suffix 'pred' [optional]
    :param scores: scoring functions to be used, called as score(y_true, y_pred, df=df),
        by default r2, rmse, mae, stdae and medae [optional]
    :param pivot: whether to pivot the DataFrame for easier readability [optional]
    :param scale: multiplied onto the true and predicted columns before scoring: a Mapping of column name to
        factor, a list of factors (one per true column) or one factor for all true columns [optional]
    :param groupby: if supplied then the scores are calculated by group [optional]
    :return: pandas DataFrame containing all the scores
    :raises KeyError: if a prediction column is missing in df
    """
    _model_names = force_list(['pred'] if pred_suffix is None else pred_suffix)
    _scorers = [r2, rmse, mae, stdae, medae] if scores is None else force_list(scores)

    _data = df.copy()
    del df

    if groupby is None:
        _data['_dummy'] = 1
        _groupby = ['_dummy']
    else:
        _groupby = force_list(groupby)

    _target = force_list(y_true)

    def _f_rescale(_column, _factor):
        # scale a true column and all of its prediction columns in place
        _data[_column] *= _factor
        for _name in _model_names:
            _data['{}_{}'.format(_column, _name)] *= _factor

    if isinstance(scale, Mapping):
        for _column, _factor in scale.items():
            _f_rescale(_column, _factor)
    elif is_list_like(scale):
        # noinspection PyTypeChecker
        for _position, _factor in enumerate(scale):
            _f_rescale(_target[_position], _factor)
    elif scale is not None:
        for _column in _target:
            _f_rescale(_column, scale)

    _records = dict_list(_groupby + ['y_ref', 'model', 'score', 'value'])

    for _y_ref in _target:
        for _model_name in _model_names:
            _y_ref_pred = '{}_{}'.format(_y_ref, _model_name)
            for _scorer in _scorers:
                if _y_ref_pred not in _data.columns:
                    raise KeyError('{} not in columns'.format(_y_ref_pred))
                for _index, _group_data in _data.groupby(_groupby):
                    _record = {
                        'y_ref': _y_ref,
                        'model': _model_name,
                        'score': _scorer.__name__,
                        'value': _scorer(_y_ref, _y_ref_pred, df=_group_data)
                    }
                    # the group keys identify the row
                    for _key in _groupby:
                        _record[_key] = _group_data[_key].iloc[0]
                    append_to_dict_list(_records, _record)

    _result = pd.DataFrame(_records)

    if groupby is None:
        _pivot_index = ['y_ref', 'model']
        _result = _result.drop(['_dummy'], axis=1)
    else:
        _pivot_index = ['y_ref', 'model'] + _groupby

    if pivot:
        _result = _result.pivot_table(index=_pivot_index, columns='score', values='value')

    return _result
[docs]@export
def rmsd(x: str, df: pd.DataFrame, group: str, return_df_paired: bool = False, agg_func: str = 'median',
         standardize: bool = False, to_abs: bool = False) -> Union[float, pd.DataFrame]:
    """
    weighted root mean squared difference of the aggregated values of a reference column x between all
    pairs of levels of a group column. Each pair is weighted with the product of both level counts.

    :param x: name of the column to calculate the rmsd for
    :param df: pandas DataFrame
    :param group: groups for which to calculate the rmsd
    :param return_df_paired: whether to return the paired DataFrame
    :param agg_func: which aggregation to use for the group value, passed to pd.DataFrame.agg
    :param standardize: whether to apply Standardization before calculating the rmsd
    :param to_abs: whether to cast x to abs before calculating the rmsd
    :return: if return_df_paired pandas DataFrame, else rmsd as float
    """
    _agg_name = '{}_by_group'.format(agg_func)

    _data = df.copy()

    if to_abs:
        _data[x] = _data[x].abs()
    if standardize:
        _center = _data[x].mean()
        _spread = _data[x].std()
        _data[x] = (_data[x] - _center) / _spread

    # one row per group level with its count and aggregated value
    _levels = _data.groupby([group]).agg({x: ['count', agg_func]}).reset_index()
    _levels.columns = ['group', 'count', _agg_name]

    # joining on a constant pairs every level with every level
    _levels['dummy'] = 1
    _pairs = pd.merge(_levels, _levels, on='dummy')
    # a level is not compared with itself
    _pairs = _pairs[_pairs['group_x'] != _pairs['group_y']]

    _pairs['weight'] = _pairs['count_x'] * _pairs['count_y']
    _pairs['difference'] = _pairs[_agg_name + '_x'] - _pairs[_agg_name + '_y']
    _pairs['weighted_squared_difference'] = _pairs['weight'] * _pairs['difference'] ** 2

    if return_df_paired:
        return _pairs

    _weighted_sum = _pairs['weighted_squared_difference'].sum()
    _weight_sum = _pairs['weight'].sum()
    return np.sqrt(_weighted_sum / _weight_sum)
# get a data frame showing the root mean squared difference by group type
[docs]@export
def df_rmsd(x: str, df: pd.DataFrame, groups: Union[list, str] = None, hue: str = None, hue_order: list = None,
            sort_by_hue: bool = True, n_quantiles: int = 10, include_rmsd: bool = True, **kwargs):
    """
    calculate rmsd for reference column x with multiple other columns and return as DataFrame

    :param x: name (or list of names) of the column to calculate the rmsd for
    :param df: pandas DataFrame containing the data
    :param groups: groups to calculate the rmsd for, defaults to all other columns in the DataFrame [optional]
    :param hue: further calculate the rmsd for each hue level [optional]
    :param hue_order: sort the hue levels in this order [optional]
    :param sort_by_hue: sort the values by hue rather than by group [optional]
    :param n_quantiles: numeric columns will be automatically split into this many quantiles [optional]
    :param include_rmsd: if False provide only a grouped DataFrame but don't actually calculate the rmsd,
        you can use include_rmsd=False to save computation time if you only need the maxperc (used in plotting)
    :param kwargs: passed to rmsd
    :return: pandas DataFrame with one row per x, group (and hue level) containing the rmsd, the most common
        level of the group (maxlevel), its count (maxcount) and share (maxperc) and the number of rows (count)
    """
    # avoid inplace operations
    _df = df.copy()

    # x / groups can be a list or a scalar
    _x_list = x if isinstance(x, list) else [x]
    if groups is None:
        groups = [_col for _col in _df.columns if _col not in _x_list]
    _groups = groups if isinstance(groups, list) else [groups]

    if hue is not None:
        if hue in list(_df.select_dtypes(include=np.number)):
            _df[hue] = quantile_split(_df[hue], n_quantiles)
        _df[hue] = _df[hue].astype('category').cat.remove_unused_categories()
        _hues = _df[hue].cat.categories
    else:
        _hues = [None]

    # the numeric columns do not change inside the loop (only the helper column '_group' is rewritten)
    _numeric_cols = set(_df.select_dtypes(include=np.number).columns)

    _rows = []
    # suppress RuntimeWarnings only while calculating, the caller's warning filters are restored afterwards
    with warnings.catch_warnings():
        warnings.simplefilter(action='ignore', category=RuntimeWarning)

        # loop x
        for _x in _x_list:
            # loop groups
            for _group in _groups:

                # eliminate self dependency
                if _group == _x:
                    continue

                # numerical data is split in quantiles, other data is taken as is
                if _group in _numeric_cols:
                    _df['_group'] = quantile_split(_df[_group], n_quantiles)
                else:
                    _df['_group'] = _df[_group].copy()

                # if hue is None, one calculation is enough
                for _hue in _hues:

                    _df_hue = _df if hue is None else _df[_df[hue] == _hue]

                    if include_rmsd:
                        _rmsd = rmsd(x=_x, df=_df_hue, group='_group', **kwargs)
                    else:
                        _rmsd = np.nan

                    # value_counts is sorted descending: the first entry is the most common level
                    # (read from the Series directly, independent of the column names of reset_index)
                    _level_counts = _df_hue['_group'].value_counts()
                    _count = len(_df_hue['_group'])
                    if len(_level_counts) > 0:
                        _maxcount = _level_counts.iloc[0]
                        _maxlevel = _level_counts.index[0]
                        _maxperc = _maxcount / _count
                    else:
                        _maxcount, _maxlevel, _maxperc = 0, np.nan, np.nan

                    _row = {'x': _x, 'group': _group, 'rmsd': _rmsd, 'maxperc': _maxperc, 'maxlevel': _maxlevel,
                            'maxcount': _maxcount, 'count': _count}
                    if hue is not None:
                        _row[hue] = _hue
                    _rows.append(_row)

    _columns = ['x', 'group', 'rmsd', 'maxperc', 'maxlevel', 'maxcount', 'count']
    if hue is not None:
        _columns.append(hue)
    # build the result in one go (DataFrame.append is not available in recent pandas versions)
    _df_rmsd = pd.DataFrame(_rows, columns=_columns)
    if len(_rows) == 0:
        return _df_rmsd

    # postprocessing, sorting etc.
    if hue is not None:
        _df_rmsd[hue] = _df_rmsd[hue].astype('category')
        if hue_order is not None:
            _hues = hue_order
        else:
            _hues = _df_rmsd[hue].cat.categories
        # the groups are ordered by their rmsd in the first hue level
        _df_order = _df_rmsd[_df_rmsd[hue] == _hues[0]].sort_values(by=['rmsd'], ascending=False).reset_index(
            drop=True).reset_index().rename({'index': '_order'}, axis=1)[['group', '_order']]
        _df_rmsd = pd.merge(_df_rmsd, _df_order)
        if sort_by_hue:
            _df_rmsd = _df_rmsd.sort_values(by=[hue, '_order']).reset_index(drop=True).drop(['_order'], axis=1)
        else:
            _df_rmsd = _df_rmsd.sort_values(by=['_order', hue]).reset_index(drop=True).drop(['_order'], axis=1)
    else:
        _df_rmsd = _df_rmsd.sort_values(by=['rmsd'], ascending=False).reset_index(drop=True)

    return _df_rmsd
[docs]@export
def df_p(x: str, group: str, df: pd.DataFrame, hue: str = None, agg_func: str = 'mean', agg: bool = False,
         n_quantiles: int = 10):
    """
    returns a DataFrame with the p value of x between every ordered pair of distinct levels.
    See hypothesis testing.

    :param x: name of column to evaluate
    :param group: name of grouping column
    :param df: pandas DataFrame
    :param hue: further split by hue level
    :param agg_func: if 'median' scipy.stats.median_test is used, otherwise Welch's t-test
        (scipy.stats.ttest_ind with equal_var=False)
    :param agg: whether to average the p values per level (of group and hue) instead of returning one row per pair
    :param n_quantiles: numeric columns will be automatically split into this many quantiles [optional]
    :return: pandas DataFrame containing p values
    """
    # numeric to quantile
    _df, _groupby, _groupby_names, _vars, _df_levels, _levels = df_group_hue(df, group=group, hue=hue, x=x,
                                                                             n_quantiles=n_quantiles)
    _levels = list(_levels)
    # the sample of each level is needed once per pairing, so it is extracted only once here
    _samples = [_df.loc[_df['_label'] == _level, x].dropna() for _level in _levels]

    # fixed column layout so that an input with a single level yields an empty (but well formed) result
    _columns = [group, group + '_2']
    if hue is not None:
        _columns += [hue, hue + '_2']
    _columns.append('p')

    _rows = []
    for _i_1, _level_1 in enumerate(_levels):
        for _i_2, _level_2 in enumerate(_levels):
            if _level_1 == _level_2:
                continue
            # get t test / median test
            try:
                if agg_func == 'median':
                    _p = stats.median_test(_samples[_i_1], _samples[_i_2])[1]
                else:  # if not median then mean
                    _p = stats.ttest_ind(_samples[_i_1], _samples[_i_2], equal_var=False)[1]
            except ValueError:
                _p = np.nan

            _row = {}
            if hue is not None:
                _row[group] = _df_levels['_group'].iloc[_i_1]
                _row[group + '_2'] = _df_levels['_group'].iloc[_i_2]
                _row[hue] = _df_levels['_hue'].iloc[_i_1]
                # the second hue belongs to the second level of the pair
                _row[hue + '_2'] = _df_levels['_hue'].iloc[_i_2]
            else:
                _row[group] = _level_1
                _row[group + '_2'] = _level_2
            _row['p'] = _p
            _rows.append(_row)

    _df_p = pd.DataFrame(_rows, columns=_columns)

    if agg:
        # the result columns carry the caller's names (group / hue), so those are the grouping keys
        _df_p = _df_p.groupby(_groupby_names).agg({'p': 'mean'}).reset_index()

    return _df_p
# df with various aggregations
def df_agg(x, group, df, hue=None, agg=None, n_quantiles=10, na_to_med=False, p=True,
           p_test='mean', sort_by_count=False):
    """
    aggregate x per level of group (and hue): row count, the requested aggregations and optionally the mean p value

    :param x: name of column to aggregate
    :param group: name of grouping column
    :param df: pandas DataFrame
    :param hue: further split by hue level [optional]
    :param agg: aggregation(s) passed to pandas agg, defaults to ['mean', 'median', 'std']
    :param n_quantiles: passed to df_group_hue
    :param na_to_med: passed to df_group_hue
    :param p: whether to merge the aggregated p values from df_p
    :param p_test: passed to df_p as agg_func
    :param sort_by_count: whether to sort the result descending by count
    :return: pandas DataFrame with one row per level
    """
    # normalise the aggregations to a list
    if agg is None:
        _aggs = ['mean', 'median', 'std']
    elif isinstance(agg, list):
        _aggs = agg
    else:
        _aggs = [agg]

    # numeric to quantile
    _df, _groupby, _groupby_names, _vars, _df_levels, _levels = df_group_hue(df, group=group, hue=hue, x=x,
                                                                             n_quantiles=n_quantiles,
                                                                             na_to_med=na_to_med)

    # count + aggregations per level
    _result = _df.groupby(_groupby).agg({'_dummy': 'count', x: _aggs}).reset_index()
    _result.columns = _groupby + ['count'] + _aggs

    if sort_by_count:
        _result = _result.sort_values(by=['count'], ascending=False)

    if p:
        _hue_col = None if hue is None else '_hue'
        _p_values = df_p(x=x, group='_group', hue=_hue_col, df=_df, agg_func=p_test, agg=True)
        _result = pd.merge(_result, _p_values, on=_groupby)

    # swap the internal key names for the names passed by the caller
    _value_cols = [_col for _col in _result.columns if _col not in _groupby]
    _result.columns = _groupby_names + _value_cols

    return _result
# quick function to adjust group and hue to be categorical
def df_group_hue(df, group, hue=None, x=None, n_quantiles=10, na_to_med=False, keep=True):
    """
    reduce df to the columns x, group and hue and make group and hue categorical

    Numeric group / hue columns are split into quantiles using quantile_split.

    :param df: pandas DataFrame
    :param group: name of grouping column
    :param hue: name of hue column [optional]
    :param x: name of value column to keep [optional]
    :param n_quantiles: passed to quantile_split for numeric columns
    :param na_to_med: passed to quantile_split for numeric columns
    :param keep: if True the categorical versions are written to new columns '_group' / '_hue' and the original
        columns are kept, if False the original columns are overwritten
    :return: tuple of (adjusted DataFrame, names of the categorical key columns, original key names,
        kept column names, DataFrame of distinct levels, Series of level labels)
    """
    _df = df.copy()
    _has_hue = hue is not None

    # names of the columns holding the categorical keys
    if keep:
        _group = '_group'
        _hue = '_hue' if _has_hue else None
    else:
        _group = group
        _hue = hue

    # _groupby must name columns that exist in the returned frame, i.e. it follows keep
    _groupby = [_group]
    _groupby_names = [group]
    _vars = [group]
    if _has_hue:
        _groupby.append(_hue)
        _groupby_names.append(hue)
        if hue not in _vars:
            _vars.append(hue)
    if x is not None and x not in _vars:
        _vars = [x] + _vars

    _df = _df.drop([_col for _col in _df.columns if _col not in _vars], axis=1)

    _df[_group] = _df[group].copy()
    if _has_hue:
        _df[_hue] = _df[hue].copy()
    _df['_dummy'] = 1

    # - numeric to quantile
    # group
    if _group in list(_df.select_dtypes(include=np.number)):
        _df[_group] = quantile_split(_df[group], n_quantiles, na_to_med=na_to_med)
    _df[_group] = _df[_group].astype('category').cat.remove_unused_categories()

    # hue
    if _has_hue:
        if _hue in list(_df.select_dtypes(include=np.number)):
            _df[_hue] = quantile_split(_df[hue], n_quantiles, na_to_med=na_to_med)
        _df[_hue] = _df[_hue].astype('category').cat.remove_unused_categories()
        _df['_label'] = concat_cols(_df, [_group, _hue]).astype('category')
        _df_levels = _df[[_group, _hue, '_label']].drop_duplicates().reset_index(drop=True)
    else:
        _df['_label'] = _df[_group]
        _df_levels = _df[[_group, '_label']].drop_duplicates().reset_index(drop=True)
    _levels = _df_levels['_label']

    return _df, _groupby, _groupby_names, _vars, _df_levels, _levels
def order_cols(df, cols):
    """
    move the given columns to the front of the DataFrame, keeping the order of the remaining columns

    :param df: pandas DataFrame
    :param cols: list of column names to put first
    :return: DataFrame with reordered columns
    """
    _remaining = []
    for _name in df.columns:
        if _name not in cols:
            _remaining.append(_name)
    return df[cols + _remaining]
def df_precision_filter(df, col, precision):
    """
    keep only the rows whose value in col lies close to its value rounded to the given number of decimals

    :param df: pandas DataFrame
    :param col: name of the numeric column to check
    :param precision: number of decimals passed to round; the tolerance is 1 / (2 * 10 ** (precision + 1))
    :return: filtered DataFrame
    """
    _tolerance = 1 / (2 * 10 ** (precision + 1))
    _deviation = np.abs(df[col] - df[col].round(precision))
    return df[(_deviation < _tolerance)]
# grouped interpolate method (avoids .apply failing if one sub group fails)
def grouped_interpolate(df, col, groupby, method=None):
    """
    interpolate a column separately within each group

    :param df: pandas DataFrame
    :param col: name of the column to interpolate
    :param groupby: passed to pandas.DataFrame.groupby
    :param method: passed to pandas.Series.interpolate, None uses 'linear'
    :return: pandas Series with the interpolated values, concatenated in group order and carrying the original
        index labels
    """
    # interpolate rejects method=None, so without this the default call would silently interpolate nothing
    _method = 'linear' if method is None else method

    _parts = []
    for _key, _df_i in df.groupby(groupby):
        _values = _df_i[col]
        try:
            _values = _values.interpolate(method=_method)
        except ValueError:
            # deliberate best effort: a group that cannot be interpolated is returned unchanged
            pass
        _parts.append(_values)

    if not _parts:
        # nothing to group, return an empty Series of the right dtype instead of failing in pd.concat
        return df[col].iloc[0:0]

    return pd.concat(_parts)
def time_reg(df, t='t', y='y', t_unit='D', window=10, slope_diff_cutoff=.1, int_diff_cutoff=3, return_df_fit=False):
    """
    piecewise linear regression of y over t

    A rolling slope and intercept are calculated on the standardized y. Rows where the rolling slope or intercept
    changes by at least the respective cutoff mark the end of a phase. Phases spanning less than window rows are
    merged with the following phase. For each remaining phase slope (cov(t, y) / var(t)), intercept, r2 and rmse
    are calculated.

    :param df: pandas DataFrame containing t and y
    :param t: name of the time column, datetime values are converted to multiples of t_unit since the minimum
    :param y: name of the value column
    :param t_unit: numpy timedelta unit used to convert datetime values
    :param window: rolling window size and minimum number of rows of a phase
    :param slope_diff_cutoff: minimum absolute change of the rolling slope that ends a phase, None disables it
    :param int_diff_cutoff: minimum absolute change of the rolling intercept that ends a phase, None disables it
    :param return_df_fit: if True the rows of all kept phases including the fitted values are returned
    :return: pandas DataFrame with one row per phase or, if return_df_fit, the fitted rows
    """
    # pd.datetime was an alias of datetime.datetime and has been removed from pandas
    from datetime import datetime

    if slope_diff_cutoff is None:
        slope_diff_cutoff = np.iinfo(np.int32).max
    if int_diff_cutoff is None:
        int_diff_cutoff = np.iinfo(np.int32).max

    _t_from = '{}_from'.format(t)
    _t_to = '{}_to'.format(t)
    _t_i = '{}_i'.format(t)
    _t_i_from = '{}_i_from'.format(t)
    _t_i_to = '{}_i_to'.format(t)
    _y_slope = '{}_slope'.format(y)
    _y_int = '{}_int'.format(y)
    _y_fit = '{}_fit'.format(y)
    _y_r2 = '{}_r2'.format(y)
    _y_rmse = '{}_rmse'.format(y)

    _df = df[[t, y]].copy().reset_index(drop=True)
    _t_min = _df[t].min()
    _t_max = _df[t].max()

    # numeric time axis
    if isinstance(_df[t].iloc[0], datetime):
        _df[_t_i] = (_df[t] - _t_min) / np.timedelta64(1, t_unit)
        _t_i_min = 0
        _t_i_max = (_t_max - _t_min) / np.timedelta64(1, t_unit)
    else:
        _df[_t_i] = _df[t]
        _t_i_min = _t_min
        _t_i_max = _t_max

    # rolling regression on the standardized y
    _df['_y'] = (_df[y] - _df[y].mean()) / _df[y].std()
    _df['slope_rolling'] = _df[_t_i].rolling(window, min_periods=0).cov(other=_df['_y'], pairwise=False) / _df[
        _t_i].rolling(window, min_periods=0).var()
    _df['int_rolling'] = _df['_y'].rolling(window, min_periods=0).mean() - _df['slope_rolling'] * _df[_t_i].rolling(
        window, min_periods=0).mean()
    _df['slope_rolling_diff'] = np.abs(_df['slope_rolling'].diff())
    _df['int_rolling_diff'] = np.abs(_df['int_rolling'].diff())
    _df['slope_change'] = _df['slope_rolling_diff'] >= slope_diff_cutoff
    _df['int_change'] = _df['int_rolling_diff'] >= int_diff_cutoff
    _df['_change'] = (_df['slope_change']) | (_df['int_change'])

    # each change row ends a phase that started at the previous change row (or at the minimum)
    _df_phases = _df[_df['_change']][[t, _t_i]].copy()
    _df_phases.insert(0, _t_from, _df_phases[t].shift(1).fillna(_t_min))
    _df_phases.insert(2, _t_i_from, _df_phases[_t_i].shift(1).fillna(_t_i_min))
    _df_phases = _df_phases.rename({t: _t_to, _t_i: _t_i_to}, axis=1)

    # append row for last phase
    if _df_phases.empty:
        # no change detected (e.g. both cutoffs disabled): the whole range is one phase
        _last_from, _last_i_from = _t_min, _t_i_min
    else:
        # NOTE(review): this starts the last phase at the start (not the end) of the previous one, so the
        # last two phases overlap unless they get merged - kept as is, confirm whether this is intended
        _last_from, _last_i_from = _df_phases[_t_from].max(), _df_phases[_t_i_from].max()
    _df_last = pd.DataFrame({
        _t_from: _last_from,
        _t_to: _t_max,
        _t_i_from: _last_i_from,
        _t_i_to: _t_i_max,
    }, index=[0])
    if _df_phases.empty:
        _df_phases = _df_last
    else:
        _df_phases = pd.concat([_df_phases, _df_last], ignore_index=True, sort=False)

    _df_phases[_y_slope] = np.nan
    _df_phases[_y_int] = np.nan
    _df_phases[_y_r2] = np.nan
    _df_phases[_y_rmse] = np.nan
    _df_phases['_keep'] = False

    _dfs = []
    _merging = False
    _t_from_row = None
    _t_i_from_row = None
    for _i, _row in _df_phases.iterrows():
        # check len of the phase: if len is less than window rows it will be merged with next phase
        _t_i_to_row = _row[_t_i_to]
        if not _merging:
            _t_from_row = _row[_t_from]
            _t_i_from_row = _row[_t_i_from]
        # the end of a phase is exclusive
        _df_t = _df[(_df[_t_i] >= _t_i_from_row) & (_df[_t_i] < _t_i_to_row)].copy()
        if _df_t.empty or (_df_t.index.max() - _df_t.index.min() + 1) < window:
            _merging = True
            continue
        _merging = False

        # .loc writes into the frame itself, chained indexing is not guaranteed to
        _df_phases.loc[_i, '_keep'] = True
        # a merged phase starts where the first of its short predecessors started
        _df_phases.loc[_i, _t_from] = _t_from_row
        _df_phases.loc[_i, _t_i_from] = _t_i_from_row

        # calculate slope
        _y_slope_i = _df_t[_t_i].cov(other=_df_t[y]) / _df_t[_t_i].var()
        # calculate intercept
        _y_int_i = _df_t[y].mean() - _y_slope_i * _df_t[_t_i].mean()
        # calculate y fit
        _df_t[_y_fit] = _y_int_i + _df_t[_t_i] * _y_slope_i

        _df_phases.loc[_i, _y_slope] = _y_slope_i
        _df_phases.loc[_i, _y_int] = _y_int_i
        _df_phases.loc[_i, _y_r2] = r2_score(_df_t[y], _df_t[_y_fit])
        _df_phases.loc[_i, _y_rmse] = np.sqrt(mean_squared_error(_df_t[y], _df_t[_y_fit]))
        _dfs.append(_df_t)

    if _dfs:
        _df_fit = pd.concat(_dfs)
    else:
        # no phase reached window rows
        _df_fit = pd.DataFrame(columns=list(_df.columns) + [_y_fit])

    # postprocessing
    _df_phases = _df_phases[_df_phases['_keep']].reset_index(drop=True).drop(['_keep'], axis=1)

    if return_df_fit:
        return _df_fit
    else:
        return _df_phases
def col_to_front(df, cols):
    """
    move one or more columns to the front of the DataFrame, keeping the order of the remaining columns

    :param df: pandas DataFrame
    :param cols: column name or list of column names, cast to list using force_list
    :return: DataFrame with reordered columns
    """
    _front = force_list(cols)
    _back = []
    for _name in df.columns:
        if _name not in _front:
            _back.append(_name)
    return df[_front + _back]
def lr(df, x, y, groupby=None, t_unit='D', do_print=True):
    """
    linear regression of y over x, separately for each group

    :param df: pandas DataFrame, rows with missing / non finite x or y are ignored
    :param x: name of the independent column, datetime values are converted to multiples of t_unit since the
        minimum
    :param y: name of the dependent column
    :param groupby: column(s) to group by, if None one regression over all rows is calculated [optional]
    :param t_unit: numpy timedelta unit used to convert datetime values
    :param do_print: whether to print steps to console
    :return: pandas DataFrame with one row per group containing slope, intercept, r2, rmse and error statistics
    """
    # pd.datetime was an alias of datetime.datetime and has been removed from pandas
    from datetime import datetime

    # const
    _x_i = '_x_i'
    _y_slope = '{}_slope'.format(y)
    _y_int = '{}_int'.format(y)
    _y_fit = '{}_fit'.format(y)
    _y_error = '{}_error'.format(y)

    # init
    if do_print:
        tprint('init')

    # finiteness is not defined for datetime columns, there only missing values are dropped
    if pd.api.types.is_datetime64_any_dtype(df[x]):
        _x_valid = df[x].notna()
    else:
        _x_valid = np.isfinite(df[x])
    # copy so that the helper columns below never write into a view of the caller's frame
    _df = df[_x_valid & np.isfinite(df[y])].copy()

    if groupby is None:
        _df['_dummy'] = 1
        groupby = ['_dummy']
    elif not is_list_like(groupby):
        groupby = [groupby]
    else:
        groupby = list(groupby)

    _df_out = dict_list(
        groupby + [_y_slope, _y_int, 'r2', 'rmse', 'error_mean', 'error_std', 'error_abs_mean', 'error_abs_std'])

    # numeric x axis
    if not _df.empty and isinstance(_df[x].iloc[0], datetime):
        _df[_x_i] = (_df[x] - _df[x].min()) / np.timedelta64(1, t_unit)
    else:
        _df[_x_i] = _df[x]

    # loop groups
    _i_max = _df[groupby].drop_duplicates().shape[0]

    for _i, (_index, _df_i) in enumerate(_df.groupby(groupby), start=1):

        if do_print:
            tprint('Linear Regression Iteration {} / {}'.format(_i, _i_max))

        _slope = _df_i[_x_i].cov(other=_df_i[y]) / _df_i[_x_i].var()
        _int = _df_i[y].mean() - _slope * _df_i[_x_i].mean()

        # slope and intercept refer to the numeric axis, so the fit has to be evaluated on it as well
        _fit = (_slope * _df_i[_x_i] + _int).rename(_y_fit)
        _error = (_fit - _df_i[y]).rename(_y_error)

        _r2 = r2(_df_i[y], _fit)
        _rmse = rmse(_df_i[y], _fit)

        append_to_dict_list(_df_out, _index)
        append_to_dict_list(_df_out, {
            _y_slope: _slope,
            _y_int: _int,
            'r2': _r2,
            'rmse': _rmse,
            'error_mean': _error.mean(),
            'error_std': _error.std(),
            'error_abs_mean': _error.abs().mean(),
            'error_abs_std': _error.abs().std()
        })

    _df_out = pd.DataFrame(_df_out)
    if '_dummy' in _df_out.columns:
        _df_out = _df_out.drop(['_dummy'], axis=1)

    if do_print:
        tprint('Linear Regression done')

    return _df_out
def flatten(lst):
    """
    flatten an arbitrarily nested list like object into a flat list

    :param lst: (nested) list like object, nesting is detected using is_list_like
    :return: flat list of all non list like elements in their original order
    """
    # https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
    _flat = []
    for _item in lst:
        if is_list_like(_item):
            # nested element: flatten it recursively and splice the result in
            _flat.extend(flatten(_item))
        else:
            _flat.append(_item)
    return _flat
[docs]@export
def df_split(df: pd.DataFrame, split_by: Union[List[str], str], return_type: str = 'dict', print_key: bool = False,
             sep: str = '_', key_sep: str = '==') -> Union[list, dict]:
    """
    Split a pandas DataFrame into one sub-DataFrame per level of the given column(s)

    :param df: pandas DataFrame to be split
    :param split_by: Column(s) to split by, creates a sub-DataFrame for each level
    :param return_type: if 'list' a list of sub-DataFrames is returned, for any other value a dictionary
        with a label of each level as key
    :param print_key: whether to include the column names in the key labels, passed to qformat
    :param sep: separator to use in the key labels between columns, passed to qformat
    :param key_sep: separator to use in the key labels between key and value, passed to qformat
    :return: see return_type
    """
    _keys = force_list(split_by)
    _as_list = return_type == 'list'
    _parts = [] if _as_list else {}

    for _level, _sub_df in df.groupby(_keys):
        if _as_list:
            _parts.append(_sub_df)
            continue
        # the label is derived from the first row of the key columns of this level
        _label = qformat(pd.DataFrame(_sub_df[_keys]).head(1), print_key=print_key, sep=sep, key_sep=key_sep)
        _parts[_label] = _sub_df

    return _parts
# merges a df, wrapper for pd.concat
def df_merge(*args, ignore_index=True, sort=False, **kwargs):
    """
    wrapper for pd.concat with different defaults

    :param args: passed to pd.concat
    :param ignore_index: passed to pd.concat
    :param sort: passed to pd.concat
    :param kwargs: passed to pd.concat
    :return: result of pd.concat
    """
    _options = dict(kwargs, ignore_index=ignore_index, sort=sort)
    return pd.concat(*args, **_options)
def rank(df, rank_by, groupby=None, score_ascending=True, sort_by=None, sort_by_ascending=None):
    """
    rank the rows of a DataFrame (1 = first) within each group

    :param df: pandas DataFrame
    :param rank_by: column(s) to rank by
    :param groupby: column(s) to group by, if None all rows are ranked together [optional]
    :param score_ascending: sort direction(s) of rank_by, if sort_by_ascending is None it is passed as is to
        sort_values and therefore applies to all sort columns
    :param sort_by: further column(s) to sort by, used as tie breaker [optional]
    :param sort_by_ascending: sort direction(s) of sort_by [optional]
    :return: pandas Series containing the rank of each row in the original row order
    """
    if sort_by is None:
        sort_by = []

    _df = df.copy()
    del df

    if groupby is None:
        groupby = ['_dummy']
        _df['_dummy'] = 1

    # cast once: a single column name must count as one sort column, not as one per character
    _groupby = force_list(groupby)
    _sort_by = force_list(rank_by) + _groupby + force_list(sort_by)

    # remember the original row order
    _df['_row'] = range(1, _df.shape[0] + 1)

    if sort_by_ascending is None:
        _ascending = score_ascending
    else:
        _ascending = force_list(score_ascending) + [True for _ in _groupby] + force_list(sort_by_ascending)

    _df = _df.sort_values(by=_sort_by, ascending=_ascending)
    # position within the group after sorting, computed without a helper column that could clash with the data
    _df['_rank'] = _df.groupby(groupby).cumcount() + 1
    _df = _df.sort_values(by='_row')

    return _df['_rank']
def kde(x, df=None, x_range=None, perc_cutoff=.1, range_cutoff=None, x_steps=1000):
    """
    fit a gaussian kernel density estimate and describe its local maxima

    :param x: name of the column in df or, if df is None, the data itself (missing values are ignored)
    :param df: pandas DataFrame containing x [optional]
    :param x_range: points at which the kde is evaluated, defaults to x_steps equally spaced points between the
        minimum and maximum of x
    :param perc_cutoff: maxima whose value is not above this fraction of the global maximum are ignored, any
        falsy value keeps all maxima
    :param range_cutoff: fraction of the peak value that defines the width of a peak, None / 'sigma' / 'std' use
        the ratio of the standard normal pdf at 1 and 0, 'e' / '1/e' / '1-1/e' use 1 - 1/e,
        'fwhm' / 'FWHM' / 'hm' / 'HM' use .5, a number is used as is
    :param x_steps: number of points in the default x_range
    :return: tuple of (DataFrame with the evaluated kde, DataFrame with one row per maximum including a normal
        fit of the data within the peak range)
    """
    if df is not None:
        if x in ['value', 'perc', 'diff', 'sign', 'ex', 'ex_max', 'ex_min', 'mean', 'std', 'range',
                 'value_min', 'value_max', 'range_min', 'range_max']:
            raise ValueError('x cannot be named {}, please rename your variable'.format(x))
        _x = df[x]
        _x_name = x
    else:
        _x = x
        # unnamed data (no name attribute or name None) gets the generic label 'x'
        _x_name = getattr(x, 'name', None)
        if _x_name is None:
            _x_name = 'x'

    # std cutoff = norm(0,1).pdf(1)/norm(0,1).pdf(0)
    # 1/e cutoff: range_cutoff = 1-1/e = .63
    # full width at half maximum: range_cutoff = .5
    if range_cutoff is None or range_cutoff in ['sigma', 'std']:
        _range_cutoff = stats.norm(0, 1).pdf(1) / stats.norm(0, 1).pdf(0)
    elif range_cutoff in ['e', '1/e', '1-1/e']:
        _range_cutoff = 1 - 1 / np.exp(1)
    elif range_cutoff in ['fwhm', 'FWHM', 'hm', 'HM']:
        _range_cutoff = .5
    else:
        _range_cutoff = range_cutoff + 0

    assert (len(_x) > 0), 'Series {} has zero length'.format(_x_name)
    # the kde cannot be fitted to missing values
    _x = pd.Series(_x).dropna().reset_index(drop=True)

    if x_range is None:
        x_range = np.linspace(np.nanmin(_x), np.nanmax(_x), x_steps)

    # -- fit kde
    _kde = stats.gaussian_kde(_x)

    # -- to df
    _df_kde = pd.DataFrame({_x_name: x_range, 'value': _kde.evaluate(x_range)})
    _df_kde['perc'] = _df_kde['value'] / _df_kde['value'].max()

    # -- get extrema
    # a maximum / minimum is the last row before the sign of the slope flips
    _df_kde['diff'] = _df_kde['value'].diff()
    _df_kde['sign'] = np.sign(_df_kde['diff'])
    _sign_flip = _df_kde['sign'].diff(-1).fillna(0)
    _df_kde['ex_max'] = _sign_flip > 0
    _df_kde['ex_min'] = _sign_flip < 0
    # a new phase starts at each minimum
    _df_kde['phase'] = _df_kde['ex_min'].astype(int).cumsum()

    if perc_cutoff:
        _df_kde['ex_max'] = _df_kde['ex_max'].where(_df_kde['perc'] > perc_cutoff, False)

    # -- get std
    # for each maximum a normal distribution is fitted to the data inside the peak range of its phase
    _df_kde_ex = _df_kde[_df_kde['ex_max'].astype(bool)][[_x_name, 'value', 'phase']].reset_index()
    for _col in ['mean', 'std', 'range', 'range_min', 'range_max', 'value_min', 'value_max']:
        _df_kde_ex[_col] = np.nan

    for _index, _row in _df_kde_ex.iterrows():

        _df_kde_i = _df_kde[_df_kde['phase'] == _row['phase']]

        # Width of Peak range
        _df_kde_i = _df_kde_i[_df_kde_i['value'] >= _row['value'] * _range_cutoff]
        _x_min = _df_kde_i[_x_name].iloc[0]
        _x_max = _df_kde_i[_x_name].iloc[-1]
        _x_i = np.extract((_x > _x_min) & (_x < _x_max), _x)

        _mean, _std = stats.norm.fit(_x_i)

        # .loc on the frame writes in place, chained indexing is not guaranteed to
        _df_kde_ex.loc[_index, 'mean'] = _mean
        _df_kde_ex.loc[_index, 'std'] = _std
        _df_kde_ex.loc[_index, 'range'] = _x_max - _x_min
        _df_kde_ex.loc[_index, 'range_min'] = _x_min
        _df_kde_ex.loc[_index, 'range_max'] = _x_max
        _df_kde_ex.loc[_index, 'value_min'] = _df_kde_i['value'].iloc[0]
        _df_kde_ex.loc[_index, 'value_max'] = _df_kde_i['value'].iloc[-1]

    return _df_kde, _df_kde_ex
# wrapper to quickly aggregate df
def qagg(df: pd.DataFrame, groupby, columns=None, agg=None, reset_index=True):
    """
    aggregate columns of a DataFrame per group and return flat column names of the form {column}_{aggregation}

    :param df: pandas DataFrame
    :param groupby: passed to pandas.DataFrame.groupby
    :param columns: columns to aggregate, defaults to all numeric columns that are not grouping keys
    :param agg: aggregation or list of aggregations passed to pandas agg, defaults to ['mean', 'std']
    :param reset_index: whether to turn the grouping keys back into columns
    :return: aggregated pandas DataFrame
    """
    if agg is None:
        agg = ['mean', 'std']

    if columns is None:
        # grouping keys given as column labels cannot be aggregated themselves
        if isinstance(groupby, (list, tuple)):
            _keys = [_ for _ in groupby if pd.api.types.is_scalar(_)]
        elif pd.api.types.is_scalar(groupby):
            _keys = [groupby]
        else:
            _keys = []
        columns = [_ for _ in df.select_dtypes(include=np.number).columns if _ not in _keys]
    else:
        columns = list(columns)

    _df_agg = df.groupby(groupby).agg({_: agg for _ in columns})

    # the flat names follow the aggregations actually requested
    _aggs = list(agg) if pd.api.types.is_list_like(agg) else [agg]
    _agg_names = [_ if isinstance(_, str) else getattr(_, '__name__', str(_)) for _ in _aggs]
    _df_agg.columns = ['{}_{}'.format(_col, _agg) for _col in columns for _agg in _agg_names]

    if reset_index:
        _df_agg = _df_agg.reset_index()

    return _df_agg
[docs]@export
def mahalanobis(point: Union[pd.DataFrame, pd.Series, np.ndarray], df: pd.DataFrame = None, params: List[str] = None,
                do_print: bool = True) -> Union[float, List[float]]:
    """
    Calculates the Mahalanobis distance of one or more points to the mean of a reference DataFrame

    :param point: The point(s) to calculate the Mahalanobis distance for
    :param df: The reference DataFrame against which to calculate the Mahalanobis distance, if None point is used
    :param params: The columns to calculate the Mahalanobis distance for, defaults to all columns of the reference
    :param do_print: Whether to show a progressbar when point is a DataFrame
    :return: if a DataFrame is passed a list of floats, else the Mahalanobis distance as float. If the covariance
        matrix of the reference cannot be inverted np.nan is returned
    """
    _reference = (point if df is None else df).copy()
    del df

    if params is None:
        params = _reference.columns
    else:
        _reference = _reference[params]

    # inverse covariance matrix of the reference
    try:
        _inv_cov = np.linalg.inv(_reference.cov())
    except np.linalg.LinAlgError:
        return np.nan

    _center = _reference.mean().values

    # single point
    if not isinstance(point, pd.DataFrame):
        if isinstance(point, pd.Series):
            _vector = point[params].values
        else:
            _vector = np.array(point)
        return distance.mahalanobis(_vector, _center, _inv_cov)

    # one distance per row
    _distances = []
    for _position, (_label, _row) in enumerate(point.iterrows()):
        if do_print:
            progressbar(_position, point.shape[0])
        _distances.append(distance.mahalanobis(_row[params].values, _center, _inv_cov))

    if do_print:
        progressbar()

    return _distances
def multi_melt(df, cols, suffixes, id_vars, var_name='variable', sep='_', **kwargs):
    """
    melt several groups of columns that share common suffixes at once

    For each entry of cols the columns {col}{sep}{suffix} are melted into one value column named col, the
    variable column contains the suffix.

    :param df: pandas DataFrame
    :param cols: common column name prefixes, each becomes one value column
    :param suffixes: suffixes shared by all cols
    :param id_vars: passed to pandas melt
    :param var_name: name of the variable column
    :param sep: separator between prefix and suffix
    :param kwargs: passed to pandas melt
    :return: molten DataFrame
    """
    # for multi melt to work the columns must share common suffixes
    _source = df.copy()
    del df

    def _melt_one(_prefix):
        # melt the columns of one prefix and reduce the variable column to the suffix
        _wide_cols = ['{}{}{}'.format(_prefix, sep, _suffix) for _suffix in suffixes]
        _long = _source.melt(id_vars=id_vars, value_vars=_wide_cols, value_name=_prefix, var_name=var_name,
                             **kwargs)
        _long[var_name] = _long[var_name].str.slice(len(_prefix) + len(sep))
        # identical sorting makes the parts line up row by row
        return _long.sort_values(by=force_list(id_vars) + [var_name]).reset_index(drop=True)

    _parts = [_melt_one(_col) for _col in cols]

    # side by side, the repeated id / variable columns are dropped
    return pd.concat(_parts, axis=1).pipe(drop_duplicate_cols)
# for resampling integer indexes
def resample(df, rule=1, on=None, groupby=None, agg='mean', columns=None, adj_column_names=True, factor=1, **kwargs):
    """
    resample a DataFrame with a numeric index by temporarily interpreting the index as seconds

    :param df: pandas DataFrame
    :param rule: width of the resampling bins in units of index * factor
    :param on: column to use as index [optional]
    :param groupby: column(s) to group by before resampling, the result then has a MultiIndex of the grouping
        keys and the resampled index [optional]
    :param agg: 'mean', 'median', 'sum' or any aggregation(s) accepted by pandas agg
    :param columns: columns to aggregate, defaults to all numeric columns
    :param adj_column_names: whether to rename the columns to {column}_{aggregation}, only applies if agg is not
        one of 'mean', 'median', 'sum'
    :param factor: the index is multiplied by this factor before and divided by it after resampling
    :param kwargs: passed to pandas resample
    :return: resampled pandas DataFrame
    """
    assert isinstance(df, pd.DataFrame), 'df must be a DataFrame'

    _df = df.copy()
    del df

    if on is not None:
        _df = _df.set_index(on)
    if columns is None:
        _columns = list(_df.select_dtypes(include=np.number).columns)
    elif isinstance(columns, str):
        _columns = [columns]
    else:
        _columns = list(columns)

    _groupby = None
    if groupby is not None:
        _groupby = force_list(groupby)
        _columns = [_ for _ in _columns if _ not in _groupby]

    # convert int to seconds to be able to use .resample
    # (has to happen before grouping, a groupby object has no index that could be replaced)
    _epoch = pd.to_datetime('1970-01-01')
    _df.index = pd.to_datetime(_df.index * factor, unit='s')

    # resample as time series, restricted to the columns to aggregate
    if _groupby is None:
        _resampler = _df[_columns].resample('{}s'.format(rule), **kwargs)
    else:
        _resampler = _df.groupby(_groupby)[_columns].resample('{}s'.format(rule), **kwargs)

    # agg
    _adj_column_names = False
    if agg == 'mean':
        _df = _resampler.mean()
    elif agg == 'median':
        _df = _resampler.median()
    elif agg == 'sum':
        _df = _resampler.sum()
    else:
        _df = _resampler.agg({_: agg for _ in _columns})
        if adj_column_names:
            _adj_column_names = True

    # back to int
    def _to_number(_index):
        return (_index - _epoch).total_seconds() / factor

    if isinstance(_df.index, pd.MultiIndex):
        # grouped result: only the last level is the resampled index
        _levels = [_df.index.get_level_values(_) for _ in range(_df.index.nlevels - 1)]
        _levels.append(_to_number(_df.index.get_level_values(-1)))
        _df.index = pd.MultiIndex.from_arrays(_levels, names=_df.index.names)
    else:
        _df.index = _to_number(_df.index)

    if _adj_column_names:
        _column_names = []
        for _col in _columns:
            for _agg in force_list(agg):
                _column_names += ['{}_{}'.format(_col, _agg)]
        _df.columns = _column_names

    return _df
def df_count(x, df, hue=None, sort_by_count=True, top_nr=5, x_int=None, x_min=None, x_max=None, other_name='other',
             na='drop'):
    """
    count the rows per level of x (and hue) and calculate the percentage of each count

    :param x: name of the column to count
    :param df: pandas DataFrame
    :param hue: further split by hue level [optional]
    :param sort_by_count: whether to sort descending by the total count of each x level
    :param top_nr: only this many levels of x and of hue are kept using top_n_coding, all other levels are
        recoded to other_name. Any falsy value keeps all levels
    :param x_int: if given x is rounded to multiples of x_int and all multiples between x_min and x_max
        (both inclusive) are part of the result, with a count of 0 if they do not occur
    :param x_min: lower bound of the multiples of x_int, defaults to the minimum of the rounded x
    :param x_max: upper bound of the multiples of x_int, defaults to the maximum of the rounded x
    :param other_name: passed to top_n_coding
    :param na: for any value other than 'drop' x and hue are cast to string before counting
    :return: pandas DataFrame with the columns x, (hue), count, count_{x}, count_{hue}, perc_{x}, perc_{hue}
        where {hue} is the string 'None' if no hue is given
    """
    # -- init
    _df = df.copy()
    del df

    if na != 'drop':
        # NOTE(review): astype(str) already turns missing values into the string 'nan', so the fillna looks
        # like a no-op and missing values end up labelled 'nan' rather than 'NaN' - confirm the intended label
        _df[x] = _df[x].astype(str).fillna('NaN')
        if hue is not None:
            _df[hue] = _df[hue].astype(str).fillna('NaN')

    if not top_nr:
        top_nr = None

    # the result uses the name count itself
    if x == 'count':
        x = 'count_org'
        _df = _df.rename({'count': 'count_org'}, axis=1)

    # -- preprocessing
    if x_int is not None:

        _is_int = isinstance(x_int, int)

        _df[x] = np.round(_df[x] / x_int) * x_int
        if _is_int:
            _df[x] = _df[x].astype(int)

        if x_min is None:
            x_min = _df[x].min()
        if x_max is None:
            x_max = _df[x].max()

        # all multiples from x_min to x_max, built from a step count so that float steps work and rounded the
        # same way as the data so that the values match exactly in the merge below
        _n_steps = int(np.round((x_max - x_min) / x_int)) + 1
        _xs = np.round((x_min + np.arange(_n_steps) * x_int) / x_int) * x_int
        if _is_int:
            _xs = _xs.astype(int)
        _df_xs = pd.DataFrame({x: _xs})
        _xs_on = [x]

        if hue is not None:
            # cross join with all hue levels; the old index must not end up in the result
            _df_hues = _df[[hue]].drop_duplicates().reset_index(drop=True).assign(_dummy=1)
            _df_xs = pd.merge(_df_xs.assign(_dummy=1), _df_hues, on='_dummy').drop(['_dummy'], axis=1)
            _xs_on = _xs_on + [hue]

    else:
        _df_xs = pd.DataFrame()
        _xs_on = []

    # dummy
    _df['_count'] = 1

    # group values outside of top_n to other_name
    if top_nr is not None:
        _df[x] = top_n_coding(s=_df[x], n=top_nr, other_name=other_name)
        if hue is not None:
            _df[hue] = top_n_coding(s=_df[hue], n=top_nr, other_name=other_name)

    # init df with counts
    _groupby = [x]
    if hue is not None:
        _groupby = _groupby + [hue]

    _df_count = _df.groupby(_groupby).agg({'_count': 'sum'}).reset_index().rename({'_count': 'count'}, axis=1)

    # append 0 entries for numerical x
    if x_int is not None:
        _df_count = pd.merge(_df_count, _df_xs, on=_xs_on, how='outer')
        _df_count['count'] = _df_count['count'].fillna(0)

    # create total count (for perc)
    _count_x = 'count_{}'.format(x)
    _count_hue = 'count_{}'.format(hue)

    if hue is None:
        _df_count[_count_hue] = _df_count['count'].sum()
        _df_count[_count_x] = _df_count['count']
    else:
        _df_count[_count_x] = _df_count.groupby(x)['count'].transform(pd.Series.sum)
        _df_count[_count_hue] = _df_count.groupby(hue)['count'].transform(pd.Series.sum)

    # sort
    if sort_by_count:
        _df_count = _df_count.sort_values([_count_x], ascending=False).reset_index(drop=True)

    _df_count['perc_{}'.format(x)] = np.round(_df_count['count'] / _df_count[_count_x] * 100, 2)
    _df_count['perc_{}'.format(hue)] = np.round(_df_count['count'] / _df_count[_count_hue] * 100, 2)

    return _df_count
# return prediction accuracy as fraction of correct predictions
def get_accuracy(class_true, class_pred):
    """
    fraction of elements where prediction and truth are equal after casting both to string

    :param class_true: true classes, must support astype (e.g. pandas Series or numpy array)
    :param class_pred: predicted classes, must support astype
    :return: number of matches divided by the length of class_true
    """
    _is_match = class_true.astype(str) == class_pred.astype(str)
    _n_correct = np.where(_is_match, 1, 0).sum()
    return _n_correct / len(class_true)
# takes a numeric pandas series and splits it into groups, the groups are labeled by INTEGER multiples of the step value
def numeric_to_group(pd_series, step=None, outer_limit=4, suffix=None, use_abs=False, use_standard_scaler=True):
    """
    split a numeric series into groups labeled by integer multiples of step

    :param pd_series: numeric pandas Series (or other sequence) to split
    :param step: width of one group, defaults to 1
    :param outer_limit: all groups beyond +- outer_limit are gathered in the outermost group, given in steps,
        only INTEGER values allowed. None applies no limit
    :param suffix: suffix of the group labels, defaults to 'std' if use_standard_scaler is True and to 'step'
        otherwise, '' for no suffix
    :param use_abs: whether to group by the absolute value, if False all values with an absolute value below
        step are gathered in the group 0
    :param use_standard_scaler: whether to scale the values using sklearn's StandardScaler first
    :return: categorical pandas Series of labels of the form {multiple}_{suffix}, if a pandas Series is passed
        its index and name are kept
    """
    # outer limit is given in steps, only INTEGER values allowed
    if outer_limit is not None:
        outer_limit = int(outer_limit)

    # make a copy to avoid inplace effects
    _series = pd.Series(deepcopy(pd_series))

    # use standard scaler to center around mean with std +- 1
    if use_standard_scaler:
        _values = _series.values.reshape(-1, 1)
        _series = StandardScaler().fit(_values).transform(_values).flatten()

    # if step is none: use 1 as step
    if step is None:
        step = 1

    if suffix is None:
        if use_standard_scaler:
            suffix = 'std'
        else:
            suffix = 'step'

    if suffix != '':
        suffix = '_' + suffix

    # to absolute
    if use_abs:
        _series = np.abs(_series)
    else:
        # gather the +0 and -0 group to 0
        _series = np.where(np.abs(_series) < step, 0, _series)

    # group: divide by step, floor and integer, then put the sign back
    _series = (np.floor(np.abs(_series) / step)).astype(int) * np.sign(_series).astype(int)

    # apply outer limit
    if outer_limit is not None:
        _series = np.where(_series > outer_limit, outer_limit, _series)
        _series = np.where(_series < -outer_limit, -outer_limit, _series)

    # make a pretty string
    _series = pd.Series(np.asarray(_series)).apply(lambda _: '{0:n}'.format(_)).astype('str') + suffix

    # get back the old properties of the series, otherwise the labels do not align with the input anymore
    if isinstance(pd_series, pd.Series):
        _series.name = pd_series.name
        _series.index = pd_series.index

    # to cat
    _series = _series.astype('category')

    return _series
[docs]@export
def top_n(s: Sequence, n: int, w: Optional[Sequence] = None) -> list:
    """
    select the n elements from a categorical pandas series with the highest counts

    :param s: pandas Series to select from
    :param n: how many elements to return
    :param w: weights, if given the weights are summed instead of just counting entries in s [optional]
    :return: List of top n elements, most frequent first
    """
    if w is None:
        # value_counts is sorted descending and indexed by the elements themselves; reading the index directly
        # does not depend on how reset_index would name the resulting columns
        return pd.Series(s).value_counts().index.tolist()[:n]
    else:
        return pd.DataFrame({'s': s, 'w': w}).groupby('s').agg({'w': 'sum'}) \
            .sort_values(by='w', ascending=False).index.tolist()[:n]
[docs]@export
def top_n_coding(s: Sequence, n: int, other_name: str = 'other', na_to_other: bool = False,
                 w: Optional[Sequence] = None) -> pd.Series:
    """
    returns a modified version of the pandas series where all elements not in top_n become recoded as 'other'

    :param s: pandas Series to adjust
    :param n: how many elements to keep
    :param other_name: name of the other element [optional]
    :param na_to_other: whether to cast missing elements to other [optional]
    :param w: weights, if given the weights are summed instead of just counting entries in s [optional]
    :return: adjusted pandas Series of dtype category, if a pandas Series is passed its index and name are kept
    """
    _raw = pd.Series(s)
    # missing values have to be detected before the cast, afterwards they are ordinary strings whose spelling
    # depends on the type of missing value ('nan', 'NaT', 'None', ...)
    _is_na = _raw.isna().values

    # we have to cast to string so we can set the other name
    _s = _raw.astype('str')
    _top_n = top_n(_s, n, w=w)
    _coded = np.where(_s.isin(_top_n), _s, other_name)

    if na_to_other:
        # the strings 'nan' / 'nat' are treated as missing as well
        _na_mask = _is_na | _s.isin(['nan', 'nat']).values
        _coded = np.where(_na_mask, other_name, _coded)

    _s = pd.Series(_coded)

    # get back the old properties of the series (or you'll screw the index)
    if isinstance(s, pd.Series):
        _s.name = s.name
        _s.index = s.index

    # convert to cat
    _s = _s.astype('category')

    return _s
[docs]@export
def k_split(df: pd.DataFrame, k: int = 5, groupby: Union[Sequence, str] = None,
            sortby: Union[Sequence, str] = None, random_state: int = None, do_print: bool = True,
            return_type: Union[str, int] = 1) -> Union[pd.Series, tuple]:
    """
    splits a DataFrame into k (equal sized) parts that can be used for train test splitting or k_cross splitting

    :param df: pandas DataFrame to be split
    :param k: how many (equal sized) parts to split the DataFrame into [optional]
    :param groupby: passed to pandas.DataFrame.groupby before splitting,
        each group is split into k parts on its own so that it is represented equally in each part [optional]
    :param sortby: if given the rows (of each group) are ordered by these column(s) and then sliced into parts
        from the top, 'index' keeps the original row order. If None the rows are shuffled randomly before
        slicing [optional]
    :param random_state: random_state to be used in random sorting, ignored if sortby is given [optional]
    :param do_print: whether to print steps to console [optional]
    :param return_type: if an integer in range(k) returns a tuple of shape (df_train, df_test) where the
        return_type'th part is equal to df_test and the other parts are equal to df_train,
        for any other value (e.g. 'Series', 's') returns a pandas Series containing the part number of each row
    :return: depending on return_type either a pandas Series or a tuple
    """
    if do_print:
        tprint('splitting 1:{} ...'.format(k))

    # -- init
    _df = df.copy()
    del df

    # the original index is parked in a column and restored at the end
    _index_name = _df.index.name
    _df['_index'] = _df.index.copy()
    _df = _df.reset_index(drop=True)

    if groupby is None:
        groupby = '_dummy'
        _df['_dummy'] = 1

    _df_out = []
    for _index, _df_i in _df.groupby(groupby):

        # sort (randomly or by given value), afterwards the index is the position within the group
        if sortby is None:
            _df_i = _df_i.sample(frac=1, random_state=random_state).reset_index(drop=True)
        elif isinstance(sortby, str) and sortby == 'index':
            _df_i = _df_i.sort_index().reset_index(drop=True)
        else:
            _df_i = _df_i.sort_values(by=sortby).reset_index(drop=True)

        # assign k index: the part size depends on the size of this group, not of the whole DataFrame
        _k_split = max(int(np.ceil(_df_i.shape[0] / k)), 1)
        _df_i['_k_index'] = _df_i.index // _k_split
        _df_out.append(_df_i)

    # same call as the df_merge wrapper of this module
    _df_out = pd.concat(_df_out, ignore_index=True, sort=False).set_index(['_index']).sort_index()
    _df_out.index = _df_out.index.rename(_index_name)

    if '_dummy' in _df_out.columns:
        _df_out = _df_out.drop(['_dummy'], axis=1)

    if return_type in range(k):
        _df_train = _df_out[_df_out['_k_index'] != return_type].drop('_k_index', axis=1)
        _df_test = _df_out[_df_out['_k_index'] == return_type].drop('_k_index', axis=1)
        return _df_train, _df_test
    else:
        return _df_out['_k_index']