Source code for hhpy.modelling

"""
hhpy.modelling.py
~~~~~~~~~~~~~~~~~

Contains a model class that is based on pandas DataFrames and wraps around sklearn and other frameworks
to provide convenient train test functions.

"""

# standard imports
import itertools
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# third party imports
from copy import deepcopy
from typing import Sequence, Mapping, Union, Callable, Optional

from sklearn.exceptions import DataConversionWarning

try:
    from IPython.core.display import display
except ImportError:
    display = print

# local imports
from hhpy.main import export, is_list_like, force_list, tprint, DocstringProcessor
from hhpy.ds import k_split, df_score
from hhpy.plotting import ax_as_list, legend_outside, rcParams as hpt_rcParams, docstr as docstr_hpt

# --- constants
__all__ = []

validations = {
    'Model_predict_valid_return_types': ['y', 'df', 'DataFrame'],
    'Models_predict_valid_return_types': ['y', 'df', 'DataFrame', 'self'],
    'Models_score_valid_return_types': ['self', 'df', 'DataFrame'],
    'Models_score_valid_scores': ['r2', 'rmse', 'mae', 'stdae', 'medae'],
    'fit_valid_fit_types': ['train_test', 'k_cross', 'final'],
}

docstr = DocstringProcessor(
    df='Pandas DataFrame containing the training and testing data. '
       'Can be saved to the Model object or supplied on an as needed basis.',
    X_ref='List of features (predictors) used for training the model',
    y_ref='List of labels (targets) to be predicted',
    X='The feature (predictor) data used for training as DataFrame, np.array or column names',
    y='The label (target) data used for training as DataFrame, np.array or column names',
    X_test='The feature (predictor) data used for testing as DataFrame, np.array or column names',
    y_test='The label (target) data used for testing as DataFrame, np.array or column names',
    df_train='Pandas DataFrame containing the training data, optional if array like data is passed for X/y',
    df_test='Pandas DataFrame containing the testing data, optional if array like data is passed for X/y test',
    X_predict='The feature (predictor) data used for predicting as DataFrame, np.array or column names',
    df_predict='Pandas DataFrame containing the predict data, optional if array like data is passed for X_predict',
    model_in='Any model object that implements .fit and .predict',
    name='Name of the model, used for naming columns [optional]',
    dropna='Whether to drop rows containing NA in the training data [optional]',
    scaler_X='Scalar object that implements .transform and .inverse_transform, applied to the features (predictors)'
             'before training and inversely after predicting',
    scaler_y='Scalar object that implements .transform and .inverse_transform, applied to the labels (targets)'
             'before training and inversely after predicting',
    do_print='Whether to print the steps to console',
    display_score='Whether to display the score DataFrame',
    ensemble='if True also predict with Ensemble like combinations of models. If True or mean calculate'
             'mean of individual predictions. If median calculate median of individual predictions.',
    printf='print function to use for logging',
    **validations
)


# --- classes
class _BaseModel:
    """
        base class for unified model classes defined below
        the goal is to have a unified meta framework for multiple machine learning frameworks
        that is convenient to use
    """

    def __init__(self, name: str):

        self.__name__ = 'hhpy.modelling.Base'
        self.__attributes__ = ['__name__', 'name']
        self.name = name

    def __repr__(self):
        _str = self.__name__ + '('

        _it = -1
        for _attr in self.__attributes__[1:]:
            _it += 1
            if _it > 0:
                _str += ', '
            _str += '{}={}'.format(_attr, self.__getattribute__(_attr))

        _str += ')'

        return _str

    def to_dict(self):

        _dict = {}
        for _attr_name in self.__attributes__:
            _attr = self.__getattribute__(_attr_name)
            if 'to_dict' in dir(_attr):
                _attr = _attr.to_dict()
            elif is_list_like(_attr):
                if isinstance(_attr, Mapping):
                    for _key, _value in _attr.items():
                        if 'to_dict' in dir(_value):
                            _attr[_key] = _value.to_dict()
                else:
                    for _i in range(len(_attr)):
                        if 'to_dict' in dir(_attr[_i]):
                            _attr[_i] = _attr[_i].to_dict()
            _dict[_attr_name] = _attr

        return _dict

    def from_dict(self, dct: Mapping):

        for _attr_name in self.__attributes__:
            if _attr_name not in dct.keys():
                continue
            _attr = dct[_attr_name]
            if is_list_like(_attr):
                if isinstance(_attr, Mapping):

                    if '__name__' in _attr.keys():

                        _name = _attr['__name__']
                        if _name[:3] == 'cf.':
                            _name = _name[3:]
                        _attr_eval = eval(_name + '()')
                        if 'from_dict' in dir(_attr_eval):
                            _attr_eval.from_dict(_attr)
                        _attr = _attr_eval

                    else:

                        for _attr_key, _attr_value in _attr.items():

                            if isinstance(_attr_value, Mapping):

                                if '__name__' in _attr_value.keys():

                                    _name = _attr_value['__name__']
                                    if _name[:3] == 'cf.':
                                        _name = _name[3:]
                                    _attr_eval = eval(_name + '()')
                                    if 'from_dict' in dir(_attr_eval):
                                        _attr_eval.from_dict(_attr_value)
                                    _attr[_attr_key] = _attr_eval

                else:
                    for _i in range(len(_attr)):

                        _attr_value = _attr[_i]

                        if isinstance(_attr_value, Mapping):

                            if '__name__' in _attr_value.keys():

                                _name = _attr_value['__name__']
                                if _name[:3] == 'cf.':
                                    _name = _name[3:]
                                _attr_eval = eval(_name + '()')
                                if 'from_dict' in dir(_attr_eval):
                                    _attr_eval.from_dict(_attr_value)
                                _attr[_i] = _attr_eval

            self.__setattr__(_attr_name, _attr)

    def save(self, filename: str, f: Callable = pd.to_pickle):

        _dict = self.copy().to_dict()
        f(_dict, filename)

    def load(self, filename: str, f: Callable = pd.read_pickle):

        self.from_dict(f(filename))

    def copy(self):

        return deepcopy(self)


[docs]@docstr @export class Model(_BaseModel): """ A unified modeling class that is extended from sklearn, accepts any model that implements .fit and .predict :param model: %(model_in)s :param name: %(name)s :param X_ref: %(X_ref)s :param y_ref: %(y_ref)s """ @docstr def __init__(self, model: object = None, name: str = 'pred', X_ref: Union[Sequence, str] = None, y_ref: Union[Sequence, str] = None): """ init method :param model: %(model_in)s :param name: %(name)s :param X_ref: %(X_ref)s :param y_ref: %(y_ref)s """ # -- check super().__init__(name) # -- init if X_ref is not None: X_ref = force_list(X_ref) if y_ref is not None: y_ref = force_list(y_ref) # -- assign self.__attributes__ = ['__name__', 'name', 'model', 'X_ref', 'y_ref', 'is_fit'] self.__name__ = 'cf.Model' if isinstance(model, str): model = eval(model) self.model = model self.X_ref = X_ref self.y_ref = y_ref self.is_fit = False
[docs] @docstr def fit(self, X: Union[np.ndarray, Sequence, str] = None, y: Union[np.ndarray, Sequence, str] = None, df: pd.DataFrame = None, dropna: bool = True, X_test: Union[np.ndarray, Sequence, str] = None, y_test: Union[np.ndarray, Sequence, str] = None, df_test: pd.DataFrame = None) -> None: """ generalized fit method extending on model.fit :param X: %(X)s :param y: %(y)s :param df: %(df_train)s :param dropna: %(dropna)s :param X_test: %(X_test)s :param y_test: %(y_test)s :param df_test: %(df_test)s :return: None """ if df is None: if isinstance(X, pd.DataFrame) and (isinstance(y, pd.DataFrame) or isinstance(y, pd.Series)): _df_X = X.copy() _df_y = pd.DataFrame(y.copy()) _df = pd.concat([_df_X, _df_y], axis=1) else: _df_X = pd.DataFrame(X) _df_y = pd.DataFrame(y) _df = pd.concat([_df_X, _df_y], ignore_index=True, sort=False, axis=1) self.X_ref = _df_X.columns self.y_ref = _df_y.columns del _df_X, _df_y else: if self.X_ref is None: self.X_ref = force_list(X) if self.y_ref is None: self.y_ref = force_list(y) _df = df.copy() del df if df_test is None: _X_test = X_test _y_test = y_test else: _X_test = df_test[self.X_ref] _y_test = df_test[self.y_ref] if dropna: _df = _df.dropna() _X = _df[self.X_ref] _y = _df[self.y_ref] # -- fit _kwargs = {'X': _X, 'y': _y} if 'X_test' in self.model.fit.__code__.co_varnames: _kwargs['X_test'] = _X_test if 'y_test' in self.model.fit.__code__.co_varnames: _kwargs['y_test'] = _y_test warnings.simplefilter('ignore', (FutureWarning, DataConversionWarning)) self.model.fit(**_kwargs) warnings.simplefilter('default', (FutureWarning, DataConversionWarning)) self.is_fit = True
[docs] @docstr def predict(self, X=None, df=None, return_type='y') -> Union[pd.Series, pd.DataFrame]: """ Generalized predict method based on model.predict :param X: %(X)s :param df: %(df)s :param return_type: one of %(Model_predict_valid_return_types)s, if 'y' returns a pandas Series / DataFrame with only the predictions, if one of 'df','DataFrame' returns the full DataFrame with predictions added :return: see return_type """ if not self.is_fit: raise ValueError('Model is not fit yet') _valid_return_types = validations['Model_predict_valid_return_types'] assert(return_type in _valid_return_types), 'return type must be one of {}'.format(_valid_return_types) # ensure pandas df if df is None: _df = pd.DataFrame(X) else: _df = df.copy() del df _X = _df[self.X_ref].dropna() _y_ref = ['{}_pred'.format(_) for _ in self.y_ref] # special case if there is only one target (most algorithms support only one) if len(self.y_ref) == 1: _y_ref = _y_ref[0] # predict using sklearn api _y_pred = self.model.predict(_X) # cast to pandas _y_pred = pd.DataFrame(_y_pred) _y_pred.index = _X.index # feed back to df _df[_y_ref] = _y_pred if return_type == 'y': return _df[_y_ref] elif return_type in ['df', 'DataFrame']: return _df else: raise ValueError('{} is not a valid return_type'.format(return_type))
[docs]@docstr @export class Models(_BaseModel): """ Collection of Models that allow for fitting and predicting with multiple Models at once, comparing accuracy and creating Ensembles :param args: multiple Model objects that will form a Models Collection :param name: name of the collection :param df: %(df)s :param X_ref: %(X_ref)s :param y_ref: %(y_ref)s :param scaler_X: %(scaler_X)s :param scaler_y: %(scaler_y)s :param printf: %(printf)s """ def __init__(self, *args: Union[object, Sequence], name: str = None, df: pd.DataFrame = None, X_ref: Union[Sequence, str] = None, y_ref: Union[Sequence, str] = None, scaler_X: object = None, scaler_y: object = None, printf: Callable = tprint): super().__init__(name) _models = [] _model_names = [] # ensure hhpy.modelling.Model _it = -1 for _arg in args: for _model in force_list(_arg): _it += 1 if not isinstance(_model, Model): if 'name' in dir(_model): _name = _model.name elif '__name__' in dir(_model): _name = '{}_{}'.format(_model.__name__, _it) else: _name = 'model_{}'.format(_it) _model = Model(_model, name=_name) _models.append(deepcopy(_model)) if _model.name not in _model_names: _model_names.append(_model.name) # -- assign self.__attributes__ = ['__name__', 'name', 'models', 'fit_type', 'df', 'X_ref', 'y_ref', 'scaler_X', 'scaler_y', 'model_names', 'df_score'] self.__name__ = 'hhpy.modelling.Models' self.models = _models self.fit_type = None self.df = df self.X_ref = force_list(X_ref) self.y_ref = force_list(y_ref) self.scaler_X = scaler_X self.scaler_y = scaler_y self.model_names = _model_names self.df_score = None self.printf = printf
[docs] @docstr def k_split(self, **kwargs): """ apply hhpy.ds.k_split to self to create train-test or k-cross ready data :param kwargs: keyword arguments passed to hhpy.ds.k_split :return: None """ # -- init assert self.df is not None, 'You must specify a df' # -- split self.df['_k_index'] = k_split(df=self.df, return_type='s', **kwargs)
[docs] @docstr def model_by_name(self, name: Union[list, str]) -> Union[Model, list]: """ extract a list of Models from the collection by their names :param name: name of the Model :return: list of Models """ _models = [] for _model in self.models: if _model.name in force_list(name): if not is_list_like(name): return _model else: _models.append(_model) return _models
[docs] @docstr def fit(self, fit_type: str = 'train_test', do_print: bool = True): """ fit all Model objects in collection :param fit_type: one of %(fit_valid_fit_types)s :param do_print: %(do_print)s :return: None """ # TODO - CROSS VALIDATION _valid_fit_types = validations['fit_valid_fit_types'] assert fit_type in _valid_fit_types, 'fit type must be one of {}'.format(_valid_fit_types) assert not ((fit_type != 'final') and ('_k_index' not in self.df.columns)), 'DataFrame is not k_split yet' assert fit_type != 'cross', 'cross validation not implemented yet' _df_X = self.df[self.X_ref] _df_y = self.df[self.y_ref] if self.scaler_X is not None: self.scaler_X = self.scaler_X.fit(_df_X) _df_X_scaled = pd.DataFrame(self.scaler_X.transform(_df_X), columns=_df_X.columns) else: _df_X_scaled = _df_X.copy() if self.scaler_y is not None: self.scaler_y = self.scaler_y.fit(_df_y) _df_y_scaled = pd.DataFrame(self.scaler_y.transform(_df_y), columns=_df_y.columns) else: _df_y_scaled = _df_y.copy() _df_scaled = pd.concat([_df_X_scaled, _df_y_scaled], axis=1) _df_scaled['_k_index'] = self.df['_k_index'] if fit_type == 'train_test': _k_tests = [_df_scaled['_k_index'].max()] self.df['_test'] = self.df['_k_index'] == _k_tests[0] elif fit_type == 'k_cross': _k_tests = sorted(_df_scaled['_k_index'].unique()) else: _k_tests = [-1] for _k_test in _k_tests: _ = _k_test _df_train = _df_scaled.query('_k_index!=@_k_test') _df_test = _df_scaled.query('_k_index==@_k_test') for _model in self.models: if do_print: self.printf('fitting model {}...'.format(_model.name)) _model.fit(X=self.X_ref, y=self.y_ref, df=_df_train, df_test=_df_test) self.fit_type = fit_type
[docs] @docstr def predict(self, X=None, df=None, return_type='self', ensemble=False, do_print=True): """ predict with all models in collection :param X: %(X_predict)s :param df: %(df_predict)s :param return_type: one of %(Models_predict_valid_return_types)s :param ensemble: %(ensemble)s :param do_print: %(do_print)s :return: if return_type is self: None, else see Model.predict """ _valid_return_types = ['self', 'df', 'DataFrame'] assert(return_type in _valid_return_types),\ 'return_type must be one of {}'.format(_valid_return_types) if not self.fit_type: raise ValueError('Model is not fit yet') if X is None: if df is None: X = self.df else: X = df if df is not None: _X = df[X] elif isinstance(X, pd.DataFrame): _X = X[self.X_ref] else: _X = X if self.scaler_X is not None: # noinspection PyUnresolvedReferences _X = self.scaler_X.transform(_X) _X = pd.DataFrame(_X, columns=self.X_ref) _y_preds = [] _y_ref_preds = [] _model_names = [] for _model in self.models: if do_print: self.printf('predicting model {}...'.format(_model.name)) _model_names.append(_model.name) _y_ref_preds += ['{}_{}'.format(_, _model.name) for _ in _model.y_ref] _y_pred_scaled = _model.predict(X=_X) if self.scaler_y is None: _y_pred = deepcopy(_y_pred_scaled) else: _df_y_pred_scaled = pd.DataFrame(_y_pred_scaled) _df_y_pred_scaled.columns = _model.y_ref # if the model is predicting only a subset of ys we need to reshape before we scale if _model.y_ref != self.y_ref: for _col in self.y_ref: if _col not in _model.y_ref: _df_y_pred_scaled[_col] = np.nan # ensure correct order _df_y_pred_scaled = _df_y_pred_scaled[self.y_ref] _df_y_pred = self.scaler_y.inverse_transform(_df_y_pred_scaled) _df_y_pred = pd.DataFrame(_df_y_pred) _df_y_pred.columns = self.y_ref _y_pred = _df_y_pred[_model.y_ref] _y_preds.append(_y_pred) # drop duplicates _model_names = list(set(_model_names)) _df = pd.concat(_y_preds, axis=1, sort=False) _df.columns = _y_ref_preds if ensemble: # supports up to 3 model ensembles for _comb in list(itertools.combinations(_model_names, 2)) + list(itertools.combinations(_model_names, 3)): for _y_ref in self.y_ref: _comb_name = '_'.join(_comb) _y_comb_name = '{}_{}'.format(_y_ref, _comb_name) _cols = [] for _element in _comb: _cols.append('{}_{}'.format(_y_ref, _element)) _df[_y_comb_name] = _df[_cols].mean(axis=1) if _comb_name not in self.model_names: self.model_names.append(_comb_name) if return_type == 'self': for _col in _df.columns: self.df[_col] = _df[_col] elif return_type in ['df', 'DataFrame']: return _df
[docs] @docstr def score(self, return_type: str = 'self', pivot: bool = False, do_print: bool = True, display_score: bool = True, **kwargs) -> Optional[pd.DataFrame]: """ calculate score of the Model predictions :param return_type: one of %(Models_score_valid_return_types)s :param pivot: whether to pivot the DataFrame for easier readability [optional] :param do_print: %(do_print)s :param display_score: %(display_score)s :param kwargs: other keyword arguments passed to :func: `~hhpy.ds.df_score` :return: if return_type is 'self': None, else: pandas DataFrame containing the scores """ if do_print: self.printf('scoring...') _valid_return_types = ['self', 'df', 'DataFrame'] assert(return_type in _valid_return_types),\ 'return_type must be one of {}'.format(_valid_return_types) if not self.fit_type: raise ValueError('Model is not fit yet') elif self.fit_type == 'train_test': _df = self.df.query('_test') else: # self.fit_type == 'final' _df = self.df.copy() _df_score = df_score(df=_df, y_true=self.y_ref, pred_suffix=self.model_names, pivot=pivot, **kwargs) if display_score: if pivot: _display_score = _df_score else: _display_score = _df_score.pivot_table(index=['y_ref', 'model'], columns='score', values='value') display(_display_score) if return_type == 'self': self.df_score = _df_score else: return _df_score
[docs] @docstr def train(self, df: pd.DataFrame = None, k: int = 5, groupby: Union[Sequence, str] = None, sortby: Union[Sequence, str] = None, random_state: int = None, fit_type: str = 'train_test', ensemble: bool = False, scores: Union[Sequence, str] = None, scale: float = None, do_predict: bool = True, do_score: bool = True, do_split: bool = True, do_fit: bool = True, do_print: bool = True, display_score: bool = True): """ wrapper method that combined k_split, train, predict and score :param df: %(df)s :param k: see hhpy.ds.k_split see hhpy.ds.k_split :param groupby: see hhpy.ds.k_split :param sortby: see hhpy.ds.k_split :param random_state: see hhpy.ds.k_split :param fit_type: see .fit :param ensemble: %(ensemble)s :param scores: see .score :param scale: see .score :param do_print: %(do_print)s :param display_score: %(display_score)s :param do_split: whether to apply k_split [optional] :param do_fit: whether to fit the Models [optional] :param do_predict: whether to add predictions to DataFrame [optional] :param do_score: whether to create self.df_score [optional] :return: None """ if df is not None: self.df = df if do_split: self.k_split(k=k, groupby=groupby, sortby=sortby, random_state=random_state, do_print=do_print) if do_fit: self.fit(fit_type=fit_type, do_print=do_print) if do_predict: self.predict(ensemble=ensemble, do_print=do_print) if do_score: self.score(scores=scores, scale=scale, do_print=do_print, display_score=display_score) if do_print: self.printf('training done')
[docs] @docstr_hpt def scoreplot(self, x='y_ref', y='value', hue='model', hue_order=None, row='score', row_order=None, palette=hpt_rcParams['palette'], width=16, height=9 / 2, scale=None, query=None, return_fig_ax=False, **kwargs) -> Optional[tuple]: """ plot the score(s) using sns.barplot :param x: %(x)s :param y: %(y)s :param hue: %(hue)s :param hue_order: %(order)s :param row: the variable to wrap around the rows [optional] :param row_order: %(order)s :param palette: %(palette)s :param width: %(subplot_width)s :param height: %(subplot_height)s :param scale: scale the values [optional :param query: query to be passed to pd.DataFrame.query before plotting [optional] :param return_fig_ax: %(return_fig_ax)s :param kwargs: other keyword arguments passed to sns.barplot :return: see return_fig_ax """ # -- init if row_order is None: row_order = ['r2', 'rmse', 'mae', 'stdae', 'medae'] if hue_order is None: hue_order = self.model_names _row_order = force_list(row_order) fig, ax = plt.subplots(nrows=len(_row_order), figsize=(width, height * len(_row_order))) _ax_list = ax_as_list(ax) _df_score = self.score(return_type='df', scale=scale, do_print=False, display_score=False) if query is not None: _df_score = _df_score.query(query) _row = -1 for _row_value in _row_order: _row += 1 _ax = _ax_list[_row] sns.barplot(x=x, y=y, hue=hue, hue_order=hue_order, data=_df_score.query('{}==@_row_value'.format(row)), ax=_ax, palette=palette, **kwargs) _y_label = _row_value if scale: _y_label += '_scale={}'.format(scale) _ax.set_ylabel(_y_label) _ax.set_xlabel('') legend_outside(ax) if return_fig_ax: return fig, ax else: plt.show(fig)
# --- functions
[docs]@export def dict_to_model(dic: Mapping) -> Model: """ restore a Model object from a dictionary :param dic: dictionary containing the model definition :return: Model """ # init empty model if dic['__name__'] == 'Model': _model = Model() else: raise ValueError('__name__ not recognized') # reconstruct from dict _model.from_dict(dic) return _model
[docs]@export def force_model(model: Union[object, Mapping]) -> Model: """ takes any Model, model object or dictionary and converts to Model :param model: Mapping or object containing a model :return: Model """ if not isinstance(model, Mapping): if not isinstance(model, Model): return Model(model) return model if '__name__' in model.keys(): if model['__name__'] in ['hhpy.modelling.Model']: _model = dict_to_model(model) else: raise ValueError('__name__ not recognized') else: raise KeyError('no element named __name__') return _model
[docs]@export def get_coefs(model: object, y: Union[Sequence, str]): """ get coefficients of a linear regression in a sorted data frame :param model: model object :param y: name of the coefficients :return: pandas DataFrame containing the coefficient names and values """ if isinstance(model, Model): _model = model.model else: _model = model assert(hasattr(_model, 'coef_')), 'Attribute coef_ not available, did you specify a linear model?' _coef = _model.coef_.tolist() _coef.append(model.intercept_) _df = pd.DataFrame() _df['feature'] = force_list(y) + ['intercept'] _df['coef'] = _coef _df = _df.sort_values(['coef'], ascending=False).reset_index(drop=True) return _df
[docs]@export def get_feature_importance(model: object, predictors: Union[Sequence, str], features_to_sum: Mapping = None) -> pd.DataFrame: """ get feature importance of a decision tree like model in a sorted data frame :param model: model object :param predictors: names of the predictors properly sorted :param features_to_sum: if you want to sum features please provide name mappings :return: pandas DataFrame containing the feature importances """ def _get_feature_importance_rf(_f_model, _f_predictors, _f_features_to_sum=None): _feature_importances = _f_model.feature_importances_ __df = pd.DataFrame() __df['feature'] = _f_predictors __df['importance'] = _feature_importances if _f_features_to_sum is not None: for _key in list(_f_features_to_sum.keys()): __df['feature'] = np.where(__df['feature'].isin(_f_features_to_sum[_key]), _key, __df['feature']) __df = __df.groupby('feature').sum().reset_index() __df = __df.sort_values(['importance'], ascending=False).reset_index(drop=True) __df['importance'] = np.round(__df['importance'], 5) return __df # get feature importance of a decision tree like model in a sorted data frame def _get_feature_importance_xgb(_f_model, _f_predictors, _f_features_to_sum=None): # get f_score _f_score = _f_model.get_booster().get_fscore() # init df to return __df = pd.DataFrame() # get predictor code __df['feature_code'] = list(_f_score.keys()) __df['feature_code'] = __df['feature_code'].str[1:].astype(int) # code importance __df['importance_abs'] = list(_f_score.values()) __df['importance'] = __df['importance_abs'] / __df['importance_abs'].sum() __df = __df.sort_values(['feature_code']).reset_index(drop=True) __df['feature'] = [_f_predictors[x] for x in range(len(_f_predictors)) if x in __df['feature_code']] if _f_features_to_sum is not None: for _key in list(_f_features_to_sum.keys()): __df['feature'] = np.where(__df['feature'].isin(_f_features_to_sum[_key]), _key, __df['feature']) __df = __df.groupby('feature').sum().reset_index() __df = __df.sort_values(['importance'], ascending=False).reset_index(drop=True) __df['importance'] = np.round(__df['importance'], 5) __df = __df[['feature', 'importance']] return __df try: # noinspection PyTypeChecker _df = _get_feature_importance_rf(model, predictors, features_to_sum) # this is supposed to also work for XGBoost but it was broken in a recent release # so below serves as fallback except ValueError: # noinspection PyTypeChecker _df = _get_feature_importance_xgb(model, predictors, features_to_sum) return _df
# TODO - make these members of Models # def grid_search(model, grid, target, n_random=100, n_random_state=0, goal='acc_test', f=cla, opt_class=None, # opt_score_type=None, ascending=None, do_print=True, display_score=False, float_format=',.4f', # **kwargs): # # -- defaults -- # # if opt_score_type is None: # if opt_class is not None: # opt_score_type = 'f1_pr' # else: # opt_score_type = 'score' # # if ascending is None: # # goal can be one of scores keys # if goal in ['r2_train', 'r2_test', 'acc_train', 'acc_test', 'true_pos', 'true_neg']: # _ascending = False # else: # _ascending = True # else: # _ascending = ascending # # # -- init -- # # if is_list_like(target): # target = target[0] # warnings.warn('target cannot be a list for grid_search, using {}'.format(target)) # # # you can pass a DataFrame or a dictionary as grid # if isinstance(grid, pd.DataFrame): # _grid = grid.copy() # del grid # elif isinstance(grid, Mapping): # # # expand gridf # # _grid = expand_dict_to_df(grid) # del grid # # else: # raise 'grid should be a dictionary or DataFrame' # # # if a number of random combinations to try was passed, use it to sample grid # if n_random is not None: # if n_random < _grid.shape[0]: # _grid = _grid.sample(n=n_random, random_state=n_random_state).reset_index(drop=True) # # # -- main -- # # # iter _grid rows # _df_out = [] # # _i = 0 # for _index, _row in _grid.iterrows(): # # _it_df = pd.DataFrame({'it': [_i]}) # _i += 1 # # _model = deepcopy(model) # _string = '' # # for _key in _row.keys(): # # _value = _row[_key] # if isinstance(_value, float): # if _value.is_integer(): _value = int(_value) # # if len(_string) > 0: _string += ', ' # _string += '{} = {}'.format(_key, _value) # # _model.set_params(**{_key: _value}) # _it_df[_key] = _value # # if do_print: tprint('{} / {} - {}'.format(_i, _grid.shape[0], _string)) # # # execute function # _dict = f(model=_model, target=target, do_print=False, display_score=display_score, **kwargs) # # # parse score df # if opt_score_type == 'score': # _score = _dict['score'] # elif opt_score_type == 'f1_pr': # # switch case for k_cross # if 'f1_pr' in _dict['cm'][target].keys(): # _score = _dict['cm'][target]['f1_pr'] # else: # _score = _dict['cm'][target]['f1_pr_test'] # elif opt_score_type == 'cm': # # switch case for k_cross # if 'cm' in _dict['cm'][target].keys(): _score = _dict['cm'][target]['f1_pr'] # _score = _dict['cm'][target]['cm_test'] # else: # raise ValueError('Unknown opt_score_type {}'.format(opt_score_type)) # # if opt_class is not None: _score = _score[_score.index == opt_class] # # for _col in _score.columns: _it_df[_col] = _score[_col].mean() # # _df_out.append(_it_df) # # _df_out = pd.concat(_df_out, ignore_index=True, sort=False).sort_values(by=goal, ascending=_ascending) # # # find best combination # if do_print: # tprint('') # print('best combination:') # display_df(_df_out.head(1), float_format=float_format) # # return _df_out # # # # wrapper for grid_search for regression # def grid_reg(*args, goal='rmse_test_scaled', f=reg, **kwargs): # return grid_search(*args, goal=goal, f=f, **kwargs) # # # # wrapper for grid_search for regression # def grid_cla(*args, goal='acc_test', display_cm=False, display_f1_pr=False, display_score=False, f=cla, **kwargs): # return ( # grid_search(*args, goal=goal, display_cm=display_cm, display_f1_pr=display_f1_pr, display_score=display_score, # f=f, **kwargs)) # # # # just a wrapper for grid_reg that uses k_cross validation by default # def grid_reg_k(f=k_cross_reg, goal='rmse', **kwargs): # return grid_search(goal=goal, f=f, **kwargs) # # # def multi_modelling(f, goal, ascending, df_train, df_test, target, predictors_it, predictors_fix=None, # fixed_only=True, # do_print=True, print_prefix='', **kwargs): # # init # if predictors_fix is None: # predictors_fix = [] # if not is_list_like(target): target = [target] # if not is_list_like(predictors_it): predictors_it = [predictors_it] # if not is_list_like(predictors_fix): predictors_fix = [predictors_fix] # # # a predictor cannot be both fix and it # if len(predictors_fix) > 0: # _predictors_it = [_ for _ in predictors_it if _ not in predictors_fix] # if fixed_only: _predictors_it += [{'fixed_only': []}] # else: # _predictors_it = predictors_it # # ## empty dict so that it can become a DtaFrame # _df_score = [] # # _i = 0 # _i_max = len(_predictors_it) # # for _predictor in _predictors_it: # # _i += 1 # # if isinstance(_predictor, Mapping): # _predictor_name = list(_predictor.keys())[0] # _predictor_value = _predictor[_predictor_name] # else: # _predictor_name = _predictor # _predictor_value = _predictor # # if not is_list_like(_predictor_value): # _predictors_score = [_predictor_value] # else: # _predictors_score = [] + _predictor_value # # if predictors_fix is not None: _predictors_score = _predictors_score + predictors_fix # # _print_prefix = '{}iteration {} / {} : {} - '.format(print_prefix, _i, _i_max, _predictor_name) # # _rmse_test = 0 # _rmse_train = 0 # # _score = \ # f(df_train=df_train, df_test=df_test, target=target, predictors=_predictors_score, # print_prefix=_print_prefix, # do_print=do_print, display_score=False, **kwargs)['score'] # _score = pd.DataFrame(_score[_score.index.isin(target)].mean()).T # _score['predictor'] = _predictor_name # _score['predictor_value'] = _predictor_value # # _df_score.append(_score) # # _df_score = pd.concat(_df_score, ignore_index=True, sort=False).sort_values(by=goal, # ascending=ascending).reset_index( # drop=True) # # if do_print: # tprint('done') # # return _df_score # # wrapper # def multi_cla(f=cla, goal='acc_test', ascending=False, **kwargs): # return multi_modelling(f=f, goal=goal, ascending=ascending, **kwargs) # # # # wrapper # def multi_reg(f=reg, goal='rmse_test_scaled', ascending=True, **kwargs): # return multi_modelling(f=f, goal=goal, ascending=ascending, **kwargs) # # # # iteratively build a model by trying all predictors one by one, finding the best one and then trying # # all again (repeat) # def iter_modelling(f, goal, df_train, df_test, target, predictors_it, predictors_fix=None, max_depth=10, cutoff=0, # do_print=True, **kwargs): # if predictors_fix is None: # predictors_fix = [] # if cutoff is None: cutoff = - np.inf # # # init # _predictors_fix = deepcopy(predictors_fix) # _predictors_it = deepcopy(predictors_it) # if not is_list_like(_predictors_fix): _predictors_fix = [_predictors_fix] # _predictors_fix = list(_predictors_fix) # if max_depth > len(predictors_it): max_depth = len(predictors_it) # # # a predictor cannot be both fix and it # _predictors_it = [_ for _ in _predictors_it if _ not in _predictors_fix] # # _prev_rmse = 9 ** 9 # # for _depth in range(max_depth): # # _print_prefix = 'depth {} / {} - '.format(_depth + 1, max_depth) # # _df_score = f(df_train=df_train, df_test=df_test, target=target, predictors_it=_predictors_it, # predictors_fix=_predictors_fix, fixed_only=False, do_print=do_print, print_prefix=_print_prefix, # **kwargs) # _best = _df_score.iloc[0] # _best_goal = _best[goal] # _best_predictor = _best['predictor_value'] # # print('{}best result: {}'.format(_print_prefix, format_iloc(_best))) # # if _prev_rmse - _best_goal > cutoff: # # _prev_rmse = _best_goal # # # update predictors # if not is_list_like(_best_predictor): # # _predictors_it = [_ for _ in _predictors_it if _ != _best_predictor] # _predictors_fix += [_best_predictor] # # else: # # _predictors_it_new = [] # # for _predictor in _predictors_it: # # if not isinstance(_predictor, Mapping): # _predictors_it_new.append(_predictor) # else: # if not _best['predictor'] in _predictor.keys(): _predictors_it_new.append(_predictor) # # _predictors_it = deepcopy(_predictors_it_new) # _predictors_fix += _best_predictor # # else: # # print('{}cutoff {} not reached, stopping'.format(_print_prefix, cutoff)) # break # # return None # # # # wrapper # def iter_cla(f=multi_cla, goal='acc_test', **kwargs): # return iter_modelling(f=f, goal=goal, **kwargs) # # # # wrapper # def iter_reg(f=multi_reg, goal='rmse_test_scaled', **kwargs): # return iter_modelling(f=f, goal=goal, **kwargs) # # # # iteratively build a model by trying all predictors one by one and accepting those who improve more than cutoff # # (faster than iter_reg) # def forward_reg(df_train, df_test, predictors_it, predictors_fix, max_depth=10, cutoff=.001, do_print=True, f=reg, # **kwargs): # _predictors_fix = deepcopy(predictors_fix) # _predictors_it = deepcopy(predictors_it) # if not is_list_like(_predictors_fix): _predictors_fix = [_predictors_fix] # _predictors_fix = list(_predictors_fix) # _predictors_final = [] + _predictors_fix # # # a predictor cannot be both fix and it # _predictors_it = [_ for _ in _predictors_it if _ not in _predictors_fix] # # if do_print: tprint('training base model...') # # _dict = f( # df_train=df_train, # df_test=df_test, # predictors=predictors_fix, # do_print=False, # **kwargs # ) # # _base = _dict['score'].iloc[0] # # _prev_rmse = _base['rmse_test_scaled'] # # print('base score: rmse_scaled train / test : {:.4f} / {:.4f} - rmse train / test : {:.4f} / {:.4f}'.format( # _base['rmse_train_scaled'], _base['rmse_test_scaled'], _base['rmse_train'], _base['rmse_test'])) # # for _depth in range(max_depth): # # _print_prefix = 'depth {} / {} - '.format(_depth + 1, max_depth) # # # do a multi_reg # _df_score = multi_reg(df_train=df_train, df_test=df_test, predictors_it=_predictors_it, # predictors_fix=_predictors_fix, fixed_only=False, do_print=do_print, # print_prefix=_print_prefix, f=reg, **kwargs) # # # find base model # _df_score_base = _df_score.query('@_prev_rmse-rmse_test_scaled>@cutoff') # # # if the len is zero no improvements are possible -> quit # if _df_score_base.shape[0] == 0: # if do_print: print('{}no improvements possible, quitting'.format(_print_prefix)) # break # # # get all predictors that made improvements: # _predictors_pos_name = _df_score_base['predictor'].tolist() # _predictors_pos = _df_score_base['predictor_value'].tolist() # # print(_predictors_pos) # _predictors_fix_new = _predictors_fix + flatten(_predictors_pos) # # if do_print: print('{}accepted predictors: {}'.format(_print_prefix, _predictors_pos_name)) # # if do_print: tprint('{}training new base model...'.format(_print_prefix)) # # # get new base # _dict_new = f( # df_train=df_train, # df_test=df_test, # predictors=_predictors_fix_new, # do_print=False, # **kwargs # ) # # _base_new = _dict_new['score'].iloc[0] # # print('{}new score: rmse_scaled train / test : {:.4f} / {:.4f} - rmse train / test : {:.4f} / {:.4f}'.format( # _print_prefix, _base_new['rmse_train_scaled'], _base_new['rmse_test_scaled'], _base_new['rmse_train'], # _base_new['rmse_test'])) # # # new rmse # if _prev_rmse - _base_new['rmse_test_scaled'] > cutoff: # # _prev_rmse = deepcopy(_base_new['rmse_test_scaled']) # _base = _base_new.copy() # _dict = deepcopy(_dict_new) # _predictors_fix = deepcopy(_predictors_fix_new) # _predictors_final += _predictors_pos_name # # # update predictors # for _new_predictor, _new_predictor_name in zip(_predictors_pos, _predictors_pos_name): # # if not is_list_like(_new_predictor): # # _predictors_it = [_ for _ in _predictors_it if _ != _new_predictor] # # else: # # _predictors_it_new = [] # # for _predictor in _predictors_it: # # if not isinstance(_predictor, Mapping): # _predictors_it_new.append(_predictor) # else: # if not _new_predictor_name in _predictor.keys(): _predictors_it_new.append(_predictor) # # _predictors_it = deepcopy(_predictors_it_new) # else: # print('{}overall score improvement below cutoff {}, quitting'.format(_print_prefix, cutoff)) # break # # if len(_predictors_it) == 0: # print('{}accepted all predictors, quitting'.format(_print_prefix)) # break # # print('') # print('final predictors: {}'.format(_predictors_final)) # print('discarded predictors: {}'.format(_predictors_it)) # print('final score: rmse_scaled train / test : {:.4f} / {:.4f} - rmse train / test : {:.4f} / {:.4f}'.format( # _base['rmse_train_scaled'], _base['rmse_test_scaled'], _base['rmse_train'], _base['rmse_test'])) # # return _dict # # # # iteratively build a model by trying all predictors and removing one by one and discarding those whose gain # # was less than cutoff (faster than iter_reg) # def backward_modelling(f, goal, ascending, cutoff, df_train, df_test, target, predictors_it, predictors_fix=None, # max_depth=3, do_print=True, float_format=',.4f', **kwargs): # # -- init -- # if predictors_fix is None: # predictors_fix = [] # _predictors_fix = deepcopy(predictors_fix) # _predictors_it = deepcopy(predictors_it) # if not is_list_like(_predictors_fix): _predictors_fix = [_predictors_fix] # _predictors_fix = list(_predictors_fix) # _predictors_final = [] + _predictors_fix # # # a predictor cannot be both fix and it # _predictors_it = [_ for _ in _predictors_it if _ not in _predictors_fix] # _predictors_all = predictors_fix + _predictors_it # # # -- base -- # if do_print: tprint('training base model...') # # _dict = f( # df_train=df_train, # df_test=df_test, # target=target, # predictors=_predictors_all, # do_print=False, # display_score=False, # **kwargs # ) # # _score = _dict['score'] # _score = _score[_score.index.isin(target)] # _base = _score.mean() # # _best_goal = _base[goal] # # print('base score: {}'.format(format_iloc(_base, float_format=float_format))) # # # -- main loop -- # for _depth in range(max_depth): # # _print_prefix = 'depth {} / {} - '.format(_depth + 1, max_depth) # # tprint('{}init...'.format(_print_prefix)) # # _i = 0 # _i_max = len(_predictors_it) # _predictors_it_new = deepcopy(_predictors_it) # _removed = 0 # # # loop all remaining predictors # for _predictor in _predictors_it: # # _i += 1 # # _predictors = [_ for _ in _predictors_all if _ != _predictor] # # # try: # if True: # # _dict_new = f( # df_train=df_train, # df_test=df_test, # target=target, # predictors=_predictors, # do_print=False, # display_score=False, # **kwargs # ) # # _score_new = _dict_new['score'] # _score_new = _score_new[_score_new.index.isin(target)] # _base_new = _score_new.mean() # # _goal = _base_new[goal] # _full_score = format_iloc(_base_new, float_format=float_format) # _new_score = _base_new[goal] # _new_score_formatted = format_iloc(_base_new[[goal]], float_format=float_format) # # if ascending: # _goal_diff = _best_goal - _new_score # else: # _goal_diff = _new_score - _best_goal # # tprint( # '{}predictor {} / {} : {} ; {} - best: {} - diff: {}'.format(_print_prefix, _i, _i_max, # _predictor, # _new_score_formatted, # format(_best_goal, float_format), # format(_goal_diff, float_format))) # # if _goal_diff <= cutoff: # # _predictors_all = deepcopy(_predictors) # _predictors_it_new.remove(_predictor) # _removed += 1 # _dict = deepcopy(_dict_new) # # # if the diff is below 0 a new best score was reached # if _goal_diff < 0: _best_goal = deepcopy(_new_score) # # # except: # # print('') # # print('predictor {} failed'.format(_predictor)) # # pass # # if _removed == 0: # tprint('') # print('no improvements possible, quitting') # break # # _predictors_it = deepcopy(_predictors_it_new) # # tprint('') # print('{}predictors {} ; {}'.format(_print_prefix, _predictors_all, _full_score)) # # if len(_predictors_all) == 1: # print('only 1 predictor left, quitting') # break # # return _dict # # # def backward_cla(f=cla, goal='acc_test', ascending=True, cutoff=0, float_format=',.3f', **kwargs): # return backward_modelling(f=f, goal=goal, ascending=ascending, cutoff=cutoff, float_format=float_format, # **kwargs) # # # def backward_reg(f=reg, goal='rmse_test_scaled', ascending=False, cutoff=0, **kwargs): # return backward_modelling(f=f, goal=goal, ascending=ascending, cutoff=cutoff, **kwargs)