Source code for melusine.prepare_email.metadata_engineering

import re
import copy
import pandas as pd
from collections import Counter
from itertools import chain
from sklearn import preprocessing
from sklearn.base import BaseEstimator, TransformerMixin
from melusine.utils.transformer_scheduler import TransformerScheduler


class MetaExtension(BaseEstimator, TransformerMixin):
    """Transformer which creates the 'extension' feature from the sender address.

    The extension is the part of the ``from`` e-mail address located between
    the ``@`` and the first following dot (``"gmail"`` for
    ``"john@gmail.com"``). The ``topn_extension`` most common extensions seen
    during ``fit`` are kept as-is, every other value is replaced by
    ``"other"``, and the result is label-encoded.

    Compatible with scikit-learn API.

    Parameters
    ----------
    topn_extension : int, optional
        Number of most common extensions to keep, 100 by default.
    """

    def __init__(self, topn_extension=100):
        self.le_extension = preprocessing.LabelEncoder()
        self.topn_extension = topn_extension

    def fit(self, X, y=None):
        """Fit the LabelEncoder on the encoded extensions.

        Note that an ``"extension"`` column holding the (not yet
        label-encoded) extensions is written into ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Data with a ``"from"`` column.
        y : ignored
            Present only for compatibility with scikit-learn API.

        Returns
        -------
        self

        Raises
        ------
        TypeError
            If ``X`` is a dictionary.
        """
        if isinstance(X, dict):
            raise TypeError(
                "You should not use fit on a dictionary object. Use a DataFrame"
            )

        X["extension"] = X.apply(self.get_extension, axis=1)
        self.top_extension = self.get_top_extension(X, n=self.topn_extension)
        X["extension"] = X.apply(
            self.encode_extension, args=(self.top_extension,), axis=1
        )
        # "other" is always part of the fitted labels so that extensions
        # unseen during training can still be encoded at inference time.
        self.le_extension.fit(pd.concat((X["extension"], pd.Series(["other"]))))
        return self

    def transform(self, X):
        """Encode extensions.

        Parameters
        ----------
        X : pandas.DataFrame or dict
            Data with a ``"from"`` entry. ``X`` is modified in place.

        Returns
        -------
        pandas.DataFrame or dict
            ``X`` with the label-encoded ``"extension"`` entry.
        """
        if isinstance(X, dict):
            apply_func = TransformerScheduler.apply_dict
        else:
            apply_func = TransformerScheduler.apply_pandas

        X["extension"] = apply_func(X, self.get_extension)
        X["extension"] = apply_func(
            X, self.encode_extension, args_=(self.top_extension,)
        )

        if isinstance(X["extension"], str):
            # Single e-mail: wrap the label, then unwrap the encoded value.
            X["extension"] = self.le_extension.transform([X["extension"]])[0]
        else:
            X["extension"] = self.le_extension.transform(X["extension"])
        return X

    @staticmethod
    def get_extension(row):
        """Get the extension from the ``"from"`` e-mail address of ``row``.

        Returns an empty string when the address is not a string or does not
        contain an ``@`` followed by at least one non-dot character.
        """
        x = row["from"]
        try:
            match = re.search(r"\@([^.]+)", x)
        except TypeError:
            # Missing or non-text sender (None, NaN, ...).
            return ""
        return match.group(1) if match else ""

    @staticmethod
    def get_top_extension(X, n=100):
        """Return the list of the ``n`` most common extensions in ``X``."""
        counts = Counter(X["extension"].values)
        return [extension for extension, _ in counts.most_common(n)]

    @staticmethod
    def encode_extension(row, top_ext):
        """Keep the most common extensions and set the rest to 'other'."""
        x = row["extension"]
        return x if x in top_ext else "other"
class MetaDate(BaseEstimator, TransformerMixin):
    """Transformer which creates new features from dates such as:
        - hour
        - minute
        - dayofweek

    Compatible with scikit-learn API.

    Parameters
    ----------
    regex_date_format : str, optional
        Regex used to extract the day, month, year, hour and minute groups
        (in that order) from the raw date text.

    date_format : str, optional
        Date format used to parse the rebuilt
        ``"day/month/year hour:minute"`` string.
    """

    def __init__(
        self,
        regex_date_format=r"\w+ (\d+) (\w+) (\d{4}) (\d{2}) h (\d{2})",
        date_format="%d/%m/%Y %H:%M",
    ):
        self.regex_date_format = regex_date_format
        self.date_format = date_format
        # French month names mapped to their month number.
        self.month = {
            "janvier": "1",
            "février": "2",
            "mars": "3",
            "avril": "4",
            "mai": "5",
            "juin": "6",
            "juillet": "7",
            "août": "8",
            "septembre": "9",
            "octobre": "10",
            "novembre": "11",
            "décembre": "12",
        }

    def fit(self, X, y=None):
        """Unused method. Defined only for compatibility with scikit-learn API."""
        return self

    def transform(self, X):
        """Transform date to hour, min, day features.

        Parameters
        ----------
        X : pandas.DataFrame or dict
            Data with a ``"date"`` entry. ``X`` is modified in place: the
            ``"date"`` entry is replaced by its parsed value and the
            ``"hour"``, ``"min"`` and ``"dayofweek"`` entries are added.

        Returns
        -------
        pandas.DataFrame or dict
            The updated ``X``.
        """
        if isinstance(X, dict):
            apply_func = TransformerScheduler.apply_dict
        else:
            apply_func = TransformerScheduler.apply_pandas

        X["date"] = apply_func(X, self.date_formatting, args_=(self.regex_date_format,))
        X["hour"] = apply_func(X, self.get_hour)
        X["min"] = apply_func(X, self.get_min)
        X["dayofweek"] = apply_func(X, self.get_dayofweek)
        return X

    def date_formatting(self, row, regex_format):
        """Set a date in the right format.

        The groups captured by ``regex_format`` are reassembled as
        ``"day/month/year hour:minute"``, month names are replaced by their
        number and the result is parsed with ``self.date_format``
        (unparseable values are coerced to ``NaT``). When the raw value cannot
        be processed this way, it is handed to ``pd.to_datetime`` unchanged.
        """
        x = row["date"]
        try:
            e = re.findall(regex_format, x)[0]
            date = e[0] + "/" + e[1] + "/" + e[2] + " " + e[3] + ":" + e[4]
            for m, m_n in self.month.items():
                date = date.replace(m, m_n)
            # An explicit format makes format inference irrelevant; the
            # deprecated `infer_datetime_format` argument is not passed.
            date = pd.to_datetime(date, format=self.date_format, errors="coerce")
        except Exception:
            # Deliberate best-effort fallback: non-text values or text that
            # does not match the regex go through pandas' generic parser.
            return pd.to_datetime(x)
        return date

    @staticmethod
    def get_hour(row):
        """Get hour from date (0 if the value has no ``hour`` attribute)."""
        x = row["date"]
        try:
            return x.hour
        except AttributeError:
            return 0

    @staticmethod
    def get_min(row):
        """Get minutes from date (0 if the value has no ``minute`` attribute)."""
        x = row["date"]
        try:
            return x.minute
        except AttributeError:
            return 0

    @staticmethod
    def get_dayofweek(row):
        """Get day of the week from date (0 if the value has no ``dayofweek`` attribute)."""
        x = row["date"]
        try:
            return x.dayofweek
        except AttributeError:
            return 0
class Dummifier(BaseEstimator, TransformerMixin):
    """Transformer which dummifies categorical features and list-valued features.

    Every column of ``columns_to_dummify`` is one-hot encoded with the
    ``"<column>__<value>"`` naming scheme. The ``"attachment_type"`` column is
    handled separately because each of its cells holds several values: the
    resulting dummies count the occurrences of each value in the cell.

    Compatible with scikit-learn API.

    Parameters
    ----------
    columns_to_dummify : list of str, optional
        Columns to dummify. Defaults to
        ``["extension", "dayofweek", "hour", "min", "attachment_type"]``.

    copy : bool, optional
        Whether ``transform`` works on a copy of its input, True by default.
    """

    def __init__(
        self,
        columns_to_dummify=None,
        copy=True,
    ):
        # A fresh list per instance: a list literal used as default argument
        # would be shared (and mutable) across all instances.
        if columns_to_dummify is None:
            columns_to_dummify = [
                "extension",
                "dayofweek",
                "hour",
                "min",
                "attachment_type",
            ]
        self.columns_to_dummify = columns_to_dummify
        self.copy = copy

    def _expand_dummies(self, X):
        """Return ``X`` with the dummy columns of every configured column added."""
        X_ = pd.get_dummies(
            X,
            columns=[
                col for col in self.columns_to_dummify if col != "attachment_type"
            ],
            prefix_sep="__",
            dummy_na=False,
        )

        if "attachment_type" in self.columns_to_dummify:
            # One line per value of each cell (original index repeated), then
            # one-hot encode and add the lines of a same e-mail back together.
            values = X["attachment_type"].explode().dropna().astype(int)
            X_attachment = pd.get_dummies(values).groupby(level=0, sort=False).sum()
            X_attachment = X_attachment.add_prefix("attachment_type__")
            X_ = pd.concat([X_, X_attachment], axis=1)
        return X_

    def fit(self, X, y=None):
        """Store dummified features to avoid inconsistency with new data which
        could contain new labels (unknown from train data).

        Parameters
        ----------
        X : pandas.DataFrame
            Data containing the columns to dummify.
        y : ignored
            Present only for compatibility with scikit-learn API.

        Returns
        -------
        self

        Raises
        ------
        TypeError
            If ``X`` is a dictionary.
        """
        if isinstance(X, dict):
            raise TypeError(
                "You should not use fit on a dictionary object. Use a DataFrame"
            )

        X_ = self._expand_dummies(X)
        dummies_ = tuple(col + "__" for col in self.columns_to_dummify)
        self.dummy_features = [
            c for c in X_ if isinstance(c, str) and c.startswith(dummies_)
        ]
        return self

    def transform(self, X, y=None):
        """Dummify features and keep only common labels with pretrained data.

        Parameters
        ----------
        X : pandas.DataFrame or dict
            Data containing the columns to dummify.
        y : ignored
            Present only for compatibility with scikit-learn API.

        Returns
        -------
        pandas.DataFrame or dict
            The dummy features stored by ``fit``, in the same order. Features
            absent from ``X`` are filled with 0. A dict is returned when ``X``
            is a dict.
        """
        return_dict = isinstance(X, dict)
        if return_dict:
            # A single e-mail is processed as a one-row DataFrame.
            X_ = pd.DataFrame([copy.deepcopy(X) if self.copy else X])
        else:
            X_ = X.copy() if self.copy else X

        X_ = self._expand_dummies(X_)
        # Align on the features learnt during fit: unknown labels are dropped
        # and labels missing from this data are created and filled with 0.
        X_ = X_.reindex(columns=self.dummy_features).fillna(0)

        if return_dict:
            return X_.to_dict(orient="records")[0]
        return X_
class MetaAttachmentType(BaseEstimator, TransformerMixin):
    """Transformer which creates the 'attachment_type' feature from attachments.

    The type of an attached file is the text following the last dot of its
    name. The ``topn_extension`` most common types seen during ``fit`` are kept
    as-is, every other type is replaced by ``"other"``, e-mails without any
    typed attachment get ``"none"``, and the result is label-encoded.

    Compatible with scikit-learn API.

    Parameters
    ----------
    topn_extension : int, optional
        Number of most common attachment types to keep, 100 by default.
    """

    def __init__(self, topn_extension=100):
        self.le_extension = preprocessing.LabelEncoder()
        self.topn_extension = topn_extension

    def fit(self, X, y=None):
        """Fit the LabelEncoder on the encoded attachment types.

        Note that an ``"attachment_type"`` column holding the (not yet
        label-encoded) lists of types is written into ``X``.

        Parameters
        ----------
        X : pandas.DataFrame
            Data with an ``"attachment"`` column.
        y : ignored
            Present only for compatibility with scikit-learn API.

        Returns
        -------
        self

        Raises
        ------
        TypeError
            If ``X`` is a dictionary.
        """
        if isinstance(X, dict):
            raise TypeError(
                "You should not use fit on a dictionary object. Use a DataFrame"
            )

        X["attachment_type"] = X.apply(self.get_attachment_type, axis=1)
        self.top_attachment_type = self.get_top_attachment_type(
            X, n=self.topn_extension
        )
        X["attachment_type"] = X.apply(
            self.encode_type, args=(self.top_attachment_type,), axis=1
        )

        labels = list(chain.from_iterable(X["attachment_type"]))
        # "other" and "none" are the two labels encode_type can emit whatever
        # the training data contains: both must be known by the encoder so
        # that unseen types and e-mails without attachment can be encoded at
        # inference time.
        self.le_extension.fit(labels + ["other", "none"])
        return self

    def transform(self, X):
        """Encode attachment types.

        Parameters
        ----------
        X : pandas.DataFrame or dict
            Data with an ``"attachment"`` entry. ``X`` is modified in place.

        Returns
        -------
        pandas.DataFrame or dict
            ``X`` with the ``"attachment_type"`` entry holding, for each
            e-mail, the array of label-encoded attachment types.
        """
        if isinstance(X, dict):
            apply_func = TransformerScheduler.apply_dict
        else:
            apply_func = TransformerScheduler.apply_pandas

        X["attachment_type"] = apply_func(X, self.get_attachment_type)
        X["attachment_type"] = apply_func(
            X, self.encode_type, args_=(self.top_attachment_type,)
        )

        if isinstance(X["attachment_type"], list):
            # Single e-mail: the list of labels is already the 1-D input the
            # encoder expects (wrapping it would make it 2-D and fail as soon
            # as the e-mail has several attachments).
            X["attachment_type"] = self.le_extension.transform(X["attachment_type"])
        else:
            X["attachment_type"] = [
                self.le_extension.transform(t) for t in X["attachment_type"]
            ]
        return X

    @staticmethod
    def get_attachment_type(row):
        """Get the list of file types from the ``"attachment"`` entry of ``row``.

        File names without a dot are skipped. An empty string is returned when
        the attachments cannot be iterated or contain a non-text name.
        """
        x = row["attachment"]
        try:
            matches = (re.findall(r".*\.(.*)", file) for file in x)
            return [match[0] for match in matches if match]
        except TypeError:
            # Missing attachments (None, NaN, ...) or non-text file name.
            return ""

    @staticmethod
    def get_top_attachment_type(X, n=100):
        """Return the list of the ``n`` most common attachment types in ``X``."""
        type_counter = Counter(chain.from_iterable(X["attachment_type"]))
        return [type_ for type_, _ in type_counter.most_common(n)]

    @staticmethod
    def encode_type(row, top_ext):
        """Keep the most common attachment types and set the rest to 'other'.

        Returns ``["none"]`` when the e-mail has no attachment type.
        """
        x = row["attachment_type"]
        if not x:
            # No attachments
            return ["none"]
        return [
            attachment if attachment in top_ext else "other" for attachment in x
        ]