
Machine Learning Models to Production Part 1

Build your own Scikit-learn pipelines

This is the first part of a multi-part series on how to build machine learning models, convert them into packages and deploy them in production environments. There are many ways to do this; the approach presented here is just one of them.

Codes:

Parts a and b are just a showcase of how this is done, and the code will not run on its own. You will need to do things like downloading the dataset and importing libraries; the code snippets are intended to serve as guidelines only.

Part a: A very basic implementation of a pipeline for feature engineering and prediction
Part b: Implementing multiple algorithms using pre-built pipelines for quick model building
Part c:

  • Implementing a prediction algorithm using functions
  • Converting the functions into classes to follow the OOP paradigm and building custom-made pipelines

What are pipelines: Pipelines are one way of implementing procedural programming. In the procedural programming paradigm, procedures (functions or subroutines) are carried out as a series of computational steps.

There are many steps in building a machine learning model. The data is almost never clean, and you need to do some preprocessing (like normalizing) to ensure the speed and accuracy of your model are up to the mark. One way to implement this procedurally is to write individual functions for each of the individual processes and call them in sequence for the training and testing datasets. Another way is to leverage the power of the Scikit-learn pipeline to make this process easier, reproducible, easy to understand, easy to debug, and enforceable (ensuring no step is missed). This way, the model also becomes easier to deploy into a production environment.

SKlearn Pipeline: The Scikit-learn library in Python is powerful and one of the most used libraries in machine learning. It provides an efficient implementation of a host of algorithms, ranging from data transformations and preprocessing to the entire suite of machine learning models. It is written such that most of its algorithms follow the same interface. This means that if you know the code to implement a Logistic Regression, you can run an SVM or a Decision Tree classifier by just changing the name of the classifier and a few parameters (more or less), and the code will run just fine. Scikit-learn is so well established that new packages in other libraries (like Keras) are designed keeping the scikit-learn functionality in mind.
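
To illustrate this shared interface, here is a minimal sketch (assuming X_train, Y_train, X_val and Y_val are already defined, as in Part a below) where swapping the classifier is the only change needed:

from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.metrics import accuracy_score

# Every estimator exposes the same fit/predict interface,
# so swapping models is essentially a one-line change
for clf in [LogisticRegression(max_iter=1000), SVC()]:
    clf.fit(X_train, Y_train)
    pred = clf.predict(X_val)
    print(type(clf).__name__, accuracy_score(Y_val, pred))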

Part a : Basic Pipeline Codes

This is just to showcase what a prediction model using pipelines looks like. There are more detailed explanations over the internet.

Data: Sonar Mines Rocks Dataset [Source UCI ML Repo and Kaggle]

I’m not going into the details of the data; it’s a very simple classification dataset without any missing values or mixed data types, and you can run a crude model on it in just a few lines.

import pandas as pd
data = pd.read_csv(filename)

## Separate Training & Validation Dataset
from sklearn.model_selection import train_test_split
X = data.values[:,0:60]
Y = data.values[:,60]
X_train, X_val, Y_train, Y_val = train_test_split(X,Y, test_size = 0.2, random_state=42)



# Build Pipelines – import necessary libraries
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

#1. Single Pipeline for Prediction
pipe = Pipeline([
                ('LR', LogisticRegression())
                ])
    
pipe.fit(X_train,Y_train) 
pred = pipe.predict(X_val)
print(accuracy_score(Y_val, pred))

#2 Single Pipeline with data scaling

pipe = Pipeline([
                ('Scaler', StandardScaler()),
                ('LR', LogisticRegression())
                ])
    
pipe.fit(X_train,Y_train) 
pred = pipe.predict(X_val)
print(accuracy_score(Y_val, pred))

Another example: chaining the fit and predict methods together (source: Stack Overflow)

Word Vectors: without pipelines

vect = CountVectorizer()
tfidf = TfidfTransformer()
clf = SGDClassifier()

# Fit/transform each step on the training data, one at a time
vX = vect.fit_transform(X_train)
tfidfX = tfidf.fit_transform(vX)
clf.fit(tfidfX, Y_train)

# Evaluate on test: every step has to be re-applied, in the same order
vX = vect.transform(X_test)
tfidfX = tfidf.transform(vX)
predicted = clf.predict(tfidfX)

Word Vectors: Using pipelines

pipeline = Pipeline([
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier()),
])

# Fit all steps on the training data, then predict
predicted = pipeline.fit(X_train, Y_train).predict(X_train)
# Evaluate all steps on test
predicted = pipeline.predict(X_test)

Part b:

This is just an extension of the code above, and is sourced from the awesome blog by Jason Brownlee (machinelearningmastery.com – you should have a look at his blogs). It is a way of chaining multiple classifiers together to quickly evaluate several algorithms in one shot.

Note – import the necessary libraries before running this on any dataset (a sketch of the imports is shown below). Assume X_train, X_val, Y_train and Y_val come from the Sonar Mines Rocks dataset in Part a.
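
For reference, a minimal set of imports that should cover the snippet below (assuming scikit-learn and matplotlib are installed):

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.model_selection import KFold, cross_val_score
import matplotlib.pyplot as plt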

pipelines = []
pipelines.append(('ScaledLR', Pipeline([('Scaler', StandardScaler()),('LR', LogisticRegression())])))
pipelines.append(('ScaledLDA', Pipeline([('Scaler', StandardScaler()),('LDA', LinearDiscriminantAnalysis())])))
pipelines.append(('ScaledKNN', Pipeline([('Scaler', StandardScaler()),('KNN', KNeighborsClassifier())])))
pipelines.append(('ScaledCART', Pipeline([('Scaler', StandardScaler()),('CART', DecisionTreeClassifier())])))
pipelines.append(('ScaledNB', Pipeline([('Scaler', StandardScaler()),('NB', GaussianNB())])))
pipelines.append(('ScaledSVM', Pipeline([('Scaler', StandardScaler()),('SVM', SVC())])))

results = []
names=[]
for name, model in pipelines:
    kfold = KFold(n_splits=10, shuffle=True, random_state=42)
    cv_results = cross_val_score(model, X_train, Y_train, cv=kfold, scoring='accuracy')
    results.append(cv_results)
    names.append(name)
    msg = "%s: %f (%f)" % (name, cv_results.mean(), cv_results.std())
    print(msg)

# Compare Algorithms
fig = plt.figure()
fig.suptitle('Algorithm Comparison')
ax = fig.add_subplot(111)
plt.boxplot(results)
ax.set_xticklabels(names)
plt.show()

This is a great way to reduce your code and ensure that train and test follow the same procedures.

There is one problem with this approach – these are prebuilt functions and modules, and though they provide a level of flexibility in terms of defining parameters, they do not allow you to modify the way these functions are run, or to do things in a different way. Most of the time – in almost 99% of data science work – you have to write certain custom functions before your dataset can be fed to these pipelines. These include processes like imputing missing values, label encoding categorical variables, treating date variables correctly (converting dates to months, or taking the difference in days between two date columns), log transformation (or any other transformation) of features that are not Gaussian, dropping certain features, or any other preprocessing step that needs to run before you can call any model. Creating custom pipelines is the key to doing this effectively.

Building your own Scikit-learn Pipelines

Advantages:

  • Define the preprocessing the way you want – the way it should be done, since every dataset is different
  • This is implemented in a robust object-oriented way, so the approach is very structured
  • Handle exceptions in the data if and when they occur, and take necessary action
  • Ideal for production grade code, and for converting model into a package

One key advantage is breaking the entire code into different modules – one file for config variables, one module for the pipeline classes of each preprocessing step, one module for data import/export and saving/loading models, one main module for calling and running the pipeline to train and save the model, and one module for running predictions on new data. This modular approach divides the code into chunks and makes maintenance and debugging easy. Furthermore, if you want to add a new feature transformer or modify something else, you can do it without going through the entire code. A possible layout is sketched below.
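
As an illustration only (these file names are hypothetical, not part of the code below), such a package might be organised as:

price_model/
├── config.py            # config variables: feature lists, file names, pipeline name
├── preprocessors.py     # custom transformer classes (imputers, encoders, ...)
├── pipeline.py          # the sklearn Pipeline wiring the transformers and the model
├── data_management.py   # data import/export, saving/loading the fitted pipeline
├── train_pipeline.py    # fit the pipeline on training data and persist it
└── predict.py           # load the persisted pipeline and score new data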

Components of a Scikit-learn object:

Transformers: classes that have fit and transform methods, for transforming data

  • Examples: scalers, feature selectors or one-hot encoders

Predictor: classes that have fit and predict methods, for prediction

  • Examples: ML algorithms like LogisticRegression, Lasso, SVC etc.

Pipeline: class that runs transformers and predictors in sequence

  • All steps should be transformers except the last one
  • The last step should be a predictor

Code Example: Converting a function to a Sklearn class

Data: House Price Prediction [Kaggle] https://www.kaggle.com/c/house-prices-advanced-regression-techniques/data

Objective: Given a bunch of numerical, categorical and temporal features, predict the SalePrice of the house.

Challenges: The data has a lot of missing values, as well as different data types (numeric and categorical). Some features are skewed and need to be transformed, and categorical variables need to be encoded. More preprocessing can be applied; this is just a demo version of the code.

Example 1:

The function below takes as input a dataframe (‘X’) and a list of categorical features (‘features’), and returns the dataframe with missing values replaced by ‘Missing’. This function can be called on any dataframe to replace missing values in categorical variables.

def categorical_imputer(X, features):
    X=X.copy()
    for var in features:
        X[var]= X[var].fillna('Missing')
    return X

Scikit-learn Class for converting missing values in categorical data to ‘Missing’

from sklearn.base import BaseEstimator, TransformerMixin

class CategoricalImputer(BaseEstimator, TransformerMixin):
    def __init__(self, variables=None):
        # Check if the variables passed are in a list format; if not,
        # convert to a list and assign to self.variables so it can be
        # used in the later methods
        if not isinstance(variables, list):
            self.variables = [variables]
        else:
            self.variables = variables

    def fit(self, X: pd.DataFrame, y: pd.Series = None):
        # Nothing to learn here, just return the object as is
        return self

    def transform(self, X: pd.DataFrame):
        # Fill missing values and return the modified dataframe
        X = X.copy()
        for feature in self.variables:
            X[feature] = X[feature].fillna("Missing")
        return X

Key Components:

  • BaseEstimator and TransformerMixin: classes inherited from the sklearn.base module which enable the pipeline functionality (parameter handling and fit_transform)
  • __init__: the class constructor; assigns the list of variables to be transformed to an attribute of the object
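
As a quick sanity check, the class can also be used on its own, outside a pipeline. A minimal sketch on a made-up toy dataframe, just to illustrate the fit/transform calls:

import pandas as pd

df = pd.DataFrame({'BsmtQual': ['Gd', None, 'TA'],
                   'GrLivArea': [1710, 1262, 1786]})
imputer = CategoricalImputer(variables=['BsmtQual'])
print(imputer.fit_transform(df))   # fit_transform is provided by TransformerMixin
# The missing BsmtQual entry is now the string 'Missing'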

Example 2:

Encoding Categorical Variables – a label encoder where the labels are assigned in order of the mean of the target variable for each category

Function:

def categorical_encoder(X,y, features):
    temp = pd.concat([X,y],axis=1)
    temp.columns = list(X.columns)+['target']
    
    encoder_dict_={}
    for var in features:
        t=temp.groupby([var])['target'].mean().sort_values(ascending=True).index
        encoder_dict_[var] ={k: i for i,k in enumerate(t,0)}
    
    ## Encode variables
    for var in features:
        X[var] = X[var].map(encoder_dict_[var])
    
    return X

Sklearn Class:

class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """String to numbers categorical encoder."""

    def __init__(self, variables=None):
        if not isinstance(variables, list):
            self.variables = [variables]
        else:
            self.variables = variables

    def fit(self, X, y):
        temp = pd.concat([X, y], axis=1)
        temp.columns = list(X.columns) + ['target']

        # persist transforming dictionary
        self.encoder_dict_ = {}

        for var in self.variables:
            t = temp.groupby([var])['target'].mean().sort_values(
                ascending=True).index
            self.encoder_dict_[var] = {k: i for i, k in enumerate(t, 0)}

        return self

    def transform(self, X):
        # encode labels
        X = X.copy()
        for feature in self.variables:
            X[feature] = X[feature].map(self.encoder_dict_[feature])
        return X
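
A minimal illustration of what the encoder learns, on hypothetical toy data (not from the house price dataset): categories with a lower mean target get lower codes.

X_toy = pd.DataFrame({'Quality': ['Good', 'Bad', 'Good', 'Average', 'Bad']})
y_toy = pd.Series([300, 100, 320, 200, 110])

enc = CategoricalEncoder(variables=['Quality'])
enc.fit(X_toy, y_toy)
print(enc.encoder_dict_)     # {'Quality': {'Bad': 0, 'Average': 1, 'Good': 2}}
print(enc.transform(X_toy))  # categories replaced by their learned integer codes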

Complete Code:

  1. Config variables – for defining variable list. This can be obtained after running the model first, understanding data and identifying features
  2. Implementing functions – Data Processing and Prediction using Functions
  3. Implementing Pipelines – Data Processing and Prediction using Pipelines

Config Variables: common to both functional codes and pipeline codes

TESTING_DATA_FILE = 'test.csv'
TRAINING_DATA_FILE = 'train.csv'
TARGET = 'SalePrice'

FEATURES = ['MSSubClass', 'MSZoning', 'Neighborhood',
            'OverallQual', 'OverallCond', 'YearRemodAdd',
            'RoofStyle', 'MasVnrType', 'BsmtQual', 'BsmtExposure',
            'HeatingQC', 'CentralAir', '1stFlrSF', 'GrLivArea',
            'BsmtFullBath', 'KitchenQual', 'Fireplaces', 'FireplaceQu',
            'GarageType', 'GarageFinish', 'GarageCars', 'PavedDrive',
            'LotFrontage',
            # this one is only to calculate temporal variable:
            'YrSold']

DROP_FEATURES = ['YrSold']

NUMERICAL_VARS_WITH_NA = ['LotFrontage']
CATEGORICAL_VARS_WITH_NA = ['MasVnrType', 'BsmtQual', 'BsmtExposure',
                            'FireplaceQu', 'GarageType', 'GarageFinish']

TEMPORAL_VARS = ['YearRemodAdd']

NUMERICALS_LOG_VARS = ['LotFrontage', '1stFlrSF', 'GrLivArea']

CATEGORICAL_VARS = ['MSZoning', 'Neighborhood', 'RoofStyle', 'MasVnrType',
                    'BsmtQual', 'BsmtExposure', 'HeatingQC', 'CentralAir',
                    'KitchenQual', 'FireplaceQu', 'GarageType', 'GarageFinish',
                    'PavedDrive']

NUMERICAL_NA_NOT_ALLOWED = [
    feature for feature in FEATURES
    if feature not in CATEGORICAL_VARS + NUMERICAL_VARS_WITH_NA
]

CATEGORICAL_NA_NOT_ALLOWED = [
    feature for feature in CATEGORICAL_VARS
    if feature not in CATEGORICAL_VARS_WITH_NA
]


PIPELINE_NAME = 'lasso_regression'

Implementing the functional code for prediction (written in a somewhat crude form so that all the steps are visible)

import pandas as pd
def read_data(path):
    train = pd.read_csv(path+'train.csv')
    test = pd.read_csv(path+'test.csv')
    return [train,test]


train,test = read_data(datapath)


## Split Data
import numpy as np
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(train[FEATURES], train[TARGET],
                                                    test_size=0.1, random_state=42)

y_train = np.log(y_train)
y_test = np.log(y_test)


### Categorical Imputer
##Variables: CATEGORICAL_VARS_WITH_NA
def categorical_imputer(X, features):
    X=X.copy()
    for var in features:
        X[var]= X[var].fillna('Missing')
    return X

### Numerical Imputer
### Variables: NUMERICAL_VARS_WITH_NA
def numerical_imputer(X,features):
    
    #Get a list of mode values for each numerical variable & create a dict
    imputer_dict_ = {}
    for var in features:
        imputer_dict_[var] = X[var].mode()[0]
    
    X=X.copy()
    
    
    for var in features:
        X[var].fillna(imputer_dict_[var], inplace=True)
    
    return X  


### Temporal Variables
def temporal_variable(X, features, reference_var):
    # Elapsed time between the reference variable (e.g. YrSold) and each feature
    X = X.copy()
    for var in features:
        X[var] = X[reference_var] - X[var]
    return X

### Rare label Categorical Imputer

def rare_label_encoder(X,features,tol=0.05):
    ##Create a dictionary of variables with rare labels
    encoder_dict_={}
    for var in features:
        t = pd.Series(X[var].value_counts() / float(len(X)))
        #frequent labels
        encoder_dict_[var] = list(t[t>tol].index)
    
    X = X.copy()
    for var in features:
        X[var] = np.where(X[var].isin(encoder_dict_[var]), X[var], "Rare")
    return X

### Categorical Encoder
## Label encoding based on the mean of the target for each category
def categorical_encoder(X,y, features):
    temp = pd.concat([X,y],axis=1)
    temp.columns = list(X.columns)+['target']
    
    encoder_dict_={}
    for var in features:
        t=temp.groupby([var])['target'].mean().sort_values(ascending=True).index
        encoder_dict_[var] ={k: i for i,k in enumerate(t,0)}
    
    ## Encode variables
    for var in features:
        X[var] = X[var].map(encoder_dict_[var])
    
    return X

def log_transformer(X, features):
    for var in features:
        X[var] = np.log(X[var])
    
    return X

def drop_features(X,features):
    X = X.copy()
    X = X.drop(features, axis=1)
    return X

## Build Model

X1 = categorical_imputer(X_train, CATEGORICAL_VARS_WITH_NA)
X1 = numerical_imputer(X1, NUMERICAL_VARS_WITH_NA)
X1 = temporal_variable(X1, TEMPORAL_VARS, DROP_FEATURES[0])
X1 = rare_label_encoder(X1, CATEGORICAL_VARS, 0.05)
X1 = categorical_encoder(X1, y_train, CATEGORICAL_VARS)
X1 = log_transformer(X1, NUMERICALS_LOG_VARS)
X1 = drop_features(X1, DROP_FEATURES)
X1 = drop_features(X1, ['YearRemodAdd'])

from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler()
X2=scaler.fit_transform(X1)



from sklearn.linear_model import Lasso

model = Lasso(alpha=0.005, random_state=0)
model.fit(X2,y_train)

## Predict
X3 = categorical_imputer(X_test, CATEGORICAL_VARS_WITH_NA)
X3 = numerical_imputer(X3, NUMERICAL_VARS_WITH_NA)
X3 = temporal_variable(X3, TEMPORAL_VARS, DROP_FEATURES[0])
X3 = rare_label_encoder(X3, CATEGORICAL_VARS, 0.05)
# Note: this crude version re-learns the encoding on the test set;
# the pipeline version below learns it once on the training data instead
X3 = categorical_encoder(X3, y_test, CATEGORICAL_VARS)
X3 = log_transformer(X3, NUMERICALS_LOG_VARS)
X3 = drop_features(X3, DROP_FEATURES)
X3 = drop_features(X3, ['YearRemodAdd'])

# Reuse the scaler fitted on the training data
X4 = scaler.transform(X3)

pred = model.predict(X4)
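
To get a rough sense of model quality, the held-out predictions can be evaluated, for example with the RMSE on the log scale (a sketch using sklearn.metrics; both y_test and pred are log-transformed here):

from sklearn.metrics import mean_squared_error

rmse = np.sqrt(mean_squared_error(y_test, pred))
print('Validation RMSE (log SalePrice): %.4f' % rmse)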

Pipeline Code: Converting each function to a sklearn object and putting them in a pipeline

from sklearn.pipeline import Pipeline

def read_data(path):
    train = pd.read_csv(path+'train.csv')
    test = pd.read_csv(path+'test.csv')
    return [train,test]

### Define Pipeline Classes for each operations
from sklearn.base import BaseEstimator, TransformerMixin


### Categorical Imputer

class CategoricalImputer(BaseEstimator, TransformerMixin):
    def __init__(self, variables=None):
        if not isinstance(variables,list):
            self.variables = [variables]
        else:
            self.variables = variables
    
    def fit(self, X:pd.DataFrame,y:pd.Series=None):
        return self
    
    def transform(self, X:pd.DataFrame):
        X=X.copy()
        for feature in self.variables:
            X[feature] = X[feature].fillna("Missing")
        return X
    


### Numerical Imputer
### Variables: NUMERICAL_VARS_WITH_NA

class NumericalImputer(BaseEstimator, TransformerMixin):
    def __init__(self, variables=None):
        if not isinstance(variables,list):
            self.variables = [variables]
        else:
            self.variables = variables
            
    def fit(self, X, y=None):
        # persist the mode of each variable in a dictionary
        self.imputer_dict_ = {}
        for feature in self.variables:
            self.imputer_dict_[feature] = X[feature].mode()[0]
        return self

    def transform(self, X):
        X = X.copy()
        for feature in self.variables:
            X[feature] = X[feature].fillna(self.imputer_dict_[feature])
        return X

### Temporal Variables

class TemporalVariableEstimator(BaseEstimator, TransformerMixin):
    def __init__(self, variables=None, reference_variable=None):
        if not isinstance(variables,list):
            self.variables = [variables]
        else:
            self.variables = variables
        self.reference_variable = reference_variable
        
    def fit(self, X,y=None):
        return self
    
    def transform(self,X):
        X=X.copy()
        for feature in self.variables:
            X[feature] = X[self.reference_variable]-X[feature]
        return X

### Rare label Categorical Imputer
class RareLabelCategoricalEncoder(BaseEstimator, TransformerMixin):
    """Rare label categorical encoder"""

    def __init__(self, tol=0.05, variables=None):
        self.tol = tol
        if not isinstance(variables, list):
            self.variables = [variables]
        else:
            self.variables = variables

    def fit(self, X, y=None):
        # persist frequent labels in dictionary
        self.encoder_dict_ = {}

        for var in self.variables:
            # the encoder will learn the most frequent categories
            t = pd.Series(X[var].value_counts() / float(len(X)))
            # frequent labels:
            self.encoder_dict_[var] = list(t[t >= self.tol].index)

        return self

    def transform(self, X):
        X = X.copy()
        for feature in self.variables:
            X[feature] = np.where(X[feature].isin(
                self.encoder_dict_[feature]), X[feature], 'Rare')

        return X


"""

### Categorical Encoder

class CategoricalEncoder(BaseEstimator, TransformerMixin):
    """String to numbers categorical encoder."""

    def __init__(self, variables=None):
        if not isinstance(variables, list):
            self.variables = [variables]
        else:
            self.variables = variables

    def fit(self, X, y):
        temp = pd.concat([X, y], axis=1)
        temp.columns = list(X.columns) + ['target']

        # persist transforming dictionary
        self.encoder_dict_ = {}

        for var in self.variables:
            t = temp.groupby([var])['target'].mean().sort_values(
                ascending=True).index
            self.encoder_dict_[var] = {k: i for i, k in enumerate(t, 0)}

        return self

    def transform(self, X):
        # encode labels
        X = X.copy()
        for feature in self.variables:
            X[feature] = X[feature].map(self.encoder_dict_[feature])
        return X

class LogTransformer(BaseEstimator, TransformerMixin):
    """Logarithm transformer."""

    def __init__(self, variables=None):
        if not isinstance(variables, list):
            self.variables = [variables]
        else:
            self.variables = variables

    def fit(self, X, y=None):
        # to accommodate the pipeline
        return self

    def transform(self, X):
        X = X.copy()

        for feature in self.variables:
            X[feature] = np.log(X[feature])

        return X

class DropUnecessaryFeatures(BaseEstimator, TransformerMixin):

    def __init__(self, variables_to_drop=None):
        self.variables = variables_to_drop

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # encode labels
        X = X.copy()
        X = X.drop(self.variables, axis=1)

        return X
    




price_pipe = Pipeline(
    [
        ('categorical_imputer',
            CategoricalImputer(variables=CATEGORICAL_VARS_WITH_NA)),
        ('numerical_imputer',
            NumericalImputer(variables=NUMERICAL_VARS_WITH_NA)),
        ('temporal_variable',
            TemporalVariableEstimator(
                variables=TEMPORAL_VARS,
                reference_variable=DROP_FEATURES[0])),
        ('rare_label_encoder',
            RareLabelCategoricalEncoder(
                tol=0.01,
                variables=CATEGORICAL_VARS)),
        ('categorical_encoder',
            CategoricalEncoder(variables=CATEGORICAL_VARS)),
        ('drop_features',
            DropUnecessaryFeatures(variables_to_drop=DROP_FEATURES)),
         ('drop_features2',
            DropUnecessaryFeatures(variables_to_drop='YearRemodAdd')),
        ('scaler', MinMaxScaler()),
        ('Linear_model', Lasso(alpha=0.005, random_state=0))
    ]
)


# Save the pipeline
import joblib

def save_pipeline(*, pipeline_to_persist) -> None:
    """Persist the pipeline."""

    # Prepare the save file name and path
    save_file_name = 'priceprediction_pipeline.pkl'
    save_path = "path_to_save/"
    joblib.dump(pipeline_to_persist, save_path + save_file_name)


#Train the model
def run_training():    
    train,test = read_data(datapath)    
    X_train, X_test, y_train, y_test = train_test_split(train[FEATURES], train[TARGET],
                                                    test_size=0.1, random_state=42)

    y_train = np.log(y_train)
    y_test = np.log(y_test)
    
    price_pipe.fit(X_train[FEATURES],y_train)
    save_pipeline(pipeline_to_persist = price_pipe)
    
run_training()
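
Once persisted, the same pipeline can be loaded and used for prediction in a single call. A sketch (the file path simply mirrors the placeholder save_path used above):

def make_prediction(input_data: pd.DataFrame) -> np.ndarray:
    """Load the persisted pipeline and score new data."""
    pipeline = joblib.load('path_to_save/priceprediction_pipeline.pkl')
    # All preprocessing steps learned at training time are re-applied here
    predictions = pipeline.predict(input_data[FEATURES])
    return np.exp(predictions)  # invert the log transform applied to the target

# Example usage on the raw Kaggle test file:
# preds = make_prediction(pd.read_csv(datapath + 'test.csv'))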

I will provide a link to the GitHub repo containing the entire codebase, which can be run directly. This will be covered in Part 2 of the article, where I will write about deploying this code as a package so that anyone can install it and use it out of the box. Writing the code in a clean and proper manner is a key component of building a package, and hence will be covered in detail in Part 2.

Part 3: Building a Flask app using the package developed in Part 2 to serve the model via a prediction API
Part 4: Deploying the app (Heroku, Docker, AWS)

Written on September 27, 2019