Mastering the Art of Machine Learning Workflows: A Comprehensive Guide to Transformer, Estimator, and Pipeline | by Andreas Lukita | Jun, 2023


1. Streamlined Workflow. Leveraging Pipeline allows seamless integration of multiple steps in your data preprocessing and modeling journey. It enables you to chain together various transformers and estimators, ensuring a clear, concise, and automated flow from data preprocessing to model training and evaluation. By encapsulating your preprocessing and modeling steps in a Pipeline, your code becomes more organized, modular, and easier to understand, and it is easier to maintain because each step is clearly defined. Treat each step in the Pipeline as independent: you can change or add steps without worrying about how one preprocessing step will affect another!


2. Prevent Data Leakage. The dreaded antagonist, the nemesis of every analyst. Data leakage occurs when information from the test dataset unintentionally influences the preprocessing steps or model training, leading to overly optimistic performance estimates. In effect, you are letting the model peek at what it is going to be tested on, so of course it looks better than it really is. The rule of thumb is to fit on the training dataset only, then transform both the training and testing datasets. The code below shows where some people go wrong. On top of that, you will often have multiple preprocessing steps involving transformers such as StandardScaler(), MinMaxScaler(), OneHotEncoder(), etc. Imagine having to repeat the fit-and-transform dance for each of them throughout your workflow; wouldn't that be confusing and inconvenient?

ss = StandardScaler()
X_train_scaled = ss.fit_transform(X_train)
X_test_scaled = ss.fit_transform(X_test)   #wrong: re-fitting on the test set leaks its statistics

#Some other variations
X_train_scaled = ss.fit(X_train)           #wrong: fit returns the fitted scaler itself, not scaled data
X_test_scaled = ss.transform(X_test)
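For reference, here is a minimal sketch of the conventional approach (same variable names as above, assuming X_train and X_test already exist): fit the scaler on the training set only, then reuse the fitted scaler to transform both sets.

ss = StandardScaler()
ss.fit(X_train)                         #learn the mean and standard deviation from the training set only
X_train_scaled = ss.transform(X_train)  #apply the learned scaling to the training set
X_test_scaled = ss.transform(X_test)    #apply the same scaling to the test set, with no re-fitting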

3. Hyperparameter Tuning and Cross-Validation. Easily tune hyperparameters across all the steps in your pipeline using techniques such as GridSearchCV. Errors often go unnoticed at this particular step, however. Let's look at a simple illustration.

import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline

X, y = load_breast_cancer(return_X_y=True, as_frame=True)

#Without Pipeline: feature selection sees the whole dataset before cross-validation
select = SequentialFeatureSelector(RandomForestClassifier(n_estimators=100), n_features_to_select=8, direction='forward').fit(X, y)
X_selected = select.transform(X)
logreg = LogisticRegression()
np.mean(cross_val_score(estimator=logreg, X=X_selected, y=y))

#With Pipeline: feature selection is re-fit inside each cross-validation fold
pipe = Pipeline([("select", SequentialFeatureSelector(RandomForestClassifier(n_estimators=100), n_features_to_select=8, direction='forward')),
                 ("log", LogisticRegression())])
np.mean(cross_val_score(estimator=pipe, X=X, y=y))

Try running both examples: although the cross-validation scores are not far off, the version without the Pipeline leaks information because the feature selection step is performed on the entire dataset. By the time cross-validation splits the data into training and validation folds, the features have already been chosen using the validation rows, so the two folds are no longer truly independent. If you find this part hard to follow, reread the paragraph and code it out yourself to internalize it.

Before we dive deeper into what Pipeline can do, let’s digress towards the components that form a Pipeline — Estimators. We will touch on the other components—Transformers, Predictors, and Models in the next section.

A lot of people get confused by the term Estimator in Scikit-learn. People tend to associate estimators with the ability to predict — that is, with the predict method in particular. While there is some truth in that statement, it is unfortunately only a half-truth at best. Estimators are the basic building blocks of the Scikit-learn library. An estimator is a tool that can learn from your training set to create a model that can make predictions or inferences about new data. All estimators expose a fit method for learning from the training set, and they typically inherit from BaseEstimator.

BaseEstimator itself does not provide a predict method. An estimator does not necessarily need a predict method, although many do. An estimator with a predict method attempts to make predictions on new, unseen data based on the learned model. For example, regressors and classifiers such as Linear Regression, Random Forest Classifier, Gradient Boosting Classifier, etc. are estimators with the predict method.
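As a quick, self-contained sketch (using a toy dataset), the snippet below shows the two halves of that contract: fit learns from data, and predict, where available, scores new observations.

from sklearn.datasets import load_breast_cancer
from sklearn.linear_model import LogisticRegression

X, y = load_breast_cancer(return_X_y=True)
clf = LogisticRegression(max_iter=5000)  #an estimator that implements both fit and predict
clf.fit(X, y)                            #every estimator exposes fit
clf.predict(X[:5])                       #predictors additionally expose predict for new data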

Going one step further, let's peek into the source code for the LogisticRegression class². In the snippet below, we observe that the class inherits from BaseEstimator (which provides the shared estimator plumbing such as get_params and set_params) and from LinearClassifierMixin, which supplies the predict method.

Scikit-learn GitHub (BSD-3)

A transformer is a type of estimator with a transform method. Note that the word "transformer" here refers specifically to the Scikit-learn context. It should not be confused with the Transformer in neural network architectures, which has gained prominence in recent years.

In short, a transformer transforms/manipulates the predictors (X) in some way so that they are ready to be consumed by machine learning algorithms. This could be scaling continuous predictors using tools such as StandardScaler and MinMaxScaler, or encoding categorical predictors using OneHotEncoder or OrdinalEncoder.

Going a step further, a transformer has a fit-transform mechanism, where it learns from the training data using the fit method and then applies the learned transformations to both the training and test data using the transform method. This ensures that the same transformations are consistently applied throughout.

Going two steps further, to follow the Scikit-learn API conventions, a transformer usually inherits from BaseEstimator and from TransformerMixin, which supplies fit_transform for free once fit and transform are defined. Let's peek into the source code for the StandardScaler class³.

Scikit-learn GitHub
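To make the division of labor concrete, here is a minimal, hypothetical transformer: we only write fit and transform, while TransformerMixin supplies fit_transform and BaseEstimator supplies get_params/set_params for free.

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class DeMean(BaseEstimator, TransformerMixin):   #hypothetical example, not part of Scikit-learn
    def fit(self, X, y=None):
        self.mean_ = np.asarray(X).mean(axis=0)  #learn the column means from the training data
        return self

    def transform(self, X, y=None):
        return np.asarray(X) - self.mean_        #apply the learned means

DeMean().fit_transform(np.array([[1., 2.], [3., 4.]]))  #fit_transform is inherited from TransformerMixin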

ColumnTransformer

At times, you may need to apply transformations to certain columns only, depending on your needs. For example, applying OneHotEncoder to categorical features with no particular hierarchy, and OrdinalEncoder to categorical features with a specific hierarchy and ordering (e.g. t-shirt sizes follow an ordering such as XS < S < M < L < XL). We can achieve this separation using ColumnTransformer.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
from sklearn.compose import ColumnTransformer

ohe_categorical_features = ['a', 'b', 'c']
ohe_categorical_transformer = Pipeline(steps=[
    ('ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False, drop='first'))
])

orde_categorical_features = ['d', 'e', 'f']
orde_categorical_transformer = Pipeline(steps=[
    ('orde', OrdinalEncoder(dtype='float'))
])

col_trans = ColumnTransformer(
    transformers=[
        ('ohe_categorical_features', ohe_categorical_transformer, ohe_categorical_features),
        ('orde_categorical_features', orde_categorical_transformer, orde_categorical_features),
    ],
    remainder='passthrough',
    n_jobs=-1,
)

As you might expect, we are going to put the variable col_trans above as part of our big overall Pipeline later on in the code. Simple and elegant.
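As a quick sanity check, here is a sketch with a made-up DataFrame whose column names match the lists above: the fitted ColumnTransformer one-hot encodes a–c, ordinal-encodes d–f, and passes any remaining columns through untouched.

import pandas as pd

#hypothetical toy data; 'g' is not listed in any transformer, so remainder='passthrough' keeps it as-is
toy = pd.DataFrame({'a': ['red', 'blue'], 'b': ['cat', 'dog'], 'c': ['yes', 'no'],
                    'd': ['S', 'M'], 'e': ['low', 'high'], 'f': ['XS', 'XL'],
                    'g': [1.0, 2.0]})
col_trans.fit_transform(toy)  #uses the col_trans defined in the snippet above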

The Pipeline class executes the estimators in the pipe in a sequential manner, passing the output of one step as the input to the next. This essentially allows the concept of chaining to take place. From the Scikit-learn documentation itself, here are the criteria for an estimator to be eligible to be incorporated as part of a Pipeline.

For an estimator to be usable together with pipeline.Pipeline in any but the last step, it needs to provide a fit or fit_transform function. To be able to evaluate the pipeline on any data but the training set, it also needs to provide a transform function. There are no special requirements for the last step in a pipeline, except that it has a fit function.

Using Pipeline, we remove the redundant steps of having to call fit and transform on every estimator and/or transformer separately. Calling fit once directly on the pipeline suffices. Behind the scenes, it calls fit on the first estimator, then transform on the input, and passes the result on to the next estimator. Indeed, a pipeline is only as capable as its last estimator (it exposes all the methods of the last estimator in the pipe). If the last estimator is a regressor, then the Pipeline can be used as a regressor. If the last estimator is a transformer, so is the pipeline.

Below is an illustration of how to use the Pipeline class.

from sklearn.impute import KNNImputer

imputer = KNNImputer(n_neighbors=5)
feature_select = SequentialFeatureSelector(RandomForestClassifier(n_estimators=100), n_features_to_select=8, direction='forward')
log_reg = LogisticRegression()
pipe = Pipeline([("imputer", imputer),
                 ("select", feature_select),
                 ("log", log_reg)])

In short, the argument to Pipeline is a list of tuples executed sequentially. The first element of each tuple is an arbitrary name you choose to identify the estimator, sort of like an ID. The second element is the estimator object itself. Simple, isn't it? If you are not good with names, Scikit-learn provides the shorthand make_pipeline function, which removes the headache of having to come up with them.

from sklearn.pipeline import make_pipeline
imputer = KNNImputer(n_neighbors=5)
feature_select = SequentialFeatureSelector(RandomForestClassifier(n_estimators=100), n_features_to_select=8, direction='forward')
log_reg = LogisticRegression()
make_pipeline(imputer, feature_select, log_reg)
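make_pipeline names each step after its lowercased class name, and those auto-generated names are what you would reference later in grid-search parameter keys. A quick sketch of inspecting them:

pipe = make_pipeline(imputer, feature_select, log_reg)
list(pipe.named_steps)
#['knnimputer', 'sequentialfeatureselector', 'logisticregression']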

So far, transformers such as StandardScaler and MinMaxScaler look good and work for many cases. The question is: what if you have your own customized method to manipulate and preprocess your dataset? Can you still incorporate it neatly into the Pipeline class? The answer is a resounding yes! There are two ways of achieving this — leveraging FunctionTransformer, or writing your own custom class.

Let’s say you want to do a Box-Cox transformation on part of your dataset.

from scipy.stats import boxcox
from sklearn.preprocessing import FunctionTransformer
from sklearn.compose import ColumnTransformer

boxcox_features = ['x1', 'x2']
boxcox_transformer = Pipeline(steps=[
    #boxcox expects a 1-D, strictly positive input, so apply it column by column
    ('boxcox', FunctionTransformer(lambda x: x.apply(lambda col: boxcox(col)[0])))
])

col_trans = ColumnTransformer(
    transformers=[
        ('boxcox_features', boxcox_transformer, boxcox_features),
        ...
    ],
    remainder='passthrough',
    n_jobs=-1,
)

The second method is to write your own custom class. If you are writing a transformer, inherit from BaseEstimator and TransformerMixin. If you are writing an estimator for a classification task, for example, then inherit from ClassifierMixin instead.

Let’s say you want to write a class that removes outliers and incorporates it into your Pipeline.

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

def outlier_thresholds(df: pd.DataFrame,
                       col: str,
                       q1: float = 0.05,
                       q3: float = 0.95):
    #1.5 as multiplier is a rule of thumb. Generally, the higher the multiplier,
    #the farther the thresholds sit from the quantiles, so fewer data points are classified as outliers
    iqr = df[col].quantile(q3) - df[col].quantile(q1)
    return (df[col].quantile(q1) - 1.5 * iqr,
            df[col].quantile(q3) + 1.5 * iqr)

def delete_potential_outlier_list(df: pd.DataFrame,
                                  cols: list) -> pd.DataFrame:
    #set any value outside the thresholds to NaN, column by column
    for col in cols:
        low, high = outlier_thresholds(df, col)
        df.loc[(df[col] > high) | (df[col] < low), col] = np.nan
    return df

class OutlierRemove(BaseEstimator, TransformerMixin):

    def __init__(self, outlierlist):
        self.outlierlist = outlierlist

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return delete_potential_outlier_list(X, self.outlierlist)

I want to bring your focus particularly to the OutlierRemove class. Here, we have the fit method that returns self to allow us to continue chaining, and the transform method that does the actual removal of the outliers. After this, we can simply incorporate the class into our Pipeline like the following:

pipe = Pipeline([("remove_outlier", OutlierRemove(["a", "b", "c"])),
("imputer", imputer),
("select", feature_select),
("log", log_reg)])

Here comes the confusing part — FeatureUnion looks similar to Pipeline, but it works quite differently. In a FeatureUnion, the fit and transform methods are not executed sequentially one after the other. Instead, each transformer is fit independently on the data, the transform methods are applied in parallel, and the results are concatenated together. Picture the code below: we can run the preprocessing for numerical and categorical predictors in parallel using FeatureUnion because they are independent of one another, which makes for a faster and more efficient operation.

from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier

standard_numerical_features = ['x1', 'x2']
standard_numerical_transformer = Pipeline(steps=[
    ('remove_outlier', OutlierRemove(standard_numerical_features)),
    ('scale', StandardScaler())
])

ohe_categorical_features = ['x3', 'x4']
ohe_categorical_transformer = Pipeline(steps=[
    ('ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False, drop='first'))
])

#note: FeatureUnion's first argument is transformer_list, and each transformer receives the full input
feature_union = FeatureUnion(
    transformer_list=[
        ('standard_numerical_features', standard_numerical_transformer),
        ('ohe_categorical_features', ohe_categorical_transformer),
    ],
    n_jobs=-1,
)

pipeline = Pipeline([
    ('feature_union', feature_union),
    ('model', RandomForestClassifier())
])

pipeline.fit(X_train, y_train)

I wish to end by illustrating the contents above with a real-world dataset from a Portuguese financial institution. The dataset is available on the UCI Machine Learning Repository¹ for public use with citation.

Allow me to skip all the exploratory data analysis and visualization, and zoom straight into the modeling of the Pipeline.

1. Importing the dataset

import numpy as np
import pandas as pd

df = (pd
      .read_csv('../../dataset/bank_marketing/bank-11k.csv', sep=',')
      .rename(columns={'y': 'deposit'})
      .pipe(lambda df_: df_.assign(deposit=np.where(df_.deposit == "no", 0, 1)))
)

In short, here is what the code above does:

  1. Import the dataset with a comma separator
  2. Rename the column ‘y’ to ‘deposit’
  3. Encode the column deposit from no and yes to 0 and 1

2. Train-Test Split

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(df.drop(columns=['deposit']),
                                                     df[['deposit']].values.ravel(),
                                                     test_size=0.2,
                                                     random_state=42)

3. Writing 3 additional custom classes

import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from xgboost import XGBClassifier

#Custom class #1: switch between classifiers
class ClfSwitcher(BaseEstimator):

    #By default, run XGBClassifier
    def __init__(self, estimator=XGBClassifier()):
        self.estimator = estimator

    def fit(self, X, y=None, **kwargs):
        self.estimator.fit(X, y)
        return self

    def predict(self, X, y=None):
        return self.estimator.predict(X)

    def predict_proba(self, X):
        return self.estimator.predict_proba(X)

    def score(self, X, y):
        return self.estimator.score(X, y)
#Custom class #2: remove outliers
def outlier_thresholds(df: pd.DataFrame,
                       col: str,
                       q1: float = 0.05,
                       q3: float = 0.95):

    iqr = df[col].quantile(q3) - df[col].quantile(q1)
    return (df[col].quantile(q1) - 1.5 * iqr,
            df[col].quantile(q3) + 1.5 * iqr)

def delete_potential_outlier_list(df: pd.DataFrame,
                                  cols: list) -> pd.DataFrame:

    for col in cols:
        low, high = outlier_thresholds(df, col)
        df.loc[(df[col] > high) | (df[col] < low), col] = np.nan
    return df

class OutlierTrans(BaseEstimator, TransformerMixin):

    def __init__(self, outlierlist):
        self.outlierlist = outlierlist

    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return delete_potential_outlier_list(X, self.outlierlist)

#Custom class #3: add new columns, drop a column, and modify data types
class TweakBankMarketing(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        return (X
                .assign(pdays_cat=lambda df_: np.where(df_.pdays < 0, "no contact", "contacted"),
                        previous_cat=lambda df_: np.where(df_.previous == 0, "no contact", "contacted"),
                        job=lambda df_: np.where(df_.job == "unknown", np.nan, df_.job),
                        education=lambda df_: np.where(df_.education == "unknown", np.nan, df_.education),
                        contact=lambda df_: np.where(df_.contact == "unknown", np.nan, df_.contact),
                        poutcome=lambda df_: np.where(df_.poutcome == "other", np.nan, df_.poutcome),
                        ) #add new predictors and clean existing ones
                .drop(columns=['duration']) #drop predictor due to data leakage
                .astype({'age': 'int8',
                         'balance': 'int32',
                         'day': 'category',
                         'campaign': 'int8',
                         'pdays': 'int16',
                         'previous': 'int16'})
                .pipe(lambda df_: df_.astype({column: 'category' for column in df_.select_dtypes("object").columns.tolist()})) #convert data type from object to category
                )

In short, here is what the code above does:

  1. The class ClfSwitcher inherits from BaseEstimator. This class serves the purpose of switching between classifiers easily; we set the default classifier to be the XGBoost Classifier (see the short sketch after this list for how the switch is made).
  2. The functions outlier_thresholds and delete_potential_outlier_list identify outliers in each column and set them to NaN. The class OutlierTrans is a transformer that inherits from both BaseEstimator and TransformerMixin; its transform method simply calls delete_potential_outlier_list.
  3. The class TweakBankMarketing is a custom class for dataset-specific transformations such as creating new columns, dropping undesirable columns, and changing data types accordingly.
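As mentioned in point 1, swapping models is just a parameter change on the clf step. A minimal sketch of how the switch works (LogisticRegression here is purely illustrative, and its import is assumed):

clf = ClfSwitcher()                                          #defaults to XGBClassifier
clf.set_params(estimator=LogisticRegression(max_iter=1000))  #swap in another model
#inside the full pipeline of step 4, the same swap is spelled clf__estimator,
#which is exactly the key used by the params_grid in step 5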

4. Preparing Pipeline

from sklearn.preprocessing import StandardScaler, MinMaxScaler, OrdinalEncoder, OneHotEncoder
from sklearn.impute import KNNImputer

from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

standard_numerical_features = ['age', 'campaign', 'pdays', 'previous']
standard_numerical_transformer = Pipeline(steps=[
    ('remove_outlier', OutlierTrans(standard_numerical_features)),
    ('scale', StandardScaler())
])

minmax_numerical_features = ['balance']
minmax_numerical_transformer = Pipeline(steps=[
    ('remove_outlier', OutlierTrans(minmax_numerical_features)),
    ('scale', MinMaxScaler())
])

ohe_categorical_features = ['job', 'marital', 'default', 'housing', 'loan', 'contact', 'poutcome', 'pdays_cat', 'previous_cat']
ohe_categorical_transformer = Pipeline(steps=[
    ('ohe', OneHotEncoder(handle_unknown='ignore', sparse_output=False, drop='first'))
])

orde_categorical_features = ['education', 'day', 'month']
orde_categorical_transformer = Pipeline(steps=[
    ('orde', OrdinalEncoder(dtype='float'))
])

col_trans = ColumnTransformer(
    transformers=[
        ('standard_numerical_features', standard_numerical_transformer, standard_numerical_features),
        ('minmax_numerical_features', minmax_numerical_transformer, minmax_numerical_features),
        ('ohe_categorical_features', ohe_categorical_transformer, ohe_categorical_features),
        ('orde_categorical_features', orde_categorical_transformer, orde_categorical_features),
    ],
    remainder='passthrough',
    verbose=0,
    verbose_feature_names_out=False,
    n_jobs=-1,)

pipeline = Pipeline(steps=[
    ('tweak_bank_marketing', TweakBankMarketing()),
    ('col_trans', col_trans),
    ('imputer', KNNImputer(n_neighbors=5)),
    ('clf', ClfSwitcher()),
])
pipeline

In short, here is what the code above does:

  1. Scale numerical columns using StandardScaler and MinMaxScaler
  2. Encode categorical columns using OneHotEncoder and OrdinalEncoder
  3. Use ColumnTransformer to apply the different transformations to the appropriate columns of the dataset
  4. Finally, Pipeline encapsulates everything seamlessly.

At this stage, this is our constructed Pipeline.

[Pipeline diagram: Image by Author]

5. Define hyperparameters for Grid Search CV

from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

#We define all the hyperparameters for 4 classifiers so that we can easily switch from one to another
params_grid = [
    {'clf__estimator': [SGDClassifier()],
     'clf__estimator__penalty': ('l2', 'elasticnet', 'l1'),
     'clf__estimator__max_iter': [500],
     'clf__estimator__tol': [1e-4],
     'clf__estimator__loss': ['hinge', 'log_loss', 'modified_huber'],
     },

    {'clf__estimator': [LogisticRegression()],
     'clf__estimator__C': [0.01, 0.1, 1, 10, 100],
     'clf__estimator__max_iter': [1000]
     },

    {'clf__estimator': [RandomForestClassifier(n_estimators=100)],
     'clf__estimator__max_features': [3, 4, 5, 6, 7],
     'clf__estimator__max_depth': [3, 4, 5]
     },

    {'clf__estimator': [XGBClassifier()],
     'clf__estimator__max_depth': [4, 5, 6],
     'clf__estimator__learning_rate': [0.01, 0.1],
     'clf__estimator__n_estimators': [80, 100],
     'clf__estimator__booster': ['gbtree'],
     'clf__estimator__gamma': [7, 25, 100],
     'clf__estimator__subsample': [0.3, 0.6],
     'clf__estimator__colsample_bytree': [0.5, 0.7],
     'clf__estimator__colsample_bylevel': [0.5, 0.7],
     'clf__estimator__eval_metric': ['auc']
     },
]

In short, here is what the code above does:

  1. Define parameter grids for 4 different classifiers, namely SGDClassifier, LogisticRegression, RandomForestClassifier, and XGBClassifier.

6. Perform Grid Search CV

from sklearn.model_selection import GridSearchCV

#%%time is a Jupyter cell magic for timing; if you use it, it must be the very first line of its cell
grid = GridSearchCV(pipeline, params_grid, cv=5, n_jobs=-1, return_train_score=False, verbose=0)
grid.fit(X_train, y_train)

In short, here is what the code above does:

  1. Pass our pipeline object as the first argument to GridSearchCV, so the entire preprocessing and modeling chain is re-fit within every cross-validation fold.
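If you want more than the single best model, GridSearchCV also records every trial in cv_results_, which is convenient to inspect as a DataFrame. A small sketch, assuming pandas is already imported as pd:

results = (pd.DataFrame(grid.cv_results_)
             .sort_values('rank_test_score')
             [['param_clf__estimator', 'mean_test_score', 'std_test_score']])
results.head()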

7. Printing best estimator

from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score

print(f'Best params: {grid.best_params_}')
print(f'Best CV score: {grid.best_score_}')
print(f'Validation-set score: {grid.score(X_test, y_test)}')

print(f'Accuracy score: {accuracy_score(y_test, grid.predict(X_test))}')
print(f'Precision score: {precision_score(y_test, grid.predict(X_test))}')
print(f'Recall score: {recall_score(y_test, grid.predict(X_test))}')
print(f'ROC-AUC score: {roc_auc_score(y_test, grid.predict(X_test))}')

Here, we obtain a validation score of 0.74, with an AUC score of 0.74 as well.

8. Plot the ROC-AUC curve

import sklearn.metrics as skmet

fpr, tpr, thresholds = skmet.roc_curve(y_test, grid.predict(X_test))
roc_auc = skmet.auc(fpr, tpr)
display = skmet.RocCurveDisplay(fpr=fpr,
                                tpr=tpr,
                                roc_auc=roc_auc,
                                estimator_name='XGBoost Classifier')
display.plot();
[ROC-AUC curve: Image by Author]

There you have it! Pipeline with Estimators and Transformers. Next time when you approach an ML project, consider using this technique. It may seem difficult to adopt at first, but keep practicing and soon you will create robust and efficient Machine Learning pipelines.

If you pick up something useful from this article, do consider giving me a Follow on Medium. Easy, 1 article a week to keep yourself updated and stay ahead of the curve!

  1. Bank Marketing Data Set [Moro et al., 2014] S. Moro, P. Cortez, and P. Rita. A Data-Driven Approach to Predict the Success of Bank Telemarketing. Decision Support Systems, Elsevier, 62:22–31, June 2014: https://archive.ics.uci.edu/ml/datasets/Bank+Marketing (CC BY 4.0)
  2. Scikit-learn Linear Model Logistic: https://github.com/scikit-learn/scikit-learn/blob/364c77e047ca08a95862becf40a04fe9d4cd2c98/sklearn/linear_model/_logistic.py
  3. Scikit-learn Preprocessing: https://github.com/scikit-learn/scikit-learn/blob/364c77e04/sklearn/preprocessing/_data.py#L644
  4. Developing Scikit-learn estimators: https://scikit-learn.org/stable/developers/develop.html
  5. Scikit-learn ColumnTransformer: https://scikit-learn.org/stable/modules/generated/sklearn.compose.ColumnTransformer.html
  6. Scikit-learn Pipeline: https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.Pipeline.html
  7. Scikit-learn FeatureUnion: https://scikit-learn.org/stable/modules/generated/sklearn.pipeline.FeatureUnion.html


