"""
hhpy.modelling.py
~~~~~~~~~~~~~~~~~
Contains a model class that is based on pandas DataFrames and wraps around sklearn and other frameworks
to provide convenient train test functions.
"""
# ---- imports
# --- standard imports
import itertools
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# --- third party imports
from copy import deepcopy
from typing import Sequence, Mapping, Union, Callable, Optional, Any, Tuple, List
from sklearn.exceptions import DataConversionWarning
from sklearn.base import clone
# ---- optional imports
try:
from IPython.core.display import display
except ImportError:
display = print
# --- local imports
from hhpy.main import GROUPBY_DUMMY, export, BaseClass, is_list_like, assert_list, tprint, DocstringProcessor, \
SequenceOrScalar, DFOrArray, list_exclude, list_merge, list_intersection
from hhpy.ds import docstr as docstr_ds, assert_df, k_split, df_score, drop_duplicate_cols
from hhpy.plotting import ax_as_list, legend_outside, rcParams as hpt_rcParams, docstr as docstr_hpt
from hhpy.ipython import display_df
# ---- variables
# --- validations
validations = {
'Model__predict__return_type': ['y', 'df', 'DataFrame'],
'Models__predict__return_type': ['y', 'df', 'DataFrame', 'self'],
'Models__score__return_type': ['self', 'df', 'DataFrame'],
'Models__score__scores': ['r2', 'rmse', 'mae', 'stdae', 'medae', 'pae'],
'fit__fit_type': ['train_test', 'k_cross', 'final'],
'setattr__target': ['all', 'self', 'children', 'a', 's', 'c']
}
# --- docstr
docstr = DocstringProcessor(
df='Pandas DataFrame containing the training and testing data. '
'Can be saved to the Model object or supplied on an as needed basis.',
X_ref='List of features (predictors) used for training the model',
y_ref='List of labels (targets) to be predicted',
X='The feature (predictor) data used for training as DataFrame, np.array or column names',
y='The label (target) data used for training as DataFrame, np.array or column names',
X_test='The feature (predictor) data used for testing as DataFrame, np.array or column names',
y_test='The label (target) data used for testing as DataFrame, np.array or column names',
df_train='Pandas DataFrame containing the training data, optional if array like data is passed for X/y',
df_test='Pandas DataFrame containing the testing data, optional if array like data is passed for X/y test',
X_predict='The feature (predictor) data used for predicting as DataFrame, np.array or column names',
y_predict='The label (target) data used for predicting as DataFrame, np.array or column names. Specifying y '
'is only necessary for convolutional or time-series type models [optional]',
df_predict='Pandas DataFrame containing the predict data, optional if array like data is passed for X_predict',
model_in='Any model object that implements .fit and .predict',
name='Name of the model, used for naming columns [optional]',
dropna='Whether to drop rows containing NA in the training data [optional]',
scaler_X='Scalar object that implements .transform and .inverse_transform, applied to the features (predictors)'
'before training and inversely after predicting [optional]',
scaler_y='Scalar object that implements .transform and .inverse_transform, applied to the labels (targets)'
'before training and inversely after predicting [optional]',
do_print='Whether to print the steps to console [optional]',
display_score='Whether to display the score DataFrame [optional]',
ensemble='if True also predict with Ensemble like combinations of models. If True or mean calculate'
'mean of individual predictions. If median calculate median of individual predictions. [optional]',
printf='print function to use for logging [optional]',
groupby=docstr_ds.params['groupby'],
key='key of attribute to set',
value='value of attribute to set',
**validations
)
# ---- classes
# noinspection PyPep8Naming
[docs]@docstr
@export
class Model(BaseClass):
"""
A unified modeling class that is extended from sklearn, accepts any model that implements .fit and .predict
:param model: %(model_in)s
:param name: %(name)s
:param X_ref: %(X_ref)s
:param y_ref: %(y_ref)s
:param groupby: %(groupby)s
"""
# --- globals
__name__ = 'Model'
__attributes__ = ['name', 'model', 'X_ref', 'y_ref', 'groupby', 'is_fit']
# --- functions
def __init__(self, model: Any = None, name: str = 'pred', X_ref: SequenceOrScalar = None,
y_ref: SequenceOrScalar = None, groupby: SequenceOrScalar = None) -> None:
# -- assert
if X_ref is not None:
X_ref = assert_list(X_ref)
if y_ref is not None:
y_ref = assert_list(y_ref)
# _intersection_X_y = list_intersection(X_ref, y_ref)
# if len(_intersection_X_y) > 0:
# raise ValueError(f"{_intersection_X_y} are in both X_ref and y_ref")
# -- assign
self.name = name
if isinstance(model, str):
model = eval(model)
self.model = [model]
self.X_ref = X_ref
self.y_ref = y_ref
self.groupby = groupby
self.is_fit = False
def reset(self) -> None:
"""
reset self to unfit state and delete copy of model
:return: None
"""
self.is_fit = False
self.model = [clone(self.model[0])]
[docs] @docstr
def fit(self, X: Union[DFOrArray, SequenceOrScalar] = None, y: Union[DFOrArray, SequenceOrScalar] = None,
df: pd.DataFrame = None, dropna: bool = True, X_test: Union[DFOrArray, SequenceOrScalar] = None,
y_test: Union[DFOrArray, SequenceOrScalar] = None, df_test: pd.DataFrame = None,
groupby: SequenceOrScalar = None, k: int = 0) -> None:
"""
generalized fit method extending on model.fit
:param X: %(X)s
:param y: %(y)s
:param df: %(df_train)s
:param dropna: %(dropna)s
:param X_test: %(X_test)s
:param y_test: %(y_test)s
:param df_test: %(df_test)s
:param groupby: %(groupby)s
:param k: index of the model to fit
:return: None
"""
# -- assert
# - groupby
if groupby is None:
groupby = self.groupby
groupby = assert_list(groupby)
self.groupby = groupby
# - df
# if X and y are passed separately: concat them to a DataFrame
if df is None:
_df_X = assert_df(X)
_df_y = assert_df(y)
# if both X and y are DataFrames we can use their column names
if isinstance(X, pd.DataFrame) and isinstance(y, (pd.DataFrame, pd.Series)):
df = pd.concat([_df_X, _df_y], axis=1)
# if not then we override the default column names when concatting to avoid duplicates
else:
df = pd.concat([_df_X, _df_y], ignore_index=True, sort=False, axis=1)
# save column names to self
self.X_ref = _df_X.columns
self.y_ref = _df_y.columns
# in this case groupby is not supported
groupby = []
# get X_ref and y_ref
else:
if self.X_ref is None:
self.X_ref = assert_list(X)
if self.y_ref is None:
self.y_ref = assert_list(y)
df = assert_df(df)
if df_test is not None:
df_test = assert_df(df_test)
# handle dropna
if dropna:
_dropna_cols = list_intersection(df.columns, list_merge(self.X_ref, self.y_ref, groupby))
df = df.dropna(subset=_dropna_cols)
if df_test is not None:
df_test = df_test.dropna(subset=_dropna_cols)
# -- init
# - X / y train
_X = df[self.X_ref]
_y = df[self.y_ref]
# - X / y test
if df_test is None:
_X_test = X_test
_y_test = y_test
else:
_X_test = df_test[self.X_ref]
_y_test = df_test[self.y_ref]
# -- fit
# append a copy of the model if needed
while k >= len(self.model):
self.model.append(deepcopy(self.model[0]))
# get model by index
_model = self.model[k]
_varnames = _model.fit.__code__.co_varnames
_kwargs = {}
# pass X / y test only if fit can handle it
if 'groupby' in _varnames and groupby not in [None, []]:
_X = pd.concat([_X, df[groupby]], axis=1)
if _X_test is not None:
_X_test = pd.concat([_X_test, df_test[groupby]], axis=1)
_kwargs['groupby'] = groupby
if 'X_test' in _varnames:
_kwargs['X_test'] = _X_test
if 'y_test' in _varnames:
_kwargs['y_test'] = _y_test
warnings.simplefilter('ignore', (FutureWarning, DataConversionWarning))
_model.fit(X=_X, y=_y, **_kwargs)
warnings.simplefilter('default', (FutureWarning, DataConversionWarning))
self.is_fit = True
[docs] @docstr
def predict(self, X: Union[DFOrArray, SequenceOrScalar] = None, y: Union[DFOrArray, SequenceOrScalar] = None,
df: pd.DataFrame = None, return_type: str = 'y', k_index: pd.Series = None,
groupby: SequenceOrScalar = None, handle_na: bool = True, multi: SequenceOrScalar = None
) -> Union[pd.Series, pd.DataFrame]:
"""
Generalized predict method based on model.predict
:param X: %(X)s
:param y: %(y)s
:param df: %(df)s
:param return_type: one of %(Model__predict__return_type)s, if 'y' returns a pandas Series / DataFrame
with only the predictions, if one of 'df','DataFrame' returns the full DataFrame with predictions added
:param k_index: If specified and model is k_cross split: return only the predictions for each test subset
:param groupby: %(groupby)s
:param handle_na: Whether to handle NaN values (prediction will be NaN) [optional]
:param multi: Postfixes to use for multi output models [optional]
:return: see return_type
"""
# -- assert
# - model is fit
if not self.is_fit:
raise ValueError('Model is not fit yet')
# - valid return type
if return_type not in validations['Model__predict__return_type']:
raise ValueError(f"return type must be one of {validations['Model__predict__return_type']}")
# - df
if df is not None:
df = assert_df(df)
else:
df = pd.concat([assert_df(X), assert_df(y)], axis=1)
# - groupby
if groupby in [None, []]:
groupby = self.groupby
groupby = assert_list(groupby)
_y_ref_pred = ['{}_pred'.format(_) for _ in self.y_ref]
# special case if there is only one target (most algorithms support only one)
if len(self.y_ref) == 1:
_y_ref_pred = _y_ref_pred[0]
# - predict using sklearn api
_ks = len(self.model)
_y_labs = []
for _k, _model in enumerate(self.model):
# get y lab
if _ks == 1:
_y_lab = _y_ref_pred
_y_labs = [_y_ref_pred]
else:
if k_index is None:
_y_lab = [f"{_}_{_k}" for _ in assert_list(_y_ref_pred)]
_y_labs += _y_lab
else:
_y_labs = [_y_ref_pred]
# - handle kwargs
_kwargs = {}
_varnames = _model.predict.__code__.co_varnames
# groupby
if 'groupby' in _varnames:
_X = df[self.X_ref + groupby] # .dropna()
_kwargs['groupby'] = groupby
else:
_X = df[self.X_ref]
# noinspection PyUnresolvedReferences
_na_indices = _X[_X.isna().any(axis=1)].index.tolist()
# y
if 'y' in _varnames:
_y = df[self.y_ref]
# noinspection PyUnresolvedReferences
_na_indices = list_merge(_na_indices, _X[_X.isna().any(axis=1)].index.tolist())
if handle_na: # drop na
_y = _y.drop(_na_indices)
_kwargs['y'] = _y
if handle_na: # drop na (has to be here so y na indices have been handled)
_X = _X.drop(_na_indices)
# - predict
try:
_y_pred = _model.predict(_X, **_kwargs)
except TypeError:
# sometimes the model accepts y as keyword argument but cannot actually handle it
# I'm looking at you sklearn.multioutput
_kwargs.pop('y')
_y_pred = _model.predict(_X, **_kwargs)
# cast to pandas
_y_pred = pd.DataFrame(_y_pred, index=_X.index)
# bring back dropped nans
if handle_na:
_y_pred = pd.concat([_y_pred, pd.DataFrame(np.nan, index=_na_indices, columns=_y_pred.columns)])\
.sort_index()
# feed back to df
if len(_y_labs) < _y_pred.shape[1]: # some regressors (TimeSeriesRegressors)
# might return multi output
_y_labs_new = []
if multi is None:
multi = _y_pred.shape[1] // len(_y_labs)
for _y_lab in _y_labs:
for _it in range(multi):
_y_labs_new.append(f"{_y_lab}_{_it}")
_y_labs = _y_labs_new + []
if k_index is None: # all
for _it, _y_lab in enumerate(_y_labs):
df[_y_lab] = _y_pred[_y_pred.columns[_it]]
else:
# during first iteration: assert output columns are available
if _k == 0:
for __y_ref_pred in assert_list(_y_ref_pred):
if __y_ref_pred not in df.columns:
df[__y_ref_pred] = np.nan
# write output
# we loop the columns because _y is possible to be multi output or single output while _y_pred is a DF
# and np where does weird stuff if you pass a single column on the left and a DF of the right of the ==
for _col, __y_ref_pred in enumerate(assert_list(_y_ref_pred)):
df[__y_ref_pred] = np.where(k_index == _k, _y_pred[_y_pred.columns[_col]], df[__y_ref_pred])
if return_type == 'y':
return df[_y_labs]
else:
return df
@docstr
def setattr(self, key: str, value: Any, target: str = 'all') -> None:
"""
Set attribute on self and or all k instances of one's model
:param key: %(key)s
:param value: %(value)s
:param target: One of %(setattr__target)s, for 'self' sets only on self, for 'children' sets for children and
for 'all' tries to set on both
:return: None
"""
# -- assert
if target not in validations['setattr__target']:
raise ValueError(f"target must be one of {validations['setattr__target']}")
# -- main
# - self
if target in ['all', 'a', 'self', 's'] and hasattr(self, key):
setattr(self, key, value)
# - children
if target in ['all', 'a', 'children', 'c']:
for _model in self.model:
if hasattr(_model, key):
setattr(_model, key, value)
# noinspection PyPep8Naming
[docs]@docstr
@export
class Models(BaseClass):
"""
Collection of Models that allow for fitting and predicting with multiple Models at once,
comparing accuracy and creating Ensembles
:param args: multiple Model objects that will form a Models Collection
:param name: name of the collection
:param df: %(df)s
:param X_ref: %(X_ref)s
:param y_ref: %(y_ref)s
:param scaler_X: %(scaler_X)s
:param scaler_y: %(scaler_y)s
:param printf: %(printf)s
"""
# --- globals
__name__ = 'Models'
__attributes__ = ['models', 'fit_type', 'df', 'X_ref', 'y_ref', 'groupby', 'scaler_X', 'scaler_y', 'model_names',
'df_score', 'k_tests']
# --- functions
def __init__(self, *args: Any, df: pd.DataFrame = None, X_ref: SequenceOrScalar = None,
y_ref: SequenceOrScalar = None, groupby: SequenceOrScalar = None, scaler_X: Any = None,
scaler_y: Any = None, printf: Callable = tprint) -> None:
# -- assert
if isinstance(scaler_X, type):
scaler_X = scaler_X()
if isinstance(scaler_y, type):
scaler_y = scaler_y()
if X_ref is not None:
X_ref = assert_list(X_ref)
if y_ref is not None:
y_ref = assert_list(y_ref)
if groupby is not None:
groupby = assert_list(groupby)
# -- init
_models = []
_model_names = []
# ensure hhpy.modelling.Model
_it = -1
for _arg in args:
for _model in assert_list(_arg):
_it += 1
if not isinstance(_model, Model):
if hasattr(_model, 'name'):
_name = _model.name
elif hasattr(_model, '__name__'):
_name = f"{_model.__name__}_{_it}"
else:
_name = f"model_{_it}"
_model = Model(_model, name=_name, X_ref=X_ref, y_ref=y_ref, groupby=groupby)
_models.append(deepcopy(_model))
if _model.name not in _model_names:
_model_names.append(_model.name)
# -- assign
self.models = _models
self.fit_type = None
if df is not None:
self.df = df.copy()
else:
self.df = None
self.y_ref = y_ref
if X_ref is not None:
self.X_ref = X_ref
elif df is not None:
# default to all columns not in y_ref / groupby
self.X_ref = [_ for _ in df.columns if _ not in y_ref + groupby]
else:
self.X_ref = []
self.groupby = groupby
self.scaler_X = scaler_X
self.scaler_y = scaler_y
self.model_names = _model_names
self.printf = printf
# - not given attributes
self.df_score = None
self.k_tests = None
self.y_pred = []
self.multi = None
@property
def df_test(self) -> Union[None, pd.DataFrame, List[pd.DataFrame]]:
# None case
if self.df is None or self.fit_type is None:
return None
# train-test case
if self.fit_type == 'train_test':
return self.df[lambda _: _['_k_index'] == self.k_tests[0]]
# k-cross case
_df_tests = []
for _k_test in self.k_tests:
_df_tests.append(self.df[lambda _: _['_k_index'] == _k_test])
return _df_tests
def reset(self) -> None:
"""
reset all child models to an unfit state
:return: None
"""
# - loop child models and call reset
for _model in self.models:
_model.reset()
# - reset df score
self.df_score = None
# - reset k tests
self.k_tests = None
# - drop predictions from df
for _y_pred in list_intersection(self.y_pred, assert_list(self.df.columns)):
self.df = self.df.drop(_y_pred, axis=1)
[docs] @docstr
def k_split(self, **kwargs):
"""
apply hhpy.ds.k_split to self to create train-test or k-cross ready data
:param kwargs: keyword arguments passed to :func:`~hhpy.ds.k_split`
:return: None
"""
# -- init
if self.df is None:
raise ValueError('You must specify a df')
# -- split
_k_index = k_split(df=self.df, return_type='s', **kwargs)
self.df['_k_index'] = _k_index
[docs] @docstr
def model_by_name(self, name: Union[list, str]) -> Union[Model, list]:
"""
extract a list of Models from the collection by their names
:param name: name of the Model
:return: list of Models
"""
_models = []
for _model in self.models:
if _model.name in assert_list(name):
if not is_list_like(name):
return _model
else:
_models.append(_model)
return _models
[docs] @docstr
def fit(self, fit_type: str = 'train_test', k_test: Optional[int] = 0, groupby: SequenceOrScalar = None,
do_print: bool = True, **kwargs):
"""
fit all Model objects in collection
:param fit_type: one of %(fit__fit_type)s
:param k_test: which k_index to use as test data
:param groupby: %(groupby)s
:param do_print: %(do_print)s
:param kwargs: Other keyword arguments passed to :func:`~Model.fit`
:return: None
"""
# -- assert
if fit_type not in validations['fit__fit_type']:
raise ValueError(f"fit type must be one of {validations['fit__fit_type']}")
# -- init
if groupby is None:
groupby = self.groupby
# get df
_df = self.df.copy()
# groupby and k index
# -- apply scaler to X and y separately
warnings.simplefilter('ignore', DataConversionWarning)
if self.scaler_X is not None and _df[self.X_ref].shape[1] > 0: # 0 column features cannot be scaled
self.scaler_X = self.scaler_X.fit(_df[self.X_ref])
_df[self.X_ref] = self.scaler_X.transform(_df[self.X_ref])
else:
self.scaler_X = None
if self.scaler_y is not None:
self.scaler_y = self.scaler_y.fit(_df[self.y_ref])
_df[self.y_ref] = self.scaler_y.transform(_df[self.y_ref])
warnings.simplefilter('default', DataConversionWarning)
# split
if fit_type == 'train_test':
_k_tests = [k_test]
# self.df['_test'] = self.df['_k_index'] == k_test
elif fit_type == 'k_cross':
_k_tests = sorted(_df['_k_index'].unique())
else: # fit_type == 'final'
_k_tests = [-1]
# get df train and df test
for _k_test in _k_tests:
_df_train = _df[lambda _: _['_k_index'] != _k_test]
_df_test = _df[lambda _: _['_k_index'] == _k_test]
for _model in self.models:
if do_print:
self.printf('fitting model {}...'.format(_model.name))
_model.fit(X=self.X_ref, y=self.y_ref, df=_df_train, df_test=_df_test, groupby=groupby,
k=_k_test, **kwargs)
self.fit_type = fit_type
self.k_tests = _k_tests
[docs] @docstr
def predict(
self, X: Union[DFOrArray, SequenceOrScalar] = None, y: Union[DFOrArray, SequenceOrScalar] = None,
df: pd.DataFrame = None, return_type: str = 'self', ensemble: bool = False, k_predict_type: str = 'test',
groupby: SequenceOrScalar = None, multi: SequenceOrScalar = None, do_print: bool = True
) -> Optional[Union[pd.Series, pd.DataFrame]]:
"""
predict with all models in collection
:param X: %(X_predict)s
:param y: %(y_predict)s
:param df: %(df_predict)s
:param return_type: one of %(Models__predict__return_type)s
:param ensemble: %(ensemble)s
:param k_predict_type: 'test' or 'all'
:param groupby: %(groupby)s
:param multi: postfixes to use for multi output [optional]
:param do_print: %(do_print)s
:return: if return_type is self: None, else see Model.predict
"""
# -- assert
# return_type
_valid_return_types = ['self', 'df', 'DataFrame']
assert(return_type in _valid_return_types), 'return_type must be one of {}'.format(_valid_return_types)
# fit_type
if not self.fit_type:
raise ValueError('Model is not fit yet')
# X / y
if df is None:
if isinstance(X, pd.DataFrame):
if isinstance(y, pd.DataFrame):
df = pd.concat([X, y], axis=1)
else:
df = X
else:
df = self.df.copy()
# - handle scaler transformation
warnings.simplefilter('ignore', DataConversionWarning)
if self.scaler_X is not None:
df[self.X_ref] = self.scaler_X.transform(df[self.X_ref])
if self.scaler_y is not None:
df[self.y_ref] = self.scaler_y.transform(df[self.y_ref])
_y_preds = []
_y_ref_preds = []
_model_names = []
for _model in self.models:
if do_print:
self.printf('predicting model {}...'.format(_model.name))
_model_names.append(_model.name)
# get config for k_cross
_k_index = None
if self.fit_type == 'k_cross':
# k_cross_type test: return only test prediction for each k
if k_predict_type == 'test':
_k_index = self.df['_k_index']
_y_ref_preds += [f"{_}_{_model.name}" for _ in _model.y_ref]
# k_cross_type all: return all predictions
else:
for _k in range(len(_model.model)):
_y_ref_preds += [f"{_}_{_model.name}_{_k}" for _ in _model.y_ref]
else:
_y_ref_preds += [f"{_}_{_model.name}" for _ in _model.y_ref]
_y_pred_scaled = _model.predict(df=df, groupby=groupby, k_index=_k_index)
# - handle scaler inverse transformation
if self.scaler_y is None:
_y_pred = deepcopy(_y_pred_scaled)
else:
_df_y_pred_scaled = pd.DataFrame(_y_pred_scaled)
_df_y_pred_scaled.columns = _model.y_ref
# if the model is predicting only a subset of ys we need to reshape before we scale
if _model.y_ref != self.y_ref:
for _col in self.y_ref:
if _col not in _model.y_ref:
_df_y_pred_scaled[_col] = np.nan
# ensure correct order
_df_y_pred_scaled = _df_y_pred_scaled[self.y_ref]
_df_y_pred = self.scaler_y.inverse_transform(_df_y_pred_scaled)
_df_y_pred = pd.DataFrame(_df_y_pred)
_df_y_pred.columns = self.y_ref
_y_pred = _df_y_pred[_model.y_ref]
_y_preds.append(_y_pred)
warnings.simplefilter('default', DataConversionWarning)
_df = pd.concat(_y_preds, axis=1, sort=False)
# feed back to df
if len(_y_ref_preds) < _df.shape[1]: # some regressors (TimeSeriesRegressors)
# might return multi output
_y_ref_preds_new = []
if multi is None:
multi = range(_df.shape[1] // len(_y_ref_preds))
for _y_ref_pred in _y_ref_preds:
for _multi in multi:
_y_ref_preds_new.append(f"{_y_ref_pred}_{_multi}")
_y_ref_preds = _y_ref_preds_new + []
else:
multi = None
# save
self.multi = multi
# to df
_df.columns = _y_ref_preds
_df.index = df.index
if ensemble: # supports up to 3 model ensembles (does not support multi regression)
# drop duplicates
_model_names = list(set(_model_names))
# get all combinations
for _comb in list(itertools.combinations(_model_names, 2)) + list(itertools.combinations(_model_names, 3)):
for _y_ref in self.y_ref:
_comb_name = '_'.join(_comb)
_y_comb_name = '{}_{}'.format(_y_ref, _comb_name)
_cols = []
for _element in _comb:
_cols.append('{}_{}'.format(_y_ref, _element))
_df[_y_comb_name] = _df[_cols].mean(axis=1)
if _comb_name not in self.model_names:
self.model_names.append(_comb_name)
if return_type == 'self':
for _col in _df.columns:
self.df[_col] = _df[_col]
if _col not in self.y_pred:
self.y_pred.append(_col)
elif return_type in ['df', 'DataFrame']:
return _df
[docs] @docstr
def score(
self, return_type: str = 'self', pivot: bool = False, groupby: SequenceOrScalar = None,
do_print: bool = True, display_score: bool = True, display_format: str = ',.3f', **kwargs
) -> Optional[pd.DataFrame]:
"""
calculate score of the Model predictions
:param return_type: one of %(Models__score__return_type)s
:param pivot: whether to pivot the DataFrame for easier readability [optional]
:param do_print: %(do_print)s
:param display_score: %(display_score)s
:param display_format: Format to use when displaying the score DataFrame [optional]
:param groupby: %(groupby)s
:param kwargs: other keyword arguments passed to :func:`~hhpy.ds.df_score`
:return: if return_type is 'self': None, else: pandas DataFrame containing the scores
"""
# -- assert
_valid_return_types = ['self', 'df', 'DataFrame']
assert(return_type in _valid_return_types),\
'return_type must be one of {}'.format(_valid_return_types)
groupby = assert_list(groupby)
_df = self.df.copy()
# -- init
if do_print:
self.printf('scoring...')
if self.fit_type is None:
raise ValueError('Model is not fit yet')
elif self.fit_type == 'k_cross':
groupby.append('_k_index')
elif self.fit_type == 'train_test':
_df = _df[lambda _: _['_k_index'].isin(self.k_tests)]
_df_score = df_score(df=_df, y_true=self.y_ref, pred_suffix=self.model_names, pivot=pivot, groupby=groupby,
multi=self.multi, **kwargs)
if display_score:
if pivot:
_display_score = _df_score
else:
_display_score = _df_score.pivot_table(index=['y_true', 'y_pred'], columns='score', values='value')
display_df(_display_score, float_format=display_format)
if return_type == 'self':
self.df_score = _df_score
else:
return _df_score
[docs] @docstr
def train(
self, df: pd.DataFrame = None, k: int = 5, groupby: Union[Sequence, str] = None,
sortby: Union[Sequence, str] = None, random_state: int = None, fit_type: str = 'train_test',
k_test: Optional[int] = 0, ensemble: bool = False, scores: Union[Sequence, str, Callable] = None,
multi: SequenceOrScalar = None,
scale: float = None, do_predict: bool = True, do_score: bool = True, do_split: bool = True,
do_fit: bool = True, do_print: bool = True, display_score: bool = True
) -> None:
"""
wrapper method that combined k_split, train, predict and score
:param df: %(df)s
:param k: see hhpy.ds.k_split see hhpy.ds.k_split
:param groupby: %(groupby)s
:param sortby: see hhpy.ds.k_split
:param random_state: see hhpy.ds.k_split
:param fit_type: see .fit
:param k_test: see .fit
:param ensemble: %(ensemble)s
:param scores: see .score [optional]
:param multi: postfixes to use for multi output [optional]
:param scale: see .score
:param do_print: %(do_print)s
:param display_score: %(display_score)s
:param do_split: whether to apply k_split [optional]
:param do_fit: whether to fit the Models [optional]
:param do_predict: whether to add predictions to DataFrame [optional]
:param do_score: whether to create self.df_score [optional]
:return: None
"""
if df is not None:
self.df = df
# -- main
if do_split:
self.k_split(k=k, groupby=groupby, sortby=sortby, random_state=random_state, do_print=do_print)
if do_fit:
self.fit(fit_type=fit_type, k_test=k_test, do_print=do_print, groupby=groupby)
if do_predict:
self.predict(ensemble=ensemble, do_print=do_print, groupby=groupby, multi=multi)
if do_score:
self.score(scores=scores, scale=scale, do_print=do_print, display_score=display_score)
if do_print:
self.printf('training done')
[docs] @docstr_hpt
def scoreplot(
self, x='y_ref', y='value', hue='model', hue_order=None, row='score', row_order=None,
palette=None, width=16, height=9 / 2, scale=None, query=None, return_fig_ax=False,
**kwargs
) -> Optional[tuple]:
"""
plot the score(s) using sns.barplot
:param x: %(x)s
:param y: %(y)s
:param hue: %(hue)s
:param hue_order: %(order)s
:param row: the variable to wrap around the rows [optional]
:param row_order: %(order)s
:param palette: %(palette)s
:param width: %(subplot_width)s
:param height: %(subplot_height)s
:param scale: scale the values [optional
:param query: query to be passed to pd.DataFrame.query before plotting [optional]
:param return_fig_ax: %(return_fig_ax)s
:param kwargs: other keyword arguments passed to sns.barplot
:return: see return_fig_ax
"""
# -- init
if row_order is None:
row_order = ['r2', 'rmse', 'mae', 'stdae', 'medae', 'pae']
if hue_order is None:
hue_order = self.model_names
if palette is None:
palette = hpt_rcParams['palette']
_row_order = assert_list(row_order)
fig, ax = plt.subplots(nrows=len(_row_order), figsize=(width, height * len(_row_order)))
_ax_list = ax_as_list(ax)
_df_score = self.score(return_type='df', scale=scale, do_print=False, display_score=False)
if query is not None:
_df_score = _df_score.query(query)
_row = -1
for _row_value in _row_order:
_row += 1
_ax = _ax_list[_row]
sns.barplot(x=x, y=y, hue=hue, hue_order=hue_order, data=_df_score.query('{}==@_row_value'.format(row)),
ax=_ax, palette=palette, **kwargs)
_y_label = _row_value
if scale:
_y_label += '_scale={}'.format(scale)
_ax.set_ylabel(_y_label)
_ax.set_xlabel('')
legend_outside(ax)
if return_fig_ax:
return fig, ax
else:
plt.show(fig)
@docstr
def setattr(self, key: str, value: Any, target: str = 'all', child_target: str = 'all') -> None:
"""
Set attribute on self and or all child models
:param key: %(key)s
:param value: %(value)s
:param target: One of %(setattr__target)s, for 'self' sets only on self, for 'children' sets for children and
for 'all' tries to set on both
:param child_target: Same as target but passed to :func:`~Model.setattr`
:return: None
"""
# -- assert
if target not in validations['setattr__target']:
raise ValueError(f"target must be one of {validations['setattr__target']}")
# -- main
# - self
if target in ['all', 'a', 'self', 's'] and hasattr(self, key):
setattr(self, key, value)
# - children
if target in ['all', 'a', 'children', 'c']:
for _model in self.models:
_model.setattr(key, value, target=child_target)
# ---- functions
[docs]@export
def assert_array(a: Any, return_name: bool = False, name_default: str = 'name') -> Union[Tuple[np.ndarray, str],
np.ndarray]:
"""
Take any python object and turn it into a 2d numpy array (if possible). Useful for training neural networks.
:param a: any python object
:param return_name: Whether the name should be returned
:param name_default: The name to fall back to if the object has no name attribute.
:return: numpy array, if return_name: Tuple [numpy.array, name]
"""
_name = name_default
if return_name:
if isinstance(a, pd.DataFrame):
_name = ';'.join(a.columns)
elif hasattr(a, 'name'):
_name = a.name
# assert array
a = np.array(a).copy()
if len(a.shape) == 1:
a = a.reshape(-1, 1)
if return_name:
return a, _name
else:
return a
[docs]@export
def dict_to_model(dic: Mapping) -> Model:
"""
restore a Model object from a dictionary
:param dic: dictionary containing the model definition
:return: Model
"""
# init empty model
if dic['__name__'] == 'Model':
_model = Model()
else:
raise ValueError('__name__ not recognized')
# reconstruct from dict
_model.from_dict(dic)
return _model
[docs]@export
def assert_model(model: Any) -> Model:
"""
takes any Model, model object or dictionary and converts to Model
:param model: Mapping or object containing a model
:return: Model
"""
if not isinstance(model, Mapping):
if not isinstance(model, Model):
return Model(model)
return model
if '__name__' in model.keys():
if model['__name__'] in ['Model']:
_model = dict_to_model(model)
else:
raise ValueError('__name__ not recognized')
else:
raise KeyError('no element named __name__')
return _model
[docs]def force_model(*args, **kwargs):
warnings.warn('force_model is deprecated, please use assert_model instead', DeprecationWarning)
return assert_model(*args, **kwargs)
[docs]@export
def get_coefs(model: Any, y: SequenceOrScalar):
"""
get coefficients of a linear regression in a sorted data frame
:param model: model object
:param y: name of the coefficients
:return: pandas DataFrame containing the coefficient names and values
"""
if isinstance(model, Model):
_model = model.model
else:
_model = model
assert(hasattr(_model, 'coef_')), 'Attribute coef_ not available, did you specify a linear model?'
_coef = _model.coef_.tolist()
_coef.append(model.intercept_)
_df = pd.DataFrame()
_df['feature'] = assert_list(y) + ['intercept']
_df['coef'] = _coef
_df = _df.sort_values(['coef'], ascending=False).reset_index(drop=True)
return _df
[docs]@export
def get_feature_importance(
model: object, predictors: Union[Sequence, str], features_to_sum: Mapping = None
) -> pd.DataFrame:
"""
get feature importance of a decision tree like model in a sorted data frame
:param model: model object
:param predictors: names of the predictors properly sorted
:param features_to_sum: if you want to sum features please provide name mappings
:return: pandas DataFrame containing the feature importances
"""
def _get_feature_importance_rf(__model, __predictors, __features_to_sum: Mapping = None):
# init df for feature importances
___df = pd.DataFrame({
'feature': __predictors,
'importance': __model.feature_importances_
})
# if applicable: sum features
if __features_to_sum is not None:
for _key in list(__features_to_sum.keys()):
___df['feature'] = np.where(___df['feature'].isin(__features_to_sum[_key]), _key, ___df['feature'])
___df = ___df.groupby('feature').sum().reset_index()
___df = ___df.sort_values(['importance'], ascending=False).reset_index(drop=True)
___df['importance'] = np.round(___df['importance'], 5)
return ___df
# get feature importance of a decision tree like model in a sorted data frame
def _get_feature_importance_xgb(_f_model, _f_predictors, _f_features_to_sum=None):
# get f_score
_f_score = _f_model.get_booster().get_fscore()
# init df to return
__df = pd.DataFrame()
# get predictor code
__df['feature_code'] = list(_f_score.keys())
__df['feature_code'] = __df['feature_code'].str[1:].astype(int)
# code importance
__df['importance_abs'] = list(_f_score.values())
__df['importance'] = __df['importance_abs'] / __df['importance_abs'].sum()
__df = __df.sort_values(['feature_code']).reset_index(drop=True)
__df['feature'] = [_f_predictors[x] for x in range(len(_f_predictors)) if x in __df['feature_code']]
if _f_features_to_sum is not None:
for _key in list(_f_features_to_sum.keys()):
__df['feature'] = np.where(__df['feature'].isin(_f_features_to_sum[_key]), _key, __df['feature'])
__df = __df.groupby('feature').sum().reset_index()
__df = __df.sort_values(['importance'], ascending=False).reset_index(drop=True)
__df['importance'] = np.round(__df['importance'], 5)
__df = __df[['feature', 'importance']]
return __df
# -- main
try:
# noinspection PyTypeChecker
_df = _get_feature_importance_rf(model, predictors, features_to_sum)
# this is supposed to also work for XGBoost but it was broken in a recent release
# so below serves as fallback
except ValueError:
# noinspection PyTypeChecker
_df = _get_feature_importance_xgb(model, predictors, features_to_sum)
return _df
[docs]@export
def to_keras_3d(x: DFOrArray, window: int, y: DFOrArray = None, groupby: SequenceOrScalar = None,
groupby_to_dummy: bool = False, dropna: bool = True, reshape: bool = True)\
-> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]:
"""
reformat a DataFrame / 2D array to become a keras compatible 3D array. If dropna is True the first window
observations get dropped since they will contain NaN values in the required shifted elements.
:param x: numpy array or DataFrame
:param window: series-window, how many iterations to convolve
:param y: accompanying target / label DataFrame or numpy 2d array. If specified a modified version of y will be
returned to match x's shape where the first window elements have been dropped. [optional]
:param groupby: column to group by (shift observations in each group) [optional]
:param groupby_to_dummy: Whether to include the groupby value as pandas Dummy [optional]
:param dropna: Whether to drop na rows [optional]
:param reshape: Whether to reshape to keras format observations - timestamps - features [optional]
:return: if y is None: x as 3d array, else: Tuple[x, y]
"""
# -- assert
# if x input is already a 3d-array: return it as is
if hasattr(x, 'shape') and len(x.shape) == 3:
if y is None:
return x
else:
return x, y
# -- init
# - convert x and y to DataFrames
x = assert_df(x)
# - groupby
if groupby in [None, []]:
groupby = GROUPBY_DUMMY
x[GROUPBY_DUMMY] = 1
groupby = assert_list(groupby)
# handle groupby to dummy (needs to be here)
if groupby_to_dummy:
x = pd.concat([x, pd.get_dummies(x[groupby])], axis=1)
_x_cols = list_exclude(x.columns, groupby)
# handle y
if y is None:
_df = x
_y_cols = None
else:
y = assert_df(y)
_y_cols = list_exclude(y.columns, groupby)
_df = pd.concat([x, y], axis=1)
_df = drop_duplicate_cols(_df)
# -- main
# - init new x as list of window lists
_x = [[] for _ in range(window)]
# - init new y as list
_y = []
# - groupby loop
for _index, _df_i in _df.groupby(groupby):
# shift loop
for _step in range(window):
# shift by _step+1 since shifting by 0 would mean using the target (label) value as predictor (feature)
_x_shift_i = _df_i[_x_cols].shift(_step+1)
if dropna:
# drop the first window elements (since they are na in at least 1 cast)
_x_shift_i = _x_shift_i.iloc[window:]
_x[_step].append(_x_shift_i)
# drop the first window elements of y
if y is not None:
_y_i = _df_i[_y_cols]
if dropna:
_y_i = _y_i.iloc[window:]
_y.append(_y_i)
# - use pd concat to make lists of dfs into dfs
_x = [pd.concat(_) for _ in _x]
if y is not None:
_y = pd.concat(_y).values
# - turn x into keras 3d array using numpy.dstack
_x = np.dstack(_x)
# - reshape into the keras format observations - timestamps - features
if reshape:
_x_copy = _x.copy()
_x = np.zeros((_x_copy.shape[0], _x_copy.shape[2], _x_copy.shape[1]))
for _row in range(_x_copy.shape[0]):
_x[_row] = _x_copy[_row].T
# -- return
if y is None:
return _x
else:
return _x, _y
# TODO - make these members of Models
# def grid_search(model, grid, target, n_random=100, n_random_state=0, goal='acc_test', f=cla, opt_class=None,
# opt_score_type=None, ascending=None, do_print=True, display_score=False, float_format=',.4f',
# **kwargs):
# # -- defaults --
#
# if opt_score_type is None:
# if opt_class is not None:
# opt_score_type = 'f1_pr'
# else:
# opt_score_type = 'score'
#
# if ascending is None:
# # goal can be one of scores keys
# if goal in ['r2_train', 'r2_test', 'acc_train', 'acc_test', 'true_pos', 'true_neg']:
# _ascending = False
# else:
# _ascending = True
# else:
# _ascending = ascending
#
# # -- init --
#
# if is_list_like(target):
# target = target[0]
# warnings.warn('target cannot be a list for grid_search, using {}'.format(target))
#
# # you can pass a DataFrame or a dictionary as grid
# if isinstance(grid, pd.DataFrame):
# _grid = grid.copy()
# del grid
# elif isinstance(grid, Mapping):
#
# # expand gridf
#
# _grid = expand_dict_to_df(grid)
# del grid
#
# else:
# raise 'grid should be a dictionary or DataFrame'
#
# # if a number of random combinations to try was passed, use it to sample grid
# if n_random is not None:
# if n_random < _grid.shape[0]:
# _grid = _grid.sample(n=n_random, random_state=n_random_state).reset_index(drop=True)
#
# # -- main --
#
# # iter _grid rows
# _df_out = []
#
# _i = 0
# for _index, _row in _grid.iterrows():
#
# _it_df = pd.DataFrame({'it': [_i]})
# _i += 1
#
# _model = deepcopy(model)
# _string = ''
#
# for _key in _row.keys():
#
# _value = _row[_key]
# if isinstance(_value, float):
# if _value.is_integer(): _value = int(_value)
#
# if len(_string) > 0: _string += ', '
# _string += '{} = {}'.format(_key, _value)
#
# _model.set_params(**{_key: _value})
# _it_df[_key] = _value
#
# if do_print: tprint('{} / {} - {}'.format(_i, _grid.shape[0], _string))
#
# # execute function
# _dict = f(model=_model, target=target, do_print=False, display_score=display_score, **kwargs)
#
# # parse score df
# if opt_score_type == 'score':
# _score = _dict['score']
# elif opt_score_type == 'f1_pr':
# # switch case for k_cross
# if 'f1_pr' in _dict['cm'][target].keys():
# _score = _dict['cm'][target]['f1_pr']
# else:
# _score = _dict['cm'][target]['f1_pr_test']
# elif opt_score_type == 'cm':
# # switch case for k_cross
# if 'cm' in _dict['cm'][target].keys(): _score = _dict['cm'][target]['f1_pr']
# _score = _dict['cm'][target]['cm_test']
# else:
# raise ValueError('Unknown opt_score_type {}'.format(opt_score_type))
#
# if opt_class is not None: _score = _score[_score.index == opt_class]
#
# for _col in _score.columns: _it_df[_col] = _score[_col].mean()
#
# _df_out.append(_it_df)
#
# _df_out = pd.concat(_df_out, ignore_index=True, sort=False).sort_values(by=goal, ascending=_ascending)
#
# # find best combination
# if do_print:
# tprint('')
# print('best combination:')
# display_df(_df_out.head(1), float_format=float_format)
#
# return _df_out
#
#
# # wrapper for grid_search for regression
# def grid_reg(*args, goal='rmse_test_scaled', f=reg, **kwargs):
# return grid_search(*args, goal=goal, f=f, **kwargs)
#
#
# # wrapper for grid_search for regression
# def grid_cla(*args, goal='acc_test', display_cm=False, display_f1_pr=False, display_score=False, f=cla, **kwargs):
# return (
# grid_search(*args, goal=goal, display_cm=display_cm, display_f1_pr=display_f1_pr, display_score=display_score,
# f=f, **kwargs))
#
#
# # just a wrapper for grid_reg that uses k_cross validation by default
# def grid_reg_k(f=k_cross_reg, goal='rmse', **kwargs):
# return grid_search(goal=goal, f=f, **kwargs)
#
#
# def multi_modelling(f, goal, ascending, df_train, df_test, target, predictors_it, predictors_fix=None,
# fixed_only=True,
# do_print=True, print_prefix='', **kwargs):
# # init
# if predictors_fix is None:
# predictors_fix = []
# if not is_list_like(target): target = [target]
# if not is_list_like(predictors_it): predictors_it = [predictors_it]
# if not is_list_like(predictors_fix): predictors_fix = [predictors_fix]
#
# # a predictor cannot be both fix and it
# if len(predictors_fix) > 0:
# _predictors_it = [_ for _ in predictors_it if _ not in predictors_fix]
# if fixed_only: _predictors_it += [{'fixed_only': []}]
# else:
# _predictors_it = predictors_it
#
# ## empty dict so that it can become a DtaFrame
# _df_score = []
#
# _i = 0
# _i_max = len(_predictors_it)
#
# for _predictor in _predictors_it:
#
# _i += 1
#
# if isinstance(_predictor, Mapping):
# _predictor_name = list(_predictor.keys())[0]
# _predictor_value = _predictor[_predictor_name]
# else:
# _predictor_name = _predictor
# _predictor_value = _predictor
#
# if not is_list_like(_predictor_value):
# _predictors_score = [_predictor_value]
# else:
# _predictors_score = [] + _predictor_value
#
# if predictors_fix is not None: _predictors_score = _predictors_score + predictors_fix
#
# _print_prefix = '{}iteration {} / {} : {} - '.format(print_prefix, _i, _i_max, _predictor_name)
#
# _rmse_test = 0
# _rmse_train = 0
#
# _score = \
# f(df_train=df_train, df_test=df_test, target=target, predictors=_predictors_score,
# print_prefix=_print_prefix,
# do_print=do_print, display_score=False, **kwargs)['score']
# _score = pd.DataFrame(_score[_score.index.isin(target)].mean()).T
# _score['predictor'] = _predictor_name
# _score['predictor_value'] = _predictor_value
#
# _df_score.append(_score)
#
# _df_score = pd.concat(_df_score, ignore_index=True, sort=False).sort_values(by=goal,
# ascending=ascending).reset_index(
# drop=True)
#
# if do_print:
# tprint('done')
#
# return _df_score
# # wrapper
# def multi_cla(f=cla, goal='acc_test', ascending=False, **kwargs):
# return multi_modelling(f=f, goal=goal, ascending=ascending, **kwargs)
#
#
# # wrapper
# def multi_reg(f=reg, goal='rmse_test_scaled', ascending=True, **kwargs):
# return multi_modelling(f=f, goal=goal, ascending=ascending, **kwargs)
#
#
# # iteratively build a model by trying all predictors one by one, finding the best one and then trying
# # all again (repeat)
# def iter_modelling(f, goal, df_train, df_test, target, predictors_it, predictors_fix=None, max_depth=10, cutoff=0,
# do_print=True, **kwargs):
# if predictors_fix is None:
# predictors_fix = []
# if cutoff is None: cutoff = - np.inf
#
# # init
# _predictors_fix = deepcopy(predictors_fix)
# _predictors_it = deepcopy(predictors_it)
# if not is_list_like(_predictors_fix): _predictors_fix = [_predictors_fix]
# _predictors_fix = list(_predictors_fix)
# if max_depth > len(predictors_it): max_depth = len(predictors_it)
#
# # a predictor cannot be both fix and it
# _predictors_it = [_ for _ in _predictors_it if _ not in _predictors_fix]
#
# _prev_rmse = 9 ** 9
#
# for _depth in range(max_depth):
#
# _print_prefix = 'depth {} / {} - '.format(_depth + 1, max_depth)
#
# _df_score = f(df_train=df_train, df_test=df_test, target=target, predictors_it=_predictors_it,
# predictors_fix=_predictors_fix, fixed_only=False, do_print=do_print, print_prefix=_print_prefix,
# **kwargs)
# _best = _df_score.iloc[0]
# _best_goal = _best[goal]
# _best_predictor = _best['predictor_value']
#
# print('{}best result: {}'.format(_print_prefix, format_iloc(_best)))
#
# if _prev_rmse - _best_goal > cutoff:
#
# _prev_rmse = _best_goal
#
# # update predictors
# if not is_list_like(_best_predictor):
#
# _predictors_it = [_ for _ in _predictors_it if _ != _best_predictor]
# _predictors_fix += [_best_predictor]
#
# else:
#
# _predictors_it_new = []
#
# for _predictor in _predictors_it:
#
# if not isinstance(_predictor, Mapping):
# _predictors_it_new.append(_predictor)
# else:
# if not _best['predictor'] in _predictor.keys(): _predictors_it_new.append(_predictor)
#
# _predictors_it = deepcopy(_predictors_it_new)
# _predictors_fix += _best_predictor
#
# else:
#
# print('{}cutoff {} not reached, stopping'.format(_print_prefix, cutoff))
# break
#
# return None
#
#
# # wrapper
# def iter_cla(f=multi_cla, goal='acc_test', **kwargs):
# return iter_modelling(f=f, goal=goal, **kwargs)
#
#
# # wrapper
# def iter_reg(f=multi_reg, goal='rmse_test_scaled', **kwargs):
# return iter_modelling(f=f, goal=goal, **kwargs)
#
#
# # iteratively build a model by trying all predictors one by one and accepting those who improve more than cutoff
# # (faster than iter_reg)
# def forward_reg(df_train, df_test, predictors_it, predictors_fix, max_depth=10, cutoff=.001, do_print=True, f=reg,
# **kwargs):
# _predictors_fix = deepcopy(predictors_fix)
# _predictors_it = deepcopy(predictors_it)
# if not is_list_like(_predictors_fix): _predictors_fix = [_predictors_fix]
# _predictors_fix = list(_predictors_fix)
# _predictors_final = [] + _predictors_fix
#
# # a predictor cannot be both fix and it
# _predictors_it = [_ for _ in _predictors_it if _ not in _predictors_fix]
#
# if do_print: tprint('training base model...')
#
# _dict = f(
# df_train=df_train,
# df_test=df_test,
# predictors=predictors_fix,
# do_print=False,
# **kwargs
# )
#
# _base = _dict['score'].iloc[0]
#
# _prev_rmse = _base['rmse_test_scaled']
#
# print('base score: rmse_scaled train / test : {:.4f} / {:.4f} - rmse train / test : {:.4f} / {:.4f}'.format(
# _base['rmse_train_scaled'], _base['rmse_test_scaled'], _base['rmse_train'], _base['rmse_test']))
#
# for _depth in range(max_depth):
#
# _print_prefix = 'depth {} / {} - '.format(_depth + 1, max_depth)
#
# # do a multi_reg
# _df_score = multi_reg(df_train=df_train, df_test=df_test, predictors_it=_predictors_it,
# predictors_fix=_predictors_fix, fixed_only=False, do_print=do_print,
# print_prefix=_print_prefix, f=reg, **kwargs)
#
# # find base model
# _df_score_base = _df_score.query('@_prev_rmse-rmse_test_scaled>@cutoff')
#
# # if the len is zero no improvements are possible -> quit
# if _df_score_base.shape[0] == 0:
# if do_print: print('{}no improvements possible, quitting'.format(_print_prefix))
# break
#
# # get all predictors that made improvements:
# _predictors_pos_name = _df_score_base['predictor'].tolist()
# _predictors_pos = _df_score_base['predictor_value'].tolist()
# # print(_predictors_pos)
# _predictors_fix_new = _predictors_fix + flatten(_predictors_pos)
#
# if do_print: print('{}accepted predictors: {}'.format(_print_prefix, _predictors_pos_name))
#
# if do_print: tprint('{}training new base model...'.format(_print_prefix))
#
# # get new base
# _dict_new = f(
# df_train=df_train,
# df_test=df_test,
# predictors=_predictors_fix_new,
# do_print=False,
# **kwargs
# )
#
# _base_new = _dict_new['score'].iloc[0]
#
# print('{}new score: rmse_scaled train / test : {:.4f} / {:.4f} - rmse train / test : {:.4f} / {:.4f}'.format(
# _print_prefix, _base_new['rmse_train_scaled'], _base_new['rmse_test_scaled'], _base_new['rmse_train'],
# _base_new['rmse_test']))
#
# # new rmse
# if _prev_rmse - _base_new['rmse_test_scaled'] > cutoff:
#
# _prev_rmse = deepcopy(_base_new['rmse_test_scaled'])
# _base = _base_new.copy()
# _dict = deepcopy(_dict_new)
# _predictors_fix = deepcopy(_predictors_fix_new)
# _predictors_final += _predictors_pos_name
#
# # update predictors
# for _new_predictor, _new_predictor_name in zip(_predictors_pos, _predictors_pos_name):
#
# if not is_list_like(_new_predictor):
#
# _predictors_it = [_ for _ in _predictors_it if _ != _new_predictor]
#
# else:
#
# _predictors_it_new = []
#
# for _predictor in _predictors_it:
#
# if not isinstance(_predictor, Mapping):
# _predictors_it_new.append(_predictor)
# else:
# if not _new_predictor_name in _predictor.keys(): _predictors_it_new.append(_predictor)
#
# _predictors_it = deepcopy(_predictors_it_new)
# else:
# print('{}overall score improvement below cutoff {}, quitting'.format(_print_prefix, cutoff))
# break
#
# if len(_predictors_it) == 0:
# print('{}accepted all predictors, quitting'.format(_print_prefix))
# break
#
# print('')
# print('final predictors: {}'.format(_predictors_final))
# print('discarded predictors: {}'.format(_predictors_it))
# print('final score: rmse_scaled train / test : {:.4f} / {:.4f} - rmse train / test : {:.4f} / {:.4f}'.format(
# _base['rmse_train_scaled'], _base['rmse_test_scaled'], _base['rmse_train'], _base['rmse_test']))
#
# return _dict
#
#
# # iteratively build a model by trying all predictors and removing one by one and discarding those whose gain
# # was less than cutoff (faster than iter_reg)
# def backward_modelling(f, goal, ascending, cutoff, df_train, df_test, target, predictors_it, predictors_fix=None,
# max_depth=3, do_print=True, float_format=',.4f', **kwargs):
# # -- init --
# if predictors_fix is None:
# predictors_fix = []
# _predictors_fix = deepcopy(predictors_fix)
# _predictors_it = deepcopy(predictors_it)
# if not is_list_like(_predictors_fix): _predictors_fix = [_predictors_fix]
# _predictors_fix = list(_predictors_fix)
# _predictors_final = [] + _predictors_fix
#
# # a predictor cannot be both fix and it
# _predictors_it = [_ for _ in _predictors_it if _ not in _predictors_fix]
# _predictors_all = predictors_fix + _predictors_it
#
# # -- base --
# if do_print: tprint('training base model...')
#
# _dict = f(
# df_train=df_train,
# df_test=df_test,
# target=target,
# predictors=_predictors_all,
# do_print=False,
# display_score=False,
# **kwargs
# )f
#
# _score = _dict['score']
# _score = _score[_score.index.isin(target)]
# _base = _score.mean()
#
# _best_goal = _base[goal]
#
# print('base score: {}'.format(format_iloc(_base, float_format=float_format)))
#
# # -- main loop --
# for _depth in range(max_depth):
#
# _print_prefix = 'depth {} / {} - '.format(_depth + 1, max_depth)
#
# tprint('{}init...'.format(_print_prefix))
#
# _i = 0
# _i_max = len(_predictors_it)
# _predictors_it_new = deepcopy(_predictors_it)
# _removed = 0
#
# # loop all remaining predictors
# for _predictor in _predictors_it:
#
# _i += 1
#
# _predictors = [_ for _ in _predictors_all if _ != _predictor]
#
# # try:
# if True:
#
# _dict_new = f(
# df_train=df_train,
# df_test=df_test,
# target=target,
# predictors=_predictors,
# do_print=False,
# display_score=False,
# **kwargs
# )
#
# _score_new = _dict_new['score']
# _score_new = _score_new[_score_new.index.isin(target)]
# _base_new = _score_new.mean()
#
# _goal = _base_new[goal]
# _full_score = format_iloc(_base_new, float_format=float_format)
# _new_score = _base_new[goal]
# _new_score_formatted = format_iloc(_base_new[[goal]], float_format=float_format)
#
# if ascending:
# _goal_diff = _best_goal - _new_score
# else:
# _goal_diff = _new_score - _best_goal
#
# tprint(
# '{}predictor {} / {} : {} ; {} - best: {} - diff: {}'.format(_print_prefix, _i, _i_max,
# _predictor,
# _new_score_formatted,
# format(_best_goal, float_format),
# format(_goal_diff, float_format)))
#
# if _goal_diff <= cutoff:
#
# _predictors_all = deepcopy(_predictors)
# _predictors_it_new.remove(_predictor)
# _removed += 1
# _dict = deepcopy(_dict_new)
#
# # if the diff is below 0 a new best score was reached
# if _goal_diff < 0: _best_goal = deepcopy(_new_score)
#
# # except:
# # print('')
# # print('predictor {} failed'.format(_predictor))
# # pass
#
# if _removed == 0:
# tprint('')
# print('no improvements possible, quitting')
# break
#
# _predictors_it = deepcopy(_predictors_it_new)
#
# tprint('')
# print('{}predictors {} ; {}'.format(_print_prefix, _predictors_all, _full_score))
#
# if len(_predictors_all) == 1:
# print('only 1 predictor left, quitting')
# break
#
# return _dict
#
#
# def backward_cla(f=cla, goal='acc_test', ascending=True, cutoff=0, float_format=',.3f', **kwargs):
# return backward_modelling(f=f, goal=goal, ascending=ascending, cutoff=cutoff, float_format=float_format,
# **kwargs)
#
#
# def backward_reg(f=reg, goal='rmse_test_scaled', ascending=False, cutoff=0, **kwargs):
# return backward_modelling(f=f, goal=goal, ascending=ascending, cutoff=cutoff, **kwargs)