"""
hhpy.main.py
~~~~~~~~~~~~
Contains basic calculation functions that are used in the more specialized versions of the package but can also be used
on their own
"""
# ---- imports
# --- standard imports
import numpy as np
import pandas as pd
import warnings
import os
import sys
import datetime
import h5py
import re
import functools
# --- third party imports
from typing import Any, Callable, Union, Sequence, Mapping, List, Optional, Iterable, AbstractSet, ValuesView
from types import FunctionType
from docrep import DocstringProcessor
from collections import defaultdict
from copy import deepcopy
from time import sleep
from json import JSONDecodeError
# --- optional imports
try:
# noinspection PyPackageRequirements
from googletrans import Translator
except ImportError:
Translator = None
try:
# noinspection PyPackageRequirements
import emoji
except ImportError:
emoji = None
# ---- init
# register the pandas converters with matplotlib; this runs as a side effect of importing the module
pd.plotting.register_matplotlib_converters()
# setting this option to None switches off pandas' chained assignment check for the whole session
pd.options.mode.chained_assignment = None
# ---- variables
# --- globals for functions
global_t = datetime.datetime.now()  # reference time read by elapsed_time(), reset by elapsed_time_init()
global_tprint_len = 0  # length of the previous tprint() output, used to blank out leftovers of longer prints
# --- typing classes
# note that None is part of Scalar, is_scalar() therefore accepts None as well
Scalar = Union[int, float, str, bytes, None]
ListOfScalars = Union[List[Scalar], Scalar]
SequenceOrScalar = Union[Sequence, Scalar, AbstractSet]
SequenceOfScalars = Union[Sequence[Scalar], Scalar]
DFOrArray = Union[pd.DataFrame, np.ndarray]
# --- rcParams
# package wide defaults that can be changed at runtime
rcParams = {
    'tprint.r_loc': 'front',  # default position of the carriage return used by tprint()
}
# ---- constants
# --- true constants
STRING_NAN = ['nan', 'nat', 'NaN', 'NaT']  # presumably the string spellings treated as missing values - not used in view
GROUPBY_DUMMY = '__groupby__'  # name of the helper column to_hdf() adds when no groupby is passed
# --- validations
# allowed values of string arguments, keys follow the pattern '<function>__<argument>'
validations = {
    'reformat_string__case': ['lower', 'upper'],
    'dict_inv__duplicates': ['adjust', 'drop'],
    'progressbar__mode': ['perc', 'remaining', 'elapsed'],
}
# --- docstr
# shared docstring snippets; the validations are passed along so that docstrings can reference them,
# e.g. progressbar() uses %(progressbar__mode)s
docstr = DocstringProcessor(
    df='Pandas DataFrame containing the data',
    x='Main variable, name of a column in the DataFrame or vector data',
    warn='Whether to show UserWarnings triggered by this function. Set to False to suppress, other warnings will still '
    'be triggered [optional]',
    **validations
)
# ---- decorators
def export(fn):
    """
    Decorator that registers the name of the decorated object in the ``__all__`` list of the module it was
    defined in. The list is created if the module does not have one yet.

    based on https://stackoverflow.com/questions/41895077/export-decorator-that-manages-all

    :param fn: The object (function or class) to export, must provide __module__ and __name__
    :return: The unchanged object
    """
    _module = sys.modules[fn.__module__]
    # make sure the export list exists, then register the name
    if not hasattr(_module, '__all__'):
        _module.__all__ = []
    _module.__all__.append(fn.__name__)
    return fn
# ---- classes
@export
class BaseClass:
    """
    Base class for various classes deriving from this. Implements __repr__, converting to dict as well as
    saving to pickle and restoring from pickle.

    Does NOT provide __init__ since it cannot be used by itself. Deriving classes are expected to declare
    ``__name__`` (str) and ``__attributes__`` (list of attribute names); only those attributes are converted,
    saved and restored.
    """

    # --- globals
    __name__ = 'BaseClass'
    __attributes__ = []
    __attributes_no_repr__ = []

    # --- functions
    def __repr__(self):
        return get_repr(self)

    @staticmethod
    def _map_values(container: Any, convert: Callable) -> Any:
        """
        Applies convert to every value of a dictionary or every element of an indexable container.

        The container that was passed is never modified: a shallow copy is created as soon as the first value
        actually changes. If nothing changes the container itself is returned.

        :param container: Mapping or container supporting len() and integer indexing
        :param convert: Function of one value returning the converted value (or the value itself)
        :return: The container or a shallow copy of it holding the converted values
        """
        from copy import copy as _shallow_copy

        if isinstance(container, Mapping):
            _items = list(container.items())
        else:
            _items = [(_i, container[_i]) for _i in range(len(container))]
        _out = container
        for _key, _value in _items:
            _new_value = convert(_value)
            if _new_value is _value:
                continue
            # copy on first change so that neither the instance nor the caller's data gets overwritten
            if _out is container:
                _out = _shallow_copy(container)
            _out[_key] = _new_value
        return _out

    @staticmethod
    def _value_to_dict(value: Any) -> Any:
        """
        Converts a single value to a dictionary if it implements to_dict, else returns it unchanged
        """
        if 'to_dict' in dir(value):
            return value.to_dict()
        return value

    @staticmethod
    def _value_from_dict(value: Any) -> Any:
        """
        Restores a single object from its dictionary representation. Values that are not a Mapping containing
        the key '__name__' are returned unchanged.
        """
        if not isinstance(value, Mapping) or '__name__' not in value.keys():
            return value
        _name = value['__name__']
        # remnant of a time when this was a file called cf.py
        if _name[:3] == 'cf.':
            _name = _name[3:]
        # evaluate, i.e. instantiate
        # SECURITY: eval executes whatever string is stored under '__name__', only restore from trusted sources
        _obj = eval(_name + '()')
        if 'from_dict' in dir(_obj):
            _obj.from_dict(value)
        return _obj

    def to_dict(self) -> dict:
        """
        Converts self to a dictionary holding '__name__' and all attributes listed in __attributes__.
        Attributes implementing to_dict are converted recursively, the same goes for the values of
        dictionaries and the elements of list like attributes. The attributes of self are left untouched.

        :return: Dictionary
        """
        if len(self.__attributes__) == 0:
            warnings.warn('self.__attributes__ has length zero, did you declare it?')
        _dict = {}
        for _attr_name in list_merge('__name__', self.__attributes__):
            _attr = self.__getattribute__(_attr_name)
            if 'to_dict' in dir(_attr):
                _attr = _attr.to_dict()
            elif is_list_like(_attr):
                _attr = self._map_values(_attr, self._value_to_dict)
            _dict[_attr_name] = _attr
        return _dict

    def from_dict(self, dct: Mapping):
        """
        Restores self from a dictionary. Keys that are missing in the dictionary are skipped. Nested
        dictionaries containing the key '__name__' are turned back into objects by instantiating the class
        of that name (using eval) and calling its from_dict method. The passed dictionary is not modified.

        :param dct: Dictionary created from :meth:`~BaseClass.to_dict`
        :return: None
        """
        if len(self.__attributes__) == 0:
            warnings.warn('self.__attributes__ has length zero, did you declare it?')
        for _attr_name in list_merge('__name__', self.__attributes__):
            if _attr_name not in dct.keys():
                continue
            _attr = dct[_attr_name]
            if is_list_like(_attr):
                if isinstance(_attr, Mapping) and '__name__' in _attr.keys():
                    # the attribute itself is a stored object
                    _attr = self._value_from_dict(_attr)
                else:
                    # the attribute is a container that may hold stored objects
                    _attr = self._map_values(_attr, self._value_from_dict)
            self.__setattr__(_attr_name, _attr)

    def save(self, filename: str, f: Callable = pd.to_pickle):
        """
        Save self to file using an arbitrary function that supports saving dictionaries. Note that the object
        is implicitly converted to a dictionary before saving.

        :param filename: filename (path) to be used
        :param f: function to be used, called as f(dictionary, filename) [optional]
        :return: None
        """
        _dict = self.copy().to_dict()
        f(_dict, filename)

    def load(self, filename: str, f: Callable = pd.read_pickle):
        """
        Load self from file saved with :meth:`~BaseClass.save` using an arbitrary function that supports loading
        dictionaries. Restoring uses pickle (by default) and eval, only load files from trusted sources.

        :param filename: filename (path) of the file
        :param f: function to be used, called as f(filename) [optional]
        :return: None
        """
        self.from_dict(f(filename))

    def to_pickle(self, *args, **kwargs):
        """
        Wrapper for :meth:`~BaseClass.save` using f =
        `pandas.to_pickle <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_pickle.html>`_

        :param args: passed to save [optional]
        :param kwargs: passed to save [optional]
        :return: see save
        """
        self.save(*args, f=pd.to_pickle, **kwargs)

    def read_pickle(self, *args, **kwargs):
        """
        Wrapper for :meth:`BaseClass.load` using f =
        `pandas.read_pickle <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_pickle.html>`_

        :param args: passed to load [optional]
        :param kwargs: passed to load [optional]
        :return: see load
        """
        self.load(*args, f=pd.read_pickle, **kwargs)

    def copy(self):
        """
        Uses `copy.deepcopy <https://docs.python.org/3/library/copy.html>`_ to return a copy of the object

        :return: Copy of self
        """
        return deepcopy(self)
# ---- functions
# --- internal functions
def get_repr(obj: Any, rules: Mapping[type, Callable] = None, map_list: bool = True, map_dict: bool = True) -> str:
    """
    Basic reusable repr method for custom classes. Builds a string of the form ``Name(attr=value, ...)`` from
    the attributes listed in obj.__attributes__, skipping '__name__', attributes that are None and everything
    listed in obj.__attributes_no_repr__. The object itself is never modified.

    :param obj: Any instance of a custom class implementing .__name__ (str) and .__attributes__ (List[str])
    :param rules: Rules as dictionary of types and callables. Callable argument will be attribute value
    :param map_list: Whether to map the rules to list elements
    :param map_dict: Whether to map the rules to dictionary values
    :return: str
    :raises ValueError: if rules is neither None nor a Mapping
    """

    def _get_repr_i(value: Any) -> str:
        # case by case selector: large containers are abbreviated by their shape
        if isinstance(value, np.ndarray):
            _repr_value = f"Array{value.shape}"
        elif isinstance(value, pd.DataFrame):
            _repr_value = f"DataFrame{value.shape}"
        elif isinstance(value, pd.Series):
            _repr_value = f"Series{value.shape}"
        elif hasattr(value, '__code__'):
            # functions are shown as name plus the variable names of their code object
            _callable_name = value.__name__ if hasattr(value, '__name__') else 'Callable'
            _repr_value = f"{_callable_name}{value.__code__.co_varnames}"
        else:
            _repr_value = repr(value)
        # eval custom rules
        if rules is not None:
            for _type, _callable in rules.items():
                _match = isinstance(value, _type)
                if not _match:
                    # a rule also applies if the value is the type itself; comparisons of array likes
                    # are not a single boolean and count as no match
                    try:
                        _match = bool(value == _type)
                    except (TypeError, ValueError):
                        _match = False
                if _match:
                    try:
                        _repr_value = _callable(value)
                    except Exception as _e:
                        print(f"{_e.__class__.__name__}: {_e} handled for {value}")
        return _repr_value

    # -- assert
    # - name
    if hasattr(obj, '__name__'):
        _name = obj.__name__
    else:
        warnings.warn('Object has no __name__ attribute, did you declare it?')
        _name = '{Unnamed}'
    # check if self.__name__ is still the same as Base (i.e. unset)
    if _name == 'BaseClass' and obj.__class__ != BaseClass:
        warnings.warn('__name__ is equal to BaseClass, did you declare it?')
    # - attributes
    if hasattr(obj, '__attributes__'):
        _attributes = obj.__attributes__
        if len(_attributes) == 0:
            warnings.warn('self.__attributes__ has length zero, did you declare it?')
        # exclude attributes to be hidden in repr
        if hasattr(obj, '__attributes_no_repr__'):
            _attributes = list_exclude(_attributes, obj.__attributes_no_repr__)
    else:
        warnings.warn('Object has no __attributes__ attribute, did you declare it?')
        _attributes = []
    # - rules
    if rules is not None and not isinstance(rules, Mapping):
        raise ValueError('rules should be a dictionary of types and callables')

    # -- main
    _parts = []
    for _attribute in _attributes:
        # don't print __name__
        if _attribute == '__name__':
            continue
        # check if _attr exists
        if not hasattr(obj, _attribute):
            warnings.warn(f"{_attribute} is specified in self.__attributes__ but does not exist. Skipping...")
            continue
        _value = getattr(obj, _attribute)
        if _value is None:
            continue
        if map_list and isinstance(_value, list):
            _value = [_get_repr_i(_) for _ in _value]
        if map_dict and isinstance(_value, dict):
            # work on a shallow copy: writing into _value directly would replace the values of the
            # object's own attribute with strings
            _value = _value.copy()
            for _key, _value_i in list(_value.items()):
                _value[_key] = _get_repr_i(_value_i)
        _parts.append(f"{_attribute}={_get_repr_i(value=_value)}")

    # -- return
    return f"{_name}({', '.join(_parts)})"
# --- exported functions
@export
def today(date_format: str = '%Y_%m_%d') -> str:
    """
    Returns the current local date (and time, depending on the format) as string

    :param date_format: The formatting string for the date. Passed to strftime
    :return: Formatted String

    **Examples**

    >>> today()
    '2020_01_14'
    """
    _now = datetime.datetime.today()
    return _now.strftime(date_format)
@export
def size(byte: int, unit: str = 'MB', dec: int = 2) -> str:
    """
    Formats an amount of bytes as human readable string using binary (1024 based) units

    :param byte: The byte amount to be formatted
    :param unit: The unit to display the output in, supports 'KB', 'MB', 'GB' and 'TB' (case insensitive).
        The unit is printed as passed.
    :param dec: The number of decimals to round to
    :return: Formatted bytes as string
    :raises KeyError: if the unit is not supported

    **Examples**

    >>> size(1024, unit='KB')
    '1.0 KB'
    >>> size(1024*1024*10, unit='MB')
    '10.0 MB'
    >>> size(10**10, unit='GB')
    '9.31 GB'
    """
    _exponents = dict(KB=1, MB=2, GB=3, TB=4)
    _divisor = 1024 ** _exponents[unit.upper()]
    _value = np.round(byte / _divisor, dec)
    return f"{_value} {unit}"
@export
def mem_usage(pandas_obj, *args, **kwargs) -> str:
    """
    Get the deep memory usage of a pandas object as formatted string

    :param pandas_obj: Pandas object to get the memory usage of. Everything that is not a DataFrame is
        treated like a Series.
    :param args: passed to size()
    :param kwargs: passed to size()
    :return: memory usage of a pandas object formatted as string

    **Examples**

    >>> import seaborn as sns
    >>> diamonds = sns.load_dataset('diamonds')
    >>> mem_usage(diamonds)
    '12.62 MB'
    """
    _usage_b = pandas_obj.memory_usage(deep=True)
    # a DataFrame reports one value per column -> sum up; a Series already reports a single number
    if isinstance(pandas_obj, pd.DataFrame):
        _usage_b = _usage_b.sum()
    return size(_usage_b, *args, **kwargs)
@export
def tprint(*args, sep: str = ' ', r_loc: str = rcParams['tprint.r_loc'], **kwargs):
    """
    Wrapper for print() but with a carriage return instead of a line break.
    This results in the text being overwritten by the next print call.
    Can be used for progress bars and the like.

    The length of the printed text is stored in the module level variable global_tprint_len so that a shorter
    follow-up tprint can pad itself with whitespace and fully overwrite the previous output.

    :param args: arguments to print
    :param sep: separator put between the arguments
    :param r_loc: where to put the carriage return, one of ['front', 'end']. Some interpreters (e.g. PyCharm)
        don't like end since they automatically clear the print area after each carriage return. When using
        'front' a regular print after a tprint will start at the end of the tprint. When using 'end' a regular
        print will overwrite the tprint output but will not clear the console, so leftovers of a longer tprint
        stay visible. In either case a blank tprint() will clear the console and restore default print behaviour.
        Note that the default is read from rcParams when the module is imported.
    :param kwargs: passed to print
    :return: None

    **Examples**

    >>> tprint('Hello World')
    'Hello World'
    >>> tprint(1)
    >>> tprint(2)
    2
    """
    global global_tprint_len
    _allowed_r_locs = ['front', 'end']
    if r_loc not in _allowed_r_locs:
        warnings.warn(f'r_loc not in {_allowed_r_locs}, defaulting to {rcParams["tprint.r_loc"]}')
        r_loc = rcParams['tprint.r_loc']
    # assemble the text
    _string = ''
    for _arg in args:
        if len(_string) > 0:
            _string += sep
        _string += str(_arg)
    # measure the assembled text so that separators of any length are counted correctly
    _arg_len = len(_string)
    # pad with whitespace to blank out what is left of a longer previous tprint
    _whitespace_len = global_tprint_len - _arg_len
    if _whitespace_len > 0:
        _string += ' ' * _whitespace_len
    # print
    if r_loc == 'front':
        print('\r' + _string, end='', **kwargs)
        # reset tprint: a blank call moves the cursor back to the start of the line
        if len(args) == 0 or (len(args) == 1 and args[0] == ''):
            print('', end='\r', **kwargs)
    else:  # r_loc == 'end'
        print(_string, end='\r', **kwargs)
    # store len for next tprint use
    global_tprint_len = _arg_len
@export
def fprint(*args, file: str = '_fprint.txt', sep: str = ' ', mode: str = 'replace', append_sep: str = '\n',
           timestamp: bool = True, do_print: bool = False, do_tprint: bool = False):
    """
    Write the output of print to a file instead. Supports also writing to console.

    :param args: the arguments to print
    :param file: the name of the file to print to, '.txt' is appended if the name does not end with it
    :param sep: separator put between the arguments
    :param mode: whether to append to or replace the contents of the file. Everything other than 'append'
        replaces the contents.
    :param append_sep: if mode=='append' and the file already exists, this separator is written before the
        new entry
    :param timestamp: whether to include a timestamp in the file entry (the console output has none)
    :param do_print: whether to also print to console
    :param do_tprint: whether to also print to console using tprint
    :return: None

    **Examples**

    The below output gets written to a file called 'fprint.txt'

    >>> fprint('Hello World', file='fprint.txt')

    The below output gets written both to a file and to console

    >>> fprint('Hello World', file='fprint.txt', do_print=True)
    'Hello World'
    """
    if file[-4:] != '.txt':
        file += '.txt'
    # args to string
    _print = sep.join(str(_arg) for _arg in args)
    # assemble the file entry
    _string = ''
    if timestamp:
        _string += '[{:%Y-%m-%d %H:%M:%S}]: '.format(datetime.datetime.now())
    _string += _print
    # append directly to an existing file instead of reading and rewriting its whole content
    _append = (mode == 'append') and os.path.exists(file)
    if _append:
        _string = append_sep + _string
    # can also print to console
    if do_tprint:
        tprint(_print)
    if do_print:
        print(_print)
    # write to file
    with open(file, 'a' if _append else 'w') as _txt:
        _txt.write(_string)
@export
def elapsed_time_init() -> None:
    """
    Sets the module wide reference time used by elapsed_time() to the current time

    :return: None

    **Examples**

    see :func:`elapsed_time`
    """
    global global_t
    _now = datetime.datetime.now()
    global_t = _now
@export
def elapsed_time(do_return: bool = True, ref_t: datetime.datetime = None) -> Optional[datetime.timedelta]:
    """
    Get the elapsed time since reference time ref_t.

    :param do_return: Whether to return or print
    :param ref_t: Reference time. If None is provided the time elapsed_time_init() was last called is used
        (the time of import if it was never called).
    :return: In case of do_return: timedelta object containing the elapsed time. Else calls tprint with the
        elapsed time truncated to tenths of a second and returns None.

    **Examples**

    >>> from time import sleep
    >>> elapsed_time_init()
    >>> sleep(1)
    >>> elapsed_time(do_return=False)
    '0:00:01.0'

    >>> from time import sleep
    >>> elapsed_time_init()
    >>> sleep(1)
    >>> elapsed_time(do_return=True)
    datetime.timedelta(0, 1, 1345)
    """
    if ref_t is None:
        ref_t = global_t
    _delta_t = datetime.datetime.now() - ref_t
    if do_return:
        return _delta_t
    _text = str(_delta_t)
    if _delta_t.microseconds:
        # drop the last five of the six microsecond digits
        _text = _text[:-5]
    else:
        # str() omits the fraction when it is exactly zero, cutting five characters would eat the seconds
        _text += '.0'
    tprint(_text)
@export
def total_time(i: int, i_max: int) -> datetime.timedelta:
    """
    Estimates the total run time of an operation by linearly extrapolating the time elapsed so far
    (see elapsed_time) with the share of finished iterations.

    :param i: current iteration, must not be zero
    :param i_max: max iteration, must not be zero
    :return: timedelta object representing estimated total time of operation
    """
    _progress_perc = i / i_max * 100
    return elapsed_time(do_return=True) * 100 / _progress_perc
@export
def remaining_time(i: int, i_max: int) -> datetime.timedelta:
    """
    Estimates the remaining run time of an operation as difference of the estimated total time
    (see total_time) and the time elapsed so far (see elapsed_time).

    :param i: current iteration
    :param i_max: max iteration
    :return: timedelta object representing estimated remaining time of operation
    """
    _elapsed = elapsed_time(do_return=True)
    return total_time(i, i_max) - _elapsed
@docstr
@export
def progressbar(i: int = 1, i_max: int = 1, symbol: str = '=', empty_symbol: str = '_', mid: str = None,
                mode: str = 'perc', print_prefix: str = '', p_step: int = 1, printf: Callable = tprint,
                persist: bool = False, **kwargs):
    """
    Prints a progressbar for the currently running process based on iteration counters. The bar consists of
    a left and a right half of 50 percent each with a text in between.

    :param i: current iteration
    :param i_max: max iteration
    :param symbol: symbol that represents reached progress blocks
    :param empty_symbol: symbol that represents not yet reached progress blocks
    :param mid: what to write in the middle of the progressbar, if mid is passed mode is ignored
    :param mode: One of %(progressbar__mode)s
        If perc is passed writes percentage. If 'remaining' or 'elapsed' writes remaining or elapsed time
        respectively. In the time based modes calling with i=0 resets the reference time. [optional]
    :param print_prefix: what to write in front of the progressbar. Useful when calling progressbar multiple times
        from different functions. A colon is appended unless the prefix already ends on one or on a line break.
    :param p_step: progressbar prints one symbol (progress block) per p_step
    :param printf: Using tprint by default. Use fprint to write to file instead.
    :param persist: Whether to persist the progressbar after reaching 100 percent.
    :param kwargs: Passed to print function
    :return: None
    """
    # -- assert
    _allowed_modes = validations['progressbar__mode']
    if mode not in _allowed_modes:
        raise ValueError(f"mode must be one of {_allowed_modes}")

    # -- init
    _perc_f = i / i_max * 100
    _perc = int(np.floor(_perc_f))
    _rem = 100 - _perc
    # separate a non empty prefix from the bar unless it already brings its own ending
    if print_prefix and not print_prefix.endswith((': ', ':', '\n')):
        print_prefix += ": "

    # -- main
    # - bar halves
    _half = 50 // p_step
    if _perc > 50:
        _left = symbol * _half
        _right = symbol * int(np.ceil((50 - _rem) / p_step)) + empty_symbol * (_rem // p_step)
    else:
        _left = symbol * int(np.ceil(_perc / p_step)) + empty_symbol * ((50 - _perc) // p_step)
        _right = empty_symbol * _half
    # - text in the middle
    if mid is not None:
        _mid = mid
    elif mode in ['remaining', 'elapsed']:
        # first iteration: init elapsed time
        if i == 0:
            elapsed_time_init()
        _elapsed_time = elapsed_time(do_return=True)
        _elapsed_str = str(_elapsed_time)[:-5]
        if i == 0:
            # special case since the remaining time cannot be calculated yet
            _mid = '{}'.format(_elapsed_str)
        else:
            _total_time = _elapsed_time * 100 / _perc_f
            _remaining_time = _total_time - _elapsed_time
            if not i < i_max:
                # finished: only the elapsed time is of interest
                _mid = '{}'.format(_elapsed_str)
            elif mode == 'remaining':
                _mid = '-{}'.format(str(_remaining_time)[:-5])
            else:
                _mid = '{} / {}'.format(_elapsed_str, str(_total_time)[:-5])
    elif mode == 'perc':
        _mid = f"{_perc_f:6.2f}%"
    else:
        _mid = ''
    # - print
    printf(f"{print_prefix}|{_left}{_mid}{_right}|", **kwargs)
    # a line break keeps the finished bar from being overwritten
    if persist and i == i_max:
        print('')
@export
def time_to_str(t: datetime.datetime, time_format: str = '%Y-%m-%d') -> str:
    """
    Formats a point in time as string. The input is converted with pandas.to_datetime first.

    :param t: datetime object (or anything pandas.to_datetime understands)
    :param time_format: time format, passed to strftime
    :return: formatted datetime as string
    """
    _timestamp = pd.to_datetime(t)
    return _timestamp.strftime(time_format)
@export
def cf_vec(x: Any, func: Callable, to_list: bool = True, *args, **kwargs) -> Any:
    """
    Pandas compatible vectorize function. In case a DataFrame is passed the function is applied to all columns
    of a copy of the DataFrame (func receives the whole column). Scalars are passed to func directly,
    everything else is converted to a numpy array and func is applied element wise.

    :param x: Any vector like object
    :param func: Any function that should be vectorized
    :param to_list: Whether to cast the output to a list (ignored for scalars and DataFrames)
    :param args: passed to func
    :param kwargs: passed to func
    :return: Vector like object
    """
    # - case: pandas DataFrame
    if isinstance(x, pd.DataFrame):
        _df = x.copy()
        for _col in _df.columns:
            _df[_col] = func(_df[_col], *args, **kwargs)
        return _df
    # - case: everything else is handled as numpy array
    _x = np.array(x)
    # scalar
    if _x.ndim == 0:
        return func(x, *args, **kwargs)
    # 1d and list output requested
    if _x.ndim == 1 and to_list:
        return [func(_x_i, *args, **kwargs) for _x_i in _x]
    # general case: overwrite the array copy element by element
    with np.nditer(_x, op_flags=['readwrite']) as _it:
        for _cell in _it:
            _cell[...] = func(_cell, *args, **kwargs)
    return assert_list(_x) if to_list else _x
@export
def round_signif_i(x: np.number, digits: int = 1) -> float:
    """
    Round a scalar number to a number of significant digits

    :param x: any number
    :param digits: integer amount of significant digits
    :return: float rounded to significant digits. Non finite input is returned unchanged, zero is returned
        as 0.
    """
    if not np.isfinite(x):
        return x
    if x == 0:
        return 0
    # position of the leading digit relative to the decimal point
    _magnitude = int(np.floor(np.log10(abs(x))))
    return round(float(x), digits - 1 - _magnitude)
@export
def round_signif(x: Any, *args, **kwargs) -> Any:
    """
    Round to significant number of digits, vectorized version of round_signif_i based on cf_vec

    :param x: any vector like object of numbers
    :param args: passed to cf_vec (the first positional argument therefore is to_list)
    :param kwargs: passed to cf_vec, e.g. digits
    :return: Vector like object of floats rounded to significant digits
    """
    _rounded = cf_vec(x, round_signif_i, *args, **kwargs)
    return _rounded
@export
def floor_signif(x: Any, digits: int = 1) -> Any:
    """
    Floor to significant number of digits: rounds to significant digits and steps one unit of the last
    significant digit down if rounding went up.

    :param x: the number to floor; the comparisons used here expect a scalar
    :param digits: integer amount of significant digits
    :return: float floored to significant digits, 0 for zero
    """
    if x == 0:
        return 0
    _rounded = round_signif(x, digits=digits)
    # rounding already went down (or hit exactly)
    if _rounded <= x:
        return _rounded
    # rounding went up -> subtract one unit of the last significant digit
    _scale = -int(np.floor(np.log10(abs(x)))) + digits - 1
    return _rounded - 1 / np.power(10., _scale)
@export
def ceil_signif(x: Any, digits: int = 1) -> Any:
    """
    Ceil to significant number of digits: rounds to significant digits and steps one unit of the last
    significant digit up if rounding went down.

    :param x: the number to ceil; the comparisons used here expect a scalar
    :param digits: integer amount of significant digits
    :return: float ceiled to significant digits, 0 for zero
    """
    if x == 0:
        return 0
    _rounded = round_signif(x, digits=digits)
    # rounding already went up (or hit exactly)
    if _rounded >= x:
        return _rounded
    # rounding went down -> add one unit of the last significant digit
    _scale = -int(np.floor(np.log10(abs(x)))) + digits - 1
    return _rounded + 1 / np.power(10., _scale)
@export
def concat_cols(df: pd.DataFrame, columns: list, sep: str = '_', to_int: bool = False) -> pd.Series:
    """
    Concat a number of columns of a pandas DataFrame as strings. The DataFrame is not modified.

    :param df: Pandas DataFrame
    :param columns: Names of the columns to be concat
    :param sep: Separator
    :param to_int: If true: Converts columns to int (after rounding) before concatting
    :return: Pandas Series named '_out' containing the concat columns, indexed like df
    """
    # build the result as a standalone Series: no copy of the whole DataFrame is needed and an input column
    # that happens to be called '_out' is read correctly instead of being overwritten
    _out = pd.Series('', index=df.index, name='_out')
    for _it, _column in enumerate(assert_list(columns)):
        if _it > 0:
            _out = _out + sep
        _col = df[_column]
        if to_int:
            _col = _col.round(0).astype(int)
        _out = _out + _col.astype(str)
    # adding differently named Series drops the name -> restore it
    return _out.rename('_out')
@export
def list_unique(lst: Any) -> list:
    """
    Returns the unique elements of a list in order of first appearance (dropping duplicates)

    :param lst: any list like object, elements must be hashable
    :return: list containing each element only once
    """
    _seen = {}
    for _el in assert_list(lst):
        # dictionaries keep insertion order, only the first appearance creates a key
        _seen.setdefault(_el, None)
    return list(_seen)
@export
def list_duplicate(lst: Any) -> list:
    """
    Returns those elements of a list that appear more than once, each of them only once

    :param lst: any list like object
    :return: list of duplicate values
    """
    _index = pd.Index(lst)
    # marks every appearance but the first
    _is_repeated = _index.duplicated()
    return list_unique(_index[_is_repeated].tolist())
@export
def list_flatten(lst: Any) -> list:
    """
    Flatten a list of lists

    Regular (rectangular) input is flattened using numpy. Ragged input, i.e. sub lists of differing length,
    which numpy refuses to convert, is flattened in pure python instead.

    :param lst: list of lists
    :return: flattened list
    """
    _list = assert_list(lst)
    try:
        return list(np.array(_list).flat)
    except ValueError:
        # raised by numpy for inhomogeneous shapes -> fall through to the python implementation
        pass

    def _walk(_items):
        # recursively yields the elements of nested lists, tuples and arrays
        for _item in _items:
            if isinstance(_item, (list, tuple)):
                yield from _walk(_item)
            elif isinstance(_item, np.ndarray):
                yield from _item.flat
            else:
                yield _item

    return list(_walk(_list))
@export
def list_merge(*args: Any, unique: bool = True, flatten: bool = False) -> list:
    """
    Merges n lists (or scalars) into one list, keeping the order of the arguments

    :param args: The lists to be merged together, None is skipped
    :param unique: if True then duplicate elements will be dropped
    :param flatten: if True then the individual lists will be flattened before merging
    :return: The merged list
    """
    # pick the conversion once, it is the same for all arguments
    _to_list = list_flatten if flatten else assert_list
    _merged = []
    for _arg in args:
        if _arg is None:
            continue
        _merged.extend(_to_list(_arg))
    return list_unique(_merged) if unique else _merged
@export
def list_intersection(lst: SequenceOrScalar, *args: SequenceOrScalar) -> list:
    """
    Returns common elements of n lists

    :param lst: the first list
    :param args: the subsequent lists, elements must be hashable
    :return: the list of common elements, each element once, in order of first appearance in the first list.
        If no subsequent list is passed the first list is returned as is.
    """
    _list_out = assert_list(lst)
    for _arg in args:
        # set lookup is more performant than comparing lists element by element
        _common = set(_list_out).intersection(assert_list(_arg))
        # filter the (de-duplicated) list instead of returning the set to keep a deterministic order
        _list_out = [_el for _el in dict.fromkeys(_list_out) if _el in _common]
    return _list_out
@export
def list_exclude(lst: SequenceOrScalar, *args: SequenceOrScalar) -> list:
    """
    Returns a list that includes only those elements from the first list that are not in any subsequent list.
    Can also be called with non list args, then those elements are removed.
    All appearances of an excluded element are removed. The first list itself is not modified.

    :param lst: the list to exclude from
    :param args: the subsequent lists
    :return: the filtered list
    """
    _list_out = assert_list(lst)
    for _arg in args:
        # the argument as a whole may be an element of the list (e.g. a tuple)
        try:
            while _arg in _list_out:
                _list_out.remove(_arg)
        except (TypeError, ValueError):
            # comparing multi element objects (e.g. arrays) does not yield a single boolean -> best effort
            pass
        # the elements of the argument
        for _el in assert_list(_arg):
            while _el in _list_out:
                _list_out.remove(_el)
    return _list_out
@export
def rand(shape: tuple = None, lower: int = None, upper: int = None, step: int = None, seed: int = None) -> np.array:
    """
    A seedable wrapper for numpy.random.random_sample that allows for boundaries and steps

    :param shape: A tuple containing the shape of the desired output array
    :param lower: Lower bound of random numbers, defaults to 0
    :param upper: Upper bound of random numbers, defaults to lower + 1
    :param step: Minimum step between random numbers, samples are rounded to multiples of step. If step is an
        integer the samples are cast to integer.
    :param seed: Random Seed, if passed the global numpy random state is seeded with it
    :return: Numpy Array
    """
    # seed
    if seed is not None:
        np.random.seed(seed)
    # base random numbers between 0 and 1
    _rand = np.random.random_sample(shape)
    # default boundaries
    lower = 0 if lower is None else lower
    upper = lower + 1 if upper is None else upper
    # stretch to the requested interval
    _samples = _rand * (upper - lower) + lower
    if step is None:
        return _samples
    # snap to the step grid
    _samples = np.round(_samples / step) * step
    # integer steps yield integers
    if isinstance(step, int):
        _samples = _samples.astype(int)
    return _samples
@export
def dict_list(*args, dict_type: str = 'defaultdict') -> dict:
    """
    Creates a dictionary of empty named lists. Useful for iteratively creating a pandas DataFrame

    :param args: The names of the lists, can be passed as scalars or list likes
    :param dict_type: Whether to use a 'regular' or 'defaultdict' (default to empty list) type dictionary.
        Everything other than 'regular' results in a defaultdict.
    :return: Dictionary of empty named lists
    """
    _dict = {} if dict_type == 'regular' else defaultdict(list)
    for _arg in args:
        # every name gets its own new list
        _dict.update((_name, []) for _name in assert_list(_arg))
    return _dict
@export
def append_to_dict_list(dct: Union[dict, defaultdict], append: Union[dict, list],
                        inplace: bool = True) -> Optional[dict]:
    """
    Appends to a dictionary of named lists. Useful for iteratively creating a pandas DataFrame.

    :param dct: Dictionary to append to
    :param append: List or dictionary of values to append. Lists (and scalars) are matched to the keys of
        the dictionary by position, dictionaries by key.
    :param inplace: Modify inplace or return modified copy
    :return: None if inplace, else modified dictionary
    """
    if not inplace:
        dct = dct.copy()
        # the copy above is shallow: also copy the lists, otherwise appending would still alter the original
        for _key, _values in list(dct.items()):
            dct[_key] = list(_values)
    # allows lists and dicts
    if isinstance(append, Mapping):
        _append = append
    else:
        _append = list(append) if is_list_like(append) else [append]
        if len(_append) > len(dct):
            warnings.warn('list is longer than dict, trailing entries will be lost')
        _append = dict(zip(dct.keys(), _append))
    for _key in _append.keys():
        dct[_key].append(_append[_key])
    if not inplace:
        return dct
@export
def is_scalar(obj: Any) -> bool:
    """
    Checks if a given python object is scalar, i.e. an instance of one of the members of the Scalar type
    (int, float, str, bytes or None)

    :param obj: Any python object
    :return: True if scalar, else False
    """
    _scalar_types = Scalar.__args__
    return isinstance(obj, _scalar_types)
@export
def is_list_like(obj: Any) -> bool:
    """
    Checks if a given python object is list like. The conditions must be satisfied:

        * not a string or bytes object
        * one of (Sequence, Set, ValuesView, pandas Index, 1d-array like Iterable)

    :param obj: Any python object
    :return: True if list like, else False
    """
    # str, bytes
    if isinstance(obj, (str, bytes)):
        return False
    # Sequence and similar (AbstractSet includes KeysView and ItemsView but not ValuesView)
    if isinstance(obj, (Sequence, AbstractSet, ValuesView, pd.Index)):
        return True
    # everything that cannot be iterated is not list like
    if not isinstance(obj, Iterable):
        return False
    # Iterable: check if the first element of the cast list is different from the object itself
    # (object is castable to list)
    try:  # try is needed because pandas objects return a sequence for != operator
        if list(obj)[0] != obj:
            return True
    except (ValueError, IndexError):
        pass
    # check if the object is array like
    _shape = np.array(obj).shape
    # 1d arrays are list like
    if len(_shape) == 1:
        return True
    # 2d arrays are list like if the 2nd dimension contains only one entry (e.g. single column DataFrame)
    return len(_shape) == 2 and _shape[1] == 1
@export
def assert_list(*args: Any, default: SequenceOrScalar = None) -> list:
    """
    Takes any python object(s) and turns them into an iterable list.

    :param args: Any python object. List like objects are cast to list, everything else is wrapped in a list.
    :param default: What to return if args are Empty or None
    :return: List if one argument was passed, tuple of lists if multiple arguments were passed
    """
    # -- init
    # - handle default
    if default is None:
        default = []
    elif is_list_like(default):
        default = list(default)
    else:
        default = [default]
    # -- main
    args = list(args)
    # Empty case
    if len(args) == 0:
        return default
    # None case
    if len(args) == 1 and args[0] is None:
        return default
    # Regular case
    for _it, _arg in enumerate(args):
        if not is_list_like(_arg):
            _arg = [_arg]
        elif isinstance(_arg, (Sequence, AbstractSet, ValuesView, pd.Index)):
            # these types (the same ones is_list_like accepts right away) require direct casts,
            # numpy would wrap e.g. a set into a single element array
            _arg = list(_arg)
        elif isinstance(_arg, Iterable):
            # not all iterables implement list() in the same way -> cast to np.array and flatten
            _arg = list(np.array(_arg).flatten())
        else:  # other cases: direct cast
            _arg = list(_arg)
        args[_it] = _arg
    # depending on whether just one argument was passed or list of arguments we need to return differently
    if len(args) == 1:
        return args[0]
    return tuple(args)
@export
def assert_tuple(*args: Any, **kwargs) -> tuple:
    """
    Takes any python object(s) and turns them into an iterable tuple.

    :param args: Any python object
    :param kwargs: Keyword arguments passed to :func:`assert_list`
    :return: Tuple
    """
    _as_list = assert_list(*args, **kwargs)
    return tuple(_as_list)
def force_list(*args, **kwargs):
    """
    Deprecated alias of assert_list, triggers a DeprecationWarning

    :param args: passed to assert_list
    :param kwargs: passed to assert_list
    :return: see assert_list
    """
    warnings.warn('force_list is deprecated, please use assert_list instead', category=DeprecationWarning)
    _result = assert_list(*args, **kwargs)
    return _result
@export
def assert_scalar(obj: Any, warn: bool = True, default: Scalar = None) -> Scalar:
    """
    Takes any python object and turns it into a scalar object by retaining only its first entry.

    :param obj: Any python object
    :param warn: Whether to trigger a warning when objects are being truncated or are empty
    :param default: What to use instead of obj if obj is None
    :return: Scalar, None if there is no entry to return
    """
    if obj is None:
        obj = default
    _lst = assert_list(obj)
    _len = len(_lst)
    if _len == 0:
        # honour the warn flag here as well so that callers can fully silence the function
        if warn:
            warnings.warn("empty list cannot be cast to scalar, returning None")
        return None
    if warn and _len > 1:
        warnings.warn(f"assert_scalar: object {obj} has length {_len}, retaining only first entry")
    return _lst[0]
def force_scalar(*args, **kwargs):
    """
    Deprecated alias of assert_scalar, triggers a DeprecationWarning

    :param args: passed to assert_scalar
    :param kwargs: passed to assert_scalar
    :return: see assert_scalar
    """
    # the replacement is assert_scalar, the message used to point to assert_list
    warnings.warn('force_scalar is deprecated, please use assert_scalar instead', DeprecationWarning)
    return assert_scalar(*args, **kwargs)
# noinspection PyShadowingBuiltins
[docs]@export
def to_hdf(df: pd.DataFrame, file: str, groupby: Union[str, List[str]] = None, write_groupby: bool = True,
           key: str = None, replace: bool = False, format: str = 'table', do_print=True, **kwargs) -> None:
    """
    Saves a pandas DataFrame as h5 file. The DataFrame is split using groupby and every group is written with
    its own call to pandas.DataFrame.to_hdf. At least one of groupby and key must be supplied.

    :param df: DataFrame to save, a copy is made so the passed object is not modified
    :param file: filename to save the DataFrame as
    :param groupby: Column(s) to split the DataFrame by. If not supplied the whole DataFrame is written as a
        single group [optional]
    :param write_groupby: Whether groupby columns should be written to hdf [optional]
    :param key: If supplied it is cast to string and used as key for every group that is written, otherwise the
        key is derived from the group label using qformat [optional]
    :param replace: Whether to delete an already existing file before writing [optional]
    :param format: Table format to use, passed to pandas.DataFrame.to_hdf. Defaults to 'table' while pandas defaults
        to 'fixed' [optional]
    :param do_print: Whether to print intermediate steps to console [optional]
    :param kwargs: Other keyword arguments passed to pandas.DataFrame.to_hdf [optional]
    :return: None
    """
    # NOTE(review): key is used for all groups even if groupby is supplied - confirm that this is intended
    assert (groupby is not None) or (key is not None), "You must supply either groupby or key"

    # -- init
    # work on a copy so the helper column below never leaks into the caller's DataFrame
    df = pd.DataFrame(df).copy()
    # without groupby the whole DataFrame is treated as one group, realized through a constant helper column
    if groupby is None:
        groupby = GROUPBY_DUMMY
        df[groupby] = 1

    # -- main
    # remove old file
    if replace and os.path.exists(file):
        os.remove(file)
        if do_print:
            tprint()
            print('removed old {}'.format(file))

    # number of distinct groups, used as total for the progressbar
    _n_groups = df[groupby].drop_duplicates().shape[0]

    for _position, (_label, _df_group) in enumerate(df.groupby(groupby)):

        # key to write this group as
        _hdf_key = str(key) if key is not None else qformat(_label, int_format='', float_format='.2f')

        if do_print:
            progressbar(_position, _n_groups, print_prefix=f"writing key {_hdf_key:<30}: ", p_step=2)

        # the helper column is never written, real groupby columns only if requested
        if not write_groupby:
            _df_group = _df_group.drop(groupby, axis=1)
        elif GROUPBY_DUMMY in _df_group.columns:
            _df_group = _df_group.drop(GROUPBY_DUMMY, axis=1)

        pd.DataFrame.to_hdf(_df_group, file, key=_hdf_key, format=format, **kwargs)

    if do_print:
        tprint()
        tprint('{}saved to {}'.format('\n', file))
[docs]@export
def get_hdf_keys(file: str) -> List[str]:
    """
    Opens an hdf file in read mode and returns all of its top level keys as list

    :param file: The path of the file to read the keys of
    :return: List of keys
    """
    # the file handle is closed by the context manager as soon as the keys are collected
    with h5py.File(file, 'r') as _hdf:
        return [*_hdf.keys()]
[docs]@export
def read_hdf(file: str, key: Union[str, List[str]] = None, sample: int = None, random_state: int = None,
             key_to_col: Union[bool, str] = False, do_print: bool = True, catch_error: bool = True,
             **kwargs) -> pd.DataFrame:
    """
    read a DataFrame from hdf file based on pandas.read_hdf but with default option to read all keys (since we're
    expecting a DataFrame)

    :param file: The path to the file to read from
    :param key: The key(s) to read, if not specified all keys are read [optional]
    :param sample: If specified will read sample keys at random (without replacement) from the file, ignored if
        key is specified [optional]
    :param random_state: Random state for sample [optional]
    :param key_to_col: Whether to save the key value to a column, if a string then used as column name,
        otherwise the column is called '_key' [optional]
    :param do_print: Whether to print intermediate steps [optional]
    :param catch_error: Whether to catch errors when reading. Keys that fail to read are reported and
        skipped [optional]
    :param kwargs: Other keyword arguments passed to pandas.read_hdf [optional]
    :return: pandas DataFrame, empty if the concatenation of the read keys fails (e.g. nothing could be read)
    :raises ValueError: If file does not exist or if sample is larger than the number of keys in the file
    """
    if not os.path.exists(file):
        raise ValueError('{} does not exist'.format(file))

    # -- determine which keys to read
    if key is None:
        # if key was not specified: read all keys
        _keys = get_hdf_keys(file)
        _read_keys = 'all'
        if sample is not None:
            # draw distinct positions and look the keys up so they keep their original type. A local
            # RandomState keeps the draw reproducible without reseeding numpy's global random generator
            _rng = np.random.RandomState(random_state)
            _keys = [_keys[_i] for _i in _rng.choice(len(_keys), size=sample, replace=False)]
            _read_keys = ','.join(map(str, _keys))
    else:
        _keys = list(key) if isinstance(key, (list, tuple)) else [key]
        _read_keys = ','.join(map(str, _keys))

    # -- read
    _dfs = []
    for _it, _key in enumerate(_keys):
        if do_print:
            tprint('reading {} - key {} / {} : {}...'.format(file, _it+1, len(_keys), _key))
        if catch_error:
            # KeyboardInterrupt is not a subclass of Exception and therefore still propagates
            try:
                _df_i = pd.read_hdf(file, key=_key, **kwargs)
            except Exception as _e:
                tprint('')
                print(f"{_e.__class__.__name__}: '{_e}' while reading key {_key}")
                continue
        else:
            _df_i = pd.read_hdf(file, key=_key, **kwargs)
        if key_to_col:
            _key_name = key_to_col if isinstance(key_to_col, str) else '_key'
            # noinspection PyUnresolvedReferences
            _df_i[_key_name] = _key
        # append to list
        _dfs.append(_df_i)

    # -- concat
    if do_print:
        tprint('concat...')
    try:
        _df = pd.concat(_dfs, ignore_index=True, sort=False)
    except Exception as _e:
        tprint('')
        print(f"{_e.__class__.__name__}: {_e} during pandas.concat")
        return pd.DataFrame()

    if do_print:
        tprint('read {} ; keys: {}'.format(file, _read_keys))
    return _df
[docs]@export
def rounddown(x: Any, digits: int) -> Any:
    """
    Rounds x down to the given amount of digits: x is scaled by 10**digits, passed to np.floor and scaled back

    :param x: any python object that supports np.floor
    :param digits: amount of digits
    :return: rounded x
    """
    _scale = 10**digits
    _floored = np.floor(x * _scale)
    return _floored / _scale
[docs]@export
def roundup(x: Any, digits: int) -> Any:
    """
    Rounds x up to the given amount of digits: x is scaled by 10**digits, passed to np.ceil and scaled back

    :param x: any python object that supports np.ceil
    :param digits: amount of digits
    :return: rounded x
    """
    _scale = 10**digits
    _ceiled = np.ceil(x * _scale)
    return _ceiled / _scale
[docs]@export
def dict_inv(dct: Mapping, key_as_str: bool = False, duplicates: str = 'keep') -> dict:
    """
    Returns an inverted copy of a given dictionary (if it is invertible)

    :param dct: Dictionary to be inverted
    :param key_as_str: Whether all keys of the inverted dictionary should be forced to string
    :param duplicates: How to handle values that appear more than once. In case of 'keep' (default) the last
        occurrence overwrites earlier ones, in case of 'adjust' duplicates are suffixed with '_' until they are
        unique, in case of 'drop' only the first occurrence is retained
    :return: Inverted dictionary
    :raises ValueError: If duplicates is not a valid option or if a value of dct is not scalar
    """
    # -- assert
    # 'keep' is the signature default but is not part of the shared validations entry, therefore it is allowed
    # explicitly here - otherwise calling dict_inv with its own defaults raises
    _valid = ['keep'] + [_ for _ in validations['dict_inv__duplicates'] if _ != 'keep']
    if duplicates not in _valid:
        raise ValueError(f"duplicates must be one of {_valid}")
    # -- init
    _dct_inv = {}
    # -- main
    for _key, _value in dct.items():
        # assert scalar
        if not is_scalar(_value):
            raise ValueError(f'A non-scalar dictionary value is not invertible, found at key {_key}')
        # if applicable: convert value to string. This has to happen before the duplicate handling so that
        # duplicates are detected on the keys that actually end up in the inverted dictionary
        if key_as_str:
            _value = str(_value)
        # assert non-duplicate value
        if duplicates == 'adjust':
            _warn = True
            while _value in _dct_inv:
                if _warn:
                    _warn = False
                    warnings.warn(f'duplicate value found at "{_key}: {_value}", appending _')
                _value = str(_value) + '_'
        elif duplicates == 'drop' and _value in _dct_inv:
            continue
        # assign
        _dct_inv[_value] = _key
    return _dct_inv
[docs]@export
def copy_function(f: FunctionType) -> FunctionType:
    """
    Creates a new function object from the code, globals, name, defaults and closure of f, based on this
    StackOverflow answer
    https://stackoverflow.com/questions/13503079/how-to-create-a-copy-of-a-python-function

    :param f: a function
    :return: copy of function
    """
    _clone = functools.update_wrapper(
        FunctionType(f.__code__, f.__globals__, name=f.__name__, argdefs=f.__defaults__, closure=f.__closure__),
        f,
    )
    # keyword-only defaults are not covered by the FunctionType constructor; the same dict object is reused
    _clone.__kwdefaults__ = f.__kwdefaults__
    return _clone
[docs]@export
def get_else_key(dct: Mapping, key: Any, exclude: SequenceOrScalar = None) -> Any:
    """
    Looks up key in a dictionary and returns its value, falls back to returning the key itself if the lookup is
    not applicable

    :param dct: dictionary or similar Mapping
    :param key: Key of value to attempt get
    :param exclude: Keys to not get the value from (always return as is), passed to list_exclude together with
        the keys of dct
    :return: Value if key is among the keys retained by list_exclude, else key
    """
    _eligible_keys = list_exclude(dct.keys(), exclude)
    return dct[key] if key in _eligible_keys else key