Source code for melusine.utils.transformer_scheduler

"""
Utility class for building a custom transformer from a list of functions
applied in a specific order along the rows of a DataFrame (axis=1).

It is compatible with scikit-learn API (i.e. contains fit, transform methods).
"""

import pandas as pd
import copy
from sklearn.base import BaseEstimator, TransformerMixin
from melusine.utils.multiprocessing import apply_by_multiprocessing


def __check_function_type(func):
    """Validate the first item of a ``(func, args, cols)`` tuple.

    Parameters
    ----------
    func : object
        Candidate function-like object.

    Returns
    -------
    object
        ``func`` itself, unchanged, when it is callable.

    Raises
    ------
    TypeError
        If ``func`` is not callable.
    """
    if callable(func):
        return func

    message = (
        "First item of the tuple (func, args, cols) must be a function-like "
        "object not a {} object"
    )
    raise TypeError(message.format(type(func)))


def __check_args_type(args):
    """Validate and normalise the second item of a ``(func, args, cols)`` tuple.

    Parameters
    ----------
    args : None, tuple, int, str or list
        Extra positional argument(s) for the scheduled function.

    Returns
    -------
    tuple or None
        ``None`` when ``args`` is ``None`` or an empty tuple, a one-element
        tuple wrapping ``args`` when it is a single int, str or list, and
        ``args`` itself when it is already a tuple.

    Raises
    ------
    TypeError
        If ``args`` is none of the accepted types.
    """
    # "No arguments" can be spelled either None or ().
    if args is None or args == ():
        return None

    # A lone value such as args=(4) is really just 4; wrap it so that it can
    # be treated as a tuple of arguments.
    if isinstance(args, (int, str, list)):
        return (args,)

    if isinstance(args, tuple):
        return args

    message = (
        "Second item of the tuple (func, args, cols) must be tuple-like "
        "object not a {} object"
    )
    raise TypeError(message.format(type(args)))


def __check_colnames_type(cols):
    """Validate and normalise the third item of a ``(func, args, cols)`` tuple.

    Parameters
    ----------
    cols : None or list
        Name(s) of the column(s) receiving the function output.

    Returns
    -------
    list or None
        ``None`` when ``cols`` is ``None`` or an empty list, otherwise
        ``cols`` itself.

    Raises
    ------
    TypeError
        If ``cols`` is neither ``None`` nor a list.
    """
    # "No columns" can be spelled either None or [].
    if cols is None or cols == []:
        return None

    if isinstance(cols, list):
        return cols

    message = (
        "Third item of the tuple (func, args, cols) must be list-like "
        "object not a {} object"
    )
    raise TypeError(message.format(type(cols)))


def _check_tuple(func, args=None, cols=None):
    """Validate one entry of the 'function_scheduler' parameter.

    Parameters
    ----------
    func : callable
        Function to schedule.
    args : None, tuple, int, str or list, optional
        Extra positional argument(s) for ``func``.
    cols : None or list, optional
        Name(s) of the column(s) receiving the output of ``func``.

    Returns
    -------
    tuple
        ``(func, args, cols)`` after each element has gone through its
        dedicated type check (and normalisation, for ``args`` and ``cols``).

    Raises
    ------
    TypeError
        Propagated from the individual checks when an element has an
        unsupported type.
    """
    # Elements are checked left to right, so the first invalid one raises.
    return (
        __check_function_type(func),
        __check_args_type(args),
        __check_colnames_type(cols),
    )


class TransformerScheduler(BaseEstimator, TransformerMixin):
    """Build a custom transformer from an ordered list of functions.

    The functions are applied one after the other, either along the rows of
    a DataFrame (axis=1) or directly on a dict. The transformer is compatible
    with the scikit-learn API (i.e. it provides fit and transform methods).

    Parameters
    ----------
    functions_scheduler : list of tuples, (function, tuple, list)
        List of functions to be applied in a specific order.
        Each element of the list has to be defined as follows:
        (`function`, `argument(s) used by the function (optional)`,
        `colname(s) returned (optional)`)

    mode : str {'apply', 'apply_by_multiprocessing'}, optional
        Define mode to apply function along a row axis (axis=1).
        Default value, 'apply'.
        If set to 'apply_by_multiprocessing', it uses multiprocessing tool
        to parallelize computation.

    n_jobs : int, optional
        Number of cores used for computation. Default value, 1.
        Only used when mode is set to 'apply_by_multiprocessing'.

    progress_bar : boolean, optional
        Whether to print a progress bar from tqdm package. Default value, True.
        Works only when mode is set to 'apply_by_multiprocessing'.

    copy : boolean, optional
        Make a copy of the input before transforming it. Default value, True.

    verbose : int, optional
        Verbosity mode. Default value, 0.
        Stored on the instance; not read by the methods of this class.

    Attributes
    ----------
    functions_scheduler, mode, n_jobs, progress_bar, copy, verbose

    Examples
    --------
    >>> from melusine.utils.transformer_scheduler import TransformerScheduler
    >>> MelusineTransformer = TransformerScheduler(
    ...     functions_scheduler=[
    ...         (my_function_1, (argument1, argument2), ['return_col_A']),
    ...         (my_function_2, None, ['return_col_B', 'return_col_C']),
    ...         (my_function_3, (), ['return_col_D'])
    ...     ])

    """

    def __init__(
        self,
        functions_scheduler,
        mode="apply",
        n_jobs=1,
        progress_bar=True,
        copy=True,
        verbose=0,
    ):
        self.functions_scheduler = functions_scheduler
        self.mode = mode
        self.n_jobs = n_jobs
        # Keep the caller's choice (this used to be hard-coded to True, which
        # silently ignored progress_bar=False).
        self.progress_bar = progress_bar
        self.copy = copy
        self.verbose = verbose

        # Fail fast on malformed entries. The normalised values are not kept
        # here: transform() re-validates each entry when it runs.
        for tuple_ in functions_scheduler:
            _check_tuple(*tuple_)

    def fit(self, X, y=None):
        """Unused method. Defined only for compatibility with scikit-learn API."""
        return self

    def transform(self, X):
        """Apply functions defined in the `functions_scheduler` parameter.

        Parameters
        ----------
        X : pandas.DataFrame or dict,
            Data on which transformations are applied.

        Returns
        -------
        pandas.DataFrame or dict
            Result of chaining every scheduled function on ``X`` (or on a
            copy of it when ``copy`` is True).
        """
        if isinstance(X, dict):
            # Case input is a dict
            X_ = copy.deepcopy(X) if self.copy else X
            apply_func = self.apply_dict
        else:
            # Case input is a DataFrame
            X_ = X.copy() if self.copy else X
            if self.mode == "apply_by_multiprocessing":
                # Multiprocessing (or progress bar or both)
                apply_func = self.apply_pandas_multiprocessing
            else:
                # Single process (no progress bar)
                apply_func = self.apply_pandas

        for tuple_ in self.functions_scheduler:
            func_, args_, cols_ = _check_tuple(*tuple_)
            # Every apply_* helper accepts **kwargs, so the same call works
            # whichever one was selected above.
            X_ = apply_func(
                X_,
                func_,
                args_,
                cols_,
                n_jobs=self.n_jobs,
                progress_bar=self.progress_bar,
            )

        return X_

    @staticmethod
    def apply_pandas(X_, func_, args_=None, cols_=None, **kwargs):
        """Apply a function on a pandas DataFrame.

        Parameters
        ----------
        X_ : pandas.DataFrame,
            Data on which transformations are applied.
        func_ : func
            Function to apply on each row.
        args_ : list or tuple
            List of arguments of the function to apply
        cols_ : list or tuple
            List of columns created by the transformation
        **kwargs
            Ignored. Accepted so that all ``apply_*`` helpers can be called
            with the same keyword arguments.

        Returns
        -------
        pandas.DataFrame
            ``X_`` with the new column(s) set, or the raw result of the
            row-wise apply when no column name is given.
        """
        # pandas documents `args` as a tuple; validation yields None for
        # "no extra arguments".
        if args_ is None:
            args_ = ()

        result = X_.apply(func_, args=args_, axis=1)

        # None and [] both mean "no target column", as in apply_dict.
        if not cols_:
            return result

        if len(cols_) == 1:
            X_[cols_[0]] = result
        else:
            # Expand each row's returned sequence into one column per element.
            X_[cols_] = result.apply(pd.Series)
        return X_

    @staticmethod
    def apply_pandas_multiprocessing(
        X_, func_, args_=None, cols_=None, n_jobs=1, progress_bar=False, **kwargs
    ):
        """Apply a function on a pandas DataFrame using multiprocessing.

        The row-wise computation is delegated to ``apply_by_multiprocessing``.

        Parameters
        ----------
        X_ : pandas.DataFrame,
            Data on which transformations are applied.
        func_ : func
            Function to apply on each row.
        args_ : list or tuple
            List of arguments of the function to apply
        cols_ : list or tuple
            List of columns created by the transformation
        n_jobs : int
            Passed to ``apply_by_multiprocessing`` as ``workers``.
        progress_bar : boolean
            Passed to ``apply_by_multiprocessing`` as ``progress_bar``.
        **kwargs
            Ignored. Accepted so that all ``apply_*`` helpers can be called
            with the same keyword arguments.

        Returns
        -------
        pandas.DataFrame
            ``X_`` with the new column(s) set, or the raw result of
            ``apply_by_multiprocessing`` when no column name is given.
        """
        result = apply_by_multiprocessing(
            df=X_,
            func=func_,
            args=args_,
            axis=1,
            workers=n_jobs,
            progress_bar=progress_bar,
        )

        # None and [] both mean "no target column", as in apply_dict.
        if not cols_:
            return result

        if len(cols_) == 1:
            X_[cols_[0]] = result
        else:
            # Expand each row's returned sequence into one column per element.
            X_[cols_] = result.apply(pd.Series)
        return X_

    @staticmethod
    def apply_dict(X_, func_, args_=None, cols_=None, **kwargs):
        """Apply a function on a dictionary.

        Parameters
        ----------
        X_ : dict,
            Data on which transformations are applied.
        func_ : func
            Function to apply; it receives the whole dict as first argument.
        args_ : list or tuple
            List of arguments of the function to apply
        cols_ : list or tuple
            List of keys created by the transformation
        **kwargs
            Ignored. Accepted so that all ``apply_*`` helpers can be called
            with the same keyword arguments.

        Returns
        -------
        dict
            ``X_`` with the new key(s) set, or the raw return value of
            ``func_`` when no key name is given.
        """
        result = func_(X_, *args_) if args_ else func_(X_)

        if not cols_:
            return result

        if len(cols_) == 1:
            X_[cols_[0]] = result
        else:
            # Pair each key with the matching element of the result. A stray
            # DataFrame-style `X_.apply(...)` call used to sit here and made
            # this branch raise AttributeError on every dict input.
            # dict(...) consumes the result fully before X_ is modified.
            X_.update(dict(zip(cols_, result)))
        return X_