CI/CD for Multi-Model Endpoints in AWS | by Andrew Charabin | Jun, 2023

1. Custom SageMaker Studio image for PostgreSQL querying

While SageMaker Pipelines accepts input data from S3, what if new input data resides in a data warehouse like Amazon Redshift or Google BigQuery? Of course, an ETL or comparable process can be used to move data to S3 in batches, but that adds unnecessary complexity and rigidity compared to querying the data warehouse directly from the pipeline.

SageMaker Studio provides several default images to initialize an environment, one example being ‘Data Science’, which includes common packages like numpy and pandas. However, connecting to a PostgreSQL database from Python requires a driver or adapter, and psycopg2 is the most popular PostgreSQL adapter for Python. Fortunately, custom images can be used to initialize a Studio environment, although they must meet specific requirements. I’ve prepackaged a Docker image that meets these requirements and builds on top of the Python Julia-1.5.2 image by adding the psycopg2 driver. The image can be found in this git repository, and the steps outlined here can then be used to make the image accessible in a Studio domain.

2. Dynamic warm start hyperparameter tuning

Model retraining is different in nature from initial model training. It isn’t practical to invest the same resources in searching for the best hyperparameters over the same large search space when retraining a model, especially when only minor adjustments to the last production model’s best hyperparameters are expected.

For this reason, the hyperparameter tuning solution recommended for CI/CD in this article doesn’t try to blitz retuning with K-fold cross-validation, warm pools, etc. All of that can work well for initial model training. For retraining, however, we want to start from what already worked well in production and make small adjustments to account for newly available data. Warm start hyperparameter tuning is a natural fit for this. Going further, a dynamic warm start tuning system can be created that uses the latest production tuning job as the parent. The solution can look as follows for an example XGBoost Bayesian tuning job:

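# Set Run Parameters
# testing: set to True to force a single tuning job when testing the pipeline
# hyperparam_jobs: maximum number of training jobs for a full retraining run

# Set Max Jobs

max_jobs = 1 if testing else hyperparam_jobs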

# Load Packages

from datetime import datetime

import boto3
import pandas as pd
import sagemaker
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.tuner import (
    ContinuousParameter,
    HyperparameterTuner,
    IntegerParameter,
    WarmStartConfig,
    WarmStartTypes,
)

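# Configure Warm Start

# Can be up to 5, but currently only a value of 1 is supported in the code
# Note base_dir and metric need to be set (base_dir can be blank)

try:
    eligible_parent_tuning_jobs = pd.read_csv(f"{base_dir}logs/tuningjobhistory.csv")
except FileNotFoundError:
    # First run: start an empty log with the columns that are appended after tuning
    eligible_parent_tuning_jobs = pd.DataFrame(columns=['datetime', 'tuningjob', 'metric', 'layer', 'objective', 'eval_metric', 'eval_metric_value', 'trainingjobcount'])

# Keep jobs for the current metric that tuned more than one model (single-job runs were tests)
candidates = eligible_parent_tuning_jobs[(eligible_parent_tuning_jobs['metric'] == metric) & (eligible_parent_tuning_jobs['trainingjobcount'] > 1)]
parent_tuning_jobs = candidates.sort_values('datetime', ascending=False)['tuningjob'].head(1).tolist()
eligible_parent_tuning_jobs_count = len(parent_tuning_jobs)

if eligible_parent_tuning_jobs_count > 0:
    # parents expects a set of tuning job names
    warm_start_config = WarmStartConfig(
        WarmStartTypes.TRANSFER_LEARNING, parents=set(parent_tuning_jobs))
    # Note that WarmStartTypes.IDENTICAL_DATA_AND_ALGORITHM can be used when applicable
    print(f"Warm starting using tuning job: {parent_tuning_jobs[0]}")
else:
    warm_start_config = None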

# Define exploration boundaries (default suggested values from Amazon SageMaker Documentation)

hyperparameter_ranges = {
    'eta': ContinuousParameter(0.1, 0.5, scaling_type='Logarithmic'),
    'max_depth': IntegerParameter(0, 10, scaling_type='Auto'),
    'num_round': IntegerParameter(1, 4000, scaling_type='Auto'),
    'subsample': ContinuousParameter(0.5, 1, scaling_type='Logarithmic'),
    'colsample_bylevel': ContinuousParameter(0.1, 1, scaling_type='Logarithmic'),
    'colsample_bytree': ContinuousParameter(0.5, 1, scaling_type='Logarithmic'),
    'alpha': ContinuousParameter(0, 1000, scaling_type='Auto'),
    'lambda': ContinuousParameter(0, 100, scaling_type='Auto'),
    'max_delta_step': IntegerParameter(0, 10, scaling_type='Auto'),
    'min_child_weight': ContinuousParameter(0, 10, scaling_type='Auto'),
    'gamma': ContinuousParameter(0, 5, scaling_type='Auto'),
}
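# Note a SageMaker XGBoost estimator (called xgb_estimator here) needs to be instantiated in advance,
# and objective_metric_name (e.g. 'validation:rmse') needs to be set
tuner_log = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=hyperparameter_ranges,
    objective_type='Minimize',  # assumes an error metric such as RMSE
    strategy='Bayesian',
    max_jobs=max_jobs,
    warm_start_config=warm_start_config,
)

# Note bucket, prefix, train_filename, and validation_filename need to be set
training_input_config = sagemaker.inputs.TrainingInput(f"s3://{bucket}/{prefix}/{train_filename}", content_type='csv')
validation_input_config = sagemaker.inputs.TrainingInput(f"s3://{bucket}/{prefix}/{validation_filename}", content_type='csv')

# Starts the hyperparameter tuning job and waits for it to finish
tuner_log.fit({'train': training_input_config, 'validation': validation_input_config})
tuner_log.wait()

# Prints the status of the latest hyperparameter tuning job
latest_tuning_job = tuner_log.describe()
print(latest_tuning_job['HyperParameterTuningJobStatus'])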

Tuning job history will be saved in a log file in the base directory, with example output as follows:

Chart by author

The date/time stamp as well as the name of the tuning job and metadata are stored in .csv format, with new tuning jobs being appended to the file.

The system will dynamically warm start using the latest tuning job that meets the required conditions. In this example, the conditions are applied in the following line of code:

eligible_parent_tuning_jobs[(eligible_parent_tuning_jobs['metric'] == metric) & (eligible_parent_tuning_jobs['trainingjobcount'] > 1)]

Because we’ll want to test that the pipeline works, a testing=True run option is available that forces a single hyperparameter tuning job. A condition is therefore added to exclude jobs with only one tuned model as parents, since those jobs were test runs. Furthermore, the tuning job log file can be shared across different models, since in theory a parent job could be used across models. In this case the model is tracked with the ‘metric’ field, and eligible tuning jobs are filtered to match the metric of the current training run.
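The pandas filter above keeps a single parent, but SageMaker allows up to five parent jobs in a warm start configuration. The sketch below generalizes the same selection rule (same metric, more than one training job, newest first) to up to five parents using only the standard library. The function names are hypothetical, and the log columns are assumed to match the tuning job history file described above.

```python
import csv
from datetime import datetime
from typing import Iterable, List, Mapping

# SageMaker accepts at most 5 parent jobs in a warm start configuration
MAX_WARM_START_PARENTS = 5
LOG_TIME_FORMAT = "%Y/%m/%d %H:%M:%S"


def load_tuning_job_log(path: str) -> List[dict]:
    """Read the tuning job history CSV into a list of row dicts (all values are strings)."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def select_parent_tuning_jobs(
    rows: Iterable[Mapping[str, str]], metric: str, max_parents: int = 1
) -> List[str]:
    """Return up to `max_parents` eligible parent tuning job names, newest first.

    A job is eligible when it was logged for the same `metric` and tuned more
    than one model (single-job runs are treated as pipeline tests).
    """
    if not 1 <= max_parents <= MAX_WARM_START_PARENTS:
        raise ValueError(f"max_parents must be between 1 and {MAX_WARM_START_PARENTS}")
    eligible = [
        row
        for row in rows
        # float() first so counts written as "20.0" are also accepted
        if row["metric"] == metric and int(float(row["trainingjobcount"])) > 1
    ]
    # Parse the timestamps instead of relying on string ordering
    eligible.sort(key=lambda row: datetime.strptime(row["datetime"], LOG_TIME_FORMAT), reverse=True)
    return [row["tuningjob"] for row in eligible[:max_parents]]
```

The result can be passed as `WarmStartConfig(WarmStartTypes.TRANSFER_LEARNING, parents=set(parents))` when the list is non-empty, and `warm_start_config = None` otherwise.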

Once retraining is done, we’ll append the new hyperparameter tuning job to the log file and write it locally as well as to an S3 bucket with versioning turned on.
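As an alternative to rebuilding the whole log with pandas on every run, the record can be appended in place. This is a minimal standard-library sketch, assuming the same column layout as the article’s log; `append_tuning_job_log` is a hypothetical helper, and uploading the resulting file to the versioned S3 bucket is still a separate `s3.upload_file` call.

```python
import csv
import os
from typing import Mapping

# Column order used by the tuning job log in this article
LOG_FIELDS = [
    "datetime", "tuningjob", "metric", "layer", "objective",
    "eval_metric", "eval_metric_value", "trainingjobcount",
]


def append_tuning_job_log(path: str, row: Mapping[str, object]) -> None:
    """Append one tuning job record to a CSV log, writing the header only for a new file."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    is_new_file = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=LOG_FIELDS)
        if is_new_file:
            writer.writeheader()
        # A missing field raises KeyError instead of silently writing a blank cell
        writer.writerow({field: row[field] for field in LOG_FIELDS})
```

Appending keeps rows for other metrics untouched, which matters when one log is shared across models.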

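# Append Last Parent Job for Next Warm Start

from datetime import datetime

new_tuning_job_record = pd.DataFrame({
    'datetime': [datetime.now().strftime("%Y/%m/%d %H:%M:%S")],
    'tuningjob': [latest_tuning_job['HyperParameterTuningJobName']],
    'metric': [metric],
    'layer': [prefix],
    'objective': [trainingobjective],
    'eval_metric': [latest_tuning_job['BestTrainingJob']['FinalHyperParameterTuningJobObjectiveMetric']['MetricName']],
    'eval_metric_value': [latest_tuning_job['BestTrainingJob']['FinalHyperParameterTuningJobObjectiveMetric']['Value']],
    'trainingjobcount': [latest_tuning_job['HyperParameterTuningJobConfig']['ResourceLimits']['MaxNumberOfTrainingJobs']],
})
updatetuningjobhistory = pd.concat([eligible_parent_tuning_jobs, new_tuning_job_record], axis=0)

# Write locally

updatetuningjobhistory.to_csv(f"{base_dir}logs/tuningjobhistory.csv", index=False)

# Upload to s3 (bucket with versioning turned on; the object key is an example)

s3 = boto3.client('s3')
s3.upload_file(f"{base_dir}logs/tuningjobhistory.csv", bucket, "logs/tuningjobhistory.csv")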
3. Register multiple models to the Model Registry in a single interactive python notebook

Often, organizations will have multiple AWS accounts for different use cases (e.g. sandbox, QA, and production). You’ll need to determine which account to use for each step of the CI/CD solution, then add the cross-account permissions noted in this guide.

The recommendation is to perform model training and model registration in the same account, specifically a sandbox or testing account, so in the chart below the ‘Data Science’ and ‘Shared Services’ accounts will be the same. Within this account, an S3 bucket will be needed to house model artifacts and track lineage on other files related to the pipeline. Models/endpoints will be deployed separately within each ‘deployment’ account (e.g. sandbox, QA, production) by referencing the model artifacts and the registry in the training/registration account.

Chart from AWS Documentation

Now that we’ve decided which AWS account will be used for training and to house the model registry, we can now build an initial model and develop the CI/CD solution.

When using SageMaker Pipelines, separate pipeline steps are created for data preprocessing, training/tuning, evaluation, registration, and any post-processing. While that’s fine for a single-model pipeline, it creates a lot of duplicated pipeline code when a machine learning solution requires multiple models.

As a result, the recommended solution is instead to build and schedule three interactive Python notebooks in SageMaker Studio. They run in sequence and, once automated with notebook jobs, together accomplish the CI/CD pipeline:

A. Data preparation

B. Model training, evaluation, and registration

C. Endpoint refresh with the latest approved models

A. Data preparation

Here we will query data from the data warehouse and write it locally and to S3. Dynamic date conditions can be derived from the current date, with the resulting date floor and ceiling passed into the SQL query.
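One way to pass the date floor and ceiling into the query is through bound parameters instead of string interpolation, so psycopg2 handles quoting. This is a sketch under stated assumptions: the table and column names in `QUERY` are hypothetical, and `date_window`/`query_params` are illustrative helpers rather than part of the original notebook.

```python
from datetime import date, timedelta
from typing import Dict, Optional, Tuple

# Hypothetical table and column names; %(name)s is psycopg2's named-parameter placeholder style
QUERY = """
SELECT *
FROM sales_history
WHERE event_date >= %(datestart)s
  AND event_date < %(dateend)s
"""


def date_window(
    pushbackdays: int = 30,
    datestart: date = date(2000, 1, 1),
    today: Optional[date] = None,
) -> Tuple[date, date]:
    """Return (floor, ceiling): a fixed floor and a ceiling `pushbackdays` days before today."""
    today = today or date.today()
    dateend = today - timedelta(days=pushbackdays)
    if dateend <= datestart:
        raise ValueError(f"date ceiling {dateend} is not after the floor {datestart}")
    return datestart, dateend


def query_params(pushbackdays: int = 30, today: Optional[date] = None) -> Dict[str, date]:
    """Build the parameter dict that fills the placeholders in QUERY."""
    datestart, dateend = date_window(pushbackdays, today=today)
    return {"datestart": datestart, "dateend": dateend}
```

With the connection from the notebook, this would be used as `pd.read_sql_query(QUERY, data_warehouse, params=query_params())`. The half-open interval (`< dateend`) avoids double-counting the boundary day between consecutive runs.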

# Connect to Data Warehouse

import psycopg2

dbname='<insert here>'
host='<insert here>'
password='<insert here>'
port='<insert here>'
search_path='<insert here>'
user='<insert here>'

# Keyword arguments avoid quoting problems if a value contains spaces or quotes
data_warehouse = psycopg2.connect(
    host=host,
    port=port,
    dbname=dbname,
    user=user,
    password=password,
    options=f"-c search_path={search_path}",
)

# Set Dataset Date Floor and Ceiling to Pass Into the Query

from datetime import date, timedelta

datestart = date(2000, 1, 1)
pushbackdays = 30
dateend = date.today() - timedelta(days=pushbackdays)

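# Query data warehouse

import pandas as pd

modelbuildingset = pd.read_sql_query(f"""<insert query>""", data_warehouse)

# Write .csv

modelbuildingset.to_csv(f"{base_dir}datasets/{filename}", index=False)

# Upload to s3 for Lineage Tracking

import boto3

s3 = boto3.client('s3')
# The key is assumed to match the s3://{bucket}/{prefix}/{filename} path used for training inputs
s3.upload_file(f"{base_dir}datasets/{filename}", bucket, f"{prefix}/{filename}")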

This step ends with saving the prepared data for training locally as well as in s3 for lineage tracking.

B. Model training, evaluation, and registration

By using an interactive Python notebook in Studio, we can complete model training, evaluation, and registration all in one notebook. All these steps can be wrapped in a function that is applied to each additional model that needs to be retrained. For illustrative purposes, the code is provided here without a function.

Prior to proceeding, model package groups need to be created in the Registry (either in the console or via Python) for each model that’s part of the solution.
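Creating the groups via Python can be made idempotent so the notebook is safe to rerun. The sketch below uses the boto3 SageMaker client’s `list_model_package_groups` and `create_model_package_group` calls; the helper names are hypothetical, and the client is passed in (e.g. `boto3.client("sagemaker")`) so the logic can be exercised without AWS.

```python
from typing import Iterable, List, Set


def _existing_group_names(sm_client, name: str) -> Set[str]:
    """Collect group names containing `name`, following NextToken pagination."""
    names: Set[str] = set()
    kwargs = {"NameContains": name, "MaxResults": 100}
    while True:
        page = sm_client.list_model_package_groups(**kwargs)
        names.update(s["ModelPackageGroupName"] for s in page["ModelPackageGroupSummaryList"])
        next_token = page.get("NextToken")
        if not next_token:
            return names
        kwargs["NextToken"] = next_token


def ensure_model_package_groups(
    sm_client, group_names: Iterable[str], description: str = ""
) -> List[str]:
    """Create any model package groups that don't exist yet; return the names created."""
    created = []
    for name in group_names:
        # NameContains is a substring filter, so check for an exact match
        if name in _existing_group_names(sm_client, name):
            continue
        kwargs = {"ModelPackageGroupName": name}
        if description:
            kwargs["ModelPackageGroupDescription"] = description
        sm_client.create_model_package_group(**kwargs)
        created.append(name)
    return created
```

For example, `ensure_model_package_groups(boto3.client("sagemaker"), ["sales-model", "churn-model"])` would create only the groups that are missing (the group names here are illustrative).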

# Get the Best Training Job

# latest_tuning_job was obtained from the hyperparameter tuning section
best_overall_training_job_name = latest_tuning_job['BestTrainingJob']['TrainingJobName']

# Install XGBoost (ideally the same version as the training container, so the pickled model loads)

! pip install xgboost

# Download the Best Model

import os
import boto3

os.makedirs(f"{base_dir}models/{metric}", exist_ok=True)
s3 = boto3.client('s3')
s3.download_file('<s3 bucket>', f"output/{best_overall_training_job_name}/output/model.tar.gz", f"{base_dir}models/{metric}/model.tar.gz")

# Open and Load the Downloaded Model Artifact in Memory

import pickle as pkl
import tarfile

with tarfile.open(f"{base_dir}models/{metric}/model.tar.gz") as tar:
    tar.extractall(path=f"{base_dir}models/{metric}")
with open(f"{base_dir}models/{metric}/xgboost-model", 'rb') as f:
    model = pkl.load(f)

# Perform Model Evaluation

import math
from datetime import datetime

import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_absolute_error, mean_squared_error

# evaluationset is the held-out test set, with the label in the first column
target = evaluationset.columns.values[0]
features = evaluationset.drop(columns=[target])
evaluationset['prediction'] = model.predict(xgboost.DMatrix(features))

# In the Example a Regression Problem is Used with MAE & RMSE as Eval Metrics

mae = mean_absolute_error(evaluationset[target], evaluationset['prediction'])
rmse = math.sqrt(mean_squared_error(evaluationset[target], evaluationset['prediction']))
stdev_error = np.std(evaluationset[target] - evaluationset['prediction'])
evaluation_report = pd.DataFrame({'datetime': [datetime.now().strftime("%Y/%m/%d %H:%M:%S")], 'testing': [testing], 'trainingjob': [best_overall_training_job_name], 'objective': [trainingobjective], 'hyperparameter_tuning_metric': [objective_metric_name], 'mae': [mae], 'rmse': [rmse], 'stdev_error': [stdev_error]})

# Load Past Evaluation Reports

try:
    past_evaluation_reports = pd.read_csv(f"{base_dir}models/{metric}/evaluationhistory.csv")
except FileNotFoundError:
    past_evaluation_reports = pd.DataFrame({'datetime': [], 'testing': [], 'trainingjob': [], 'objective': [], 'hyperparameter_tuning_metric': [], 'mae': [], 'rmse': [], 'stdev_error': []})

# Write .csv

evaluation_history = pd.concat([past_evaluation_reports, evaluation_report], axis=0)
evaluation_history.to_csv(f"{base_dir}models/{metric}/evaluationhistory.csv", index=False)

# Write to s3

s3.upload_file(f"{base_dir}models/{metric}/evaluationhistory.csv", '<s3 bucket>', f"{layer}/{metric}/evaluationhistory.csv")

# Note Can Also Associate a Registered Model with Eval Metrics, But Will Skip it Here

report_dict = {}

# Register Model

import boto3
import sagemaker

# Built-in XGBoost container; region, bucket, and model_package_group_name need to be set
xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-2")

modelpackage_inference_specification = {
    "InferenceSpecification": {
        "Containers": [
            {
                "Image": xgboost_container,
                "ModelDataUrl": f"s3://{bucket}/output/{best_overall_training_job_name}/output/model.tar.gz",
            }
        ],
        "SupportedContentTypes": ["text/csv"],
        "SupportedResponseMIMETypes": ["text/csv"],
    }
}
create_model_package_input_dict = {
    "ModelPackageGroupName": model_package_group_name,
    "ModelPackageDescription": "<insert description here>",
    "ModelApprovalStatus": "PendingManualApproval",
    "ModelMetrics": report_dict,
}
create_model_package_input_dict.update(modelpackage_inference_specification)

sm_client = boto3.client('sagemaker')
create_model_package_response = sm_client.create_model_package(**create_model_package_input_dict)
model_package_arn = create_model_package_response["ModelPackageArn"]
print('ModelPackage Version ARN : {}'.format(model_package_arn))

By opening the model package group in the registry, you can see all the model versions that have been registered, the date they were registered, and their approval status.

Chart from AWS Documentation

The supervisor of the pipeline can then review the evaluation report saved locally in the previous step, which contains the history of all past model evaluations, and decide whether to approve or reject the model based on its test set evaluation metrics. Later on, criteria can be set so that production (or QA) endpoints are only updated with the latest model if it was approved.
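The three numbers in the evaluation report are easy to reproduce without numpy or scikit-learn, which can help when sanity-checking a report before approving a model. The sketch below (the function name is hypothetical) uses the population standard deviation to match `np.std`’s default. A useful reading aid: with that definition, RMSE² = stdev_error² + (mean error)², so a gap between RMSE and stdev_error signals systematic bias rather than noise.

```python
import math
import statistics
from typing import Dict, Sequence


def regression_report(actual: Sequence[float], predicted: Sequence[float]) -> Dict[str, float]:
    """MAE, RMSE, and the standard deviation of the errors for a regression model."""
    if not actual or len(actual) != len(predicted):
        raise ValueError("actual and predicted must be non-empty and of equal length")
    errors = [a - p for a, p in zip(actual, predicted)]
    mae = sum(abs(e) for e in errors) / len(errors)
    rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
    # Population standard deviation, matching numpy's np.std default (ddof=0)
    stdev_error = statistics.pstdev(errors)
    return {"mae": mae, "rmse": rmse, "stdev_error": stdev_error}
```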

4. Refreshing a multi-model endpoint with new models

SageMaker has a MultiDataModel class for deploying SageMaker endpoints that host more than one model. The rationale is that multiple models can be loaded onto the same compute instance, sharing resources and saving costs. It also simplifies retraining and administration, since only one endpoint needs to be updated with the new models and managed, vs. duplicating steps across dedicated endpoints (which remains an alternative). The MultiDataModel class can also be used to deploy a single model, which could make sense if there are plans to add models to the solution in the future.

We’ll need to create the model and endpoint the first time in the training account. The MultiDataModel class requires an S3 location for model artifacts, which are loaded into the endpoint when they are invoked; below we’ll use the ‘models’ directory in the S3 bucket being used.

# Load Container

import sagemaker
from sagemaker.multidatamodel import MultiDataModel

# region, role, and bucket need to be set
xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-2")

# One Time: Build Multi Model

# Attach to an existing training job (example job name) to create a model from it
estimator = sagemaker.estimator.Estimator.attach('sagemaker-xgboost-220611-1453-011-699894eb')
model = estimator.create_model(role=role, image_uri=xgboost_container)

# This is where our MME will read models from on S3.

model_data_prefix = f"s3://{bucket}/models/"
mme = MultiDataModel(
    name="<insert here>",  # name of the SageMaker model behind the MME
    model_data_prefix=model_data_prefix,
    model=model,  # passing our model - passes container image needed for the endpoint
)

# One Time: Deploy the MME

ENDPOINT_NAME = "<insert here>"
ENDPOINT_INSTANCE_TYPE = "<insert here>"
predictor = mme.deploy(
    initial_instance_count=1,
    instance_type=ENDPOINT_INSTANCE_TYPE,
    endpoint_name=ENDPOINT_NAME,
    kms_key='<insert here if desired>',
)

After that, the MultiDataModel can be referenced as follows:


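from sagemaker.multidatamodel import MultiDataModel

# This is where our MME will read models from on S3.
model_data_prefix = f"s3://{bucket}/models/"

mme = MultiDataModel(
    name="<insert here>",  # same name used when the MME was first built
    model_data_prefix=model_data_prefix,
    image_uri=xgboost_container,  # container image, so the training job doesn't need re-attaching
    role=role,
)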

Models can be added to the MultiDataModel by copying the artifact to the {s3 bucket}/models directory which the endpoint will use to load models. All we need is the model package group name and the Model Registry will provide the respective source artifact location and approval status.

We can add a condition to only add the latest model if it’s approved, illustrated below. This condition may be omitted in the sandbox account in case an immediate deploy is needed for data science QA and to ultimately approve the model.

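# Get the latest model version and associated artifact location for a given model package group

import boto3

client = boto3.client('sagemaker')
ModelPackageGroup = 'model_package_group'

# aws_account_id and region refer to the training account that hosts the Model Registry;
# request newest first explicitly rather than relying on the default sort order
list_model_packages_response = client.list_model_packages(
    ModelPackageGroupName=f"arn:aws:sagemaker:{region}:{aws_account_id}:model-package-group/{ModelPackageGroup}",
    SortBy='CreationTime',
    SortOrder='Descending',
)
latest_model_package = list_model_packages_response["ModelPackageSummaryList"][0]
latest_model_version_arn = latest_model_package["ModelPackageArn"]

# Look up the artifact location registered with that model version
artifact_path = client.describe_model_package(ModelPackageName=latest_model_version_arn)["InferenceSpecification"]["Containers"][0]["ModelDataUrl"]
model_artifact_name = f"{ModelPackageGroup}.tar.gz"  # name later used as TargetModel (naming is an example)

# Add model if approved

if latest_model_package['ModelApprovalStatus'] == "Approved":
    mme.add_model(model_data_source=artifact_path, model_data_path=model_artifact_name)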
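Checking only the newest version means a rejected release adds nothing. If the goal is instead to fall back to the most recent approved version (as in the rollback scenario described in the scheduling section), the selection can be done on the summaries returned by `list_model_packages`. A minimal sketch, with a hypothetical function name; note that `list_model_packages` also accepts a `ModelApprovalStatus='Approved'` filter, and results beyond one page require following `NextToken`.

```python
from typing import Mapping, Optional, Sequence


def latest_approved_package(summaries: Sequence[Mapping]) -> Optional[Mapping]:
    """Return the most recently created summary with status 'Approved', or None."""
    approved = [s for s in summaries if s.get("ModelApprovalStatus") == "Approved"]
    if not approved:
        return None
    # boto3 returns CreationTime as a datetime, so max() picks the newest version
    return max(approved, key=lambda s: s["CreationTime"])
```

Selecting by `CreationTime` rather than list position keeps the result independent of the request’s sort order.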

We can then list the models that have been added with the following function:

# Returns the artifact paths relative to model_data_prefix (e.g. the .tar.gz names added above)
print(list(mme.list_models()))

To remove a model, one can navigate to the associated s3 directory in the console and delete any of them; they will be gone when relisting the available models.

A model can be invoked in the deployed endpoint once it’s been added by using the following code:

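import boto3

runtime_sagemaker_client = boto3.client('sagemaker-runtime')

# body: CSV rows of features, without the label column or a header
response = runtime_sagemaker_client.invoke_endpoint(
    EndpointName="<endpoint_name>",
    ContentType="text/csv",
    TargetModel="<model_name>.tar.gz",
    Body=body)
predictions = response['Body'].read().decode('utf-8')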
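The `body` sent to the endpoint is a text/csv payload. This sketch shows one way to build it from feature rows and to parse the response; the helper names are hypothetical, and the parser deliberately accepts both comma- and newline-separated predictions so it doesn’t depend on the exact output format of a given container version. Feature columns must be in the same order as the training data.

```python
import csv
import io
import re
from typing import Iterable, List, Sequence


def to_csv_body(rows: Iterable[Sequence[float]]) -> str:
    """Serialize feature rows (label column excluded) as header-less CSV."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()


def parse_predictions(payload: bytes) -> List[float]:
    """Parse a text/csv response, accepting comma- or newline-separated values."""
    text = payload.decode("utf-8")
    return [float(token) for token in re.split(r"[,\n]", text) if token.strip()]
```

Usage would look like `body = to_csv_body(features.values.tolist())` before the call, then `parse_predictions(response["Body"].read())` in place of the decode step.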

Upon the first invocation of a model, the endpoint loads the target model, resulting in additional latency. Subsequent invocations of an already-loaded model avoid this loading delay. In the multi-model endpoint developer guide, AWS notes that models that haven’t been invoked recently are ‘unloaded’ when the endpoint reaches a memory utilization threshold; they are then reloaded upon their next invocation.

Forcing the endpoint to reload updated models

When an existing model artifact is overwritten via mme.add_model() or in the S3 console, the change won’t be reflected in the deployed endpoint immediately, since a model already loaded in memory isn’t re-read from S3. To force the endpoint to load the latest model artifacts upon their next invocation, we can use a trick: update the endpoint with an arbitrary new endpoint configuration. This provisions new instances behind the endpoint, which start with no models loaded, and SageMaker safely manages the transition between the old and new instances. Because each endpoint configuration requires a unique name, we can add a date stamp as a suffix.
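Endpoint configuration names are limited to 63 characters of letters, digits, and hyphens, so a timestamp format with slashes or colons would be rejected. A small helper (hypothetical name) can build a valid, date-stamped name and truncate long endpoint names to fit:

```python
import re
from datetime import datetime
from typing import Optional

MAX_NAME_LENGTH = 63
# SageMaker name pattern: alphanumerics, optionally separated by hyphens
_VALID_NAME = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9])*$")


def endpoint_config_name(endpoint_name: str, now: Optional[datetime] = None) -> str:
    """Build '<endpoint_name>-YYYY-MM-DD-HH-MM-SS', truncated to SageMaker's length limit."""
    now = now or datetime.now()
    suffix = now.strftime("%Y-%m-%d-%H-%M-%S")
    base = endpoint_name[: MAX_NAME_LENGTH - len(suffix) - 1].rstrip("-")
    name = f"{base}-{suffix}"
    if not _VALID_NAME.match(name):
        raise ValueError(f"invalid endpoint config name: {name}")
    return name
```

With second-level granularity, two refreshes within the same second would collide; for a monthly or quarterly pipeline that is unlikely to matter.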

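# Get datetime for endpoint configuration

from datetime import datetime

import boto3

client = boto3.client('sagemaker')
# Endpoint config names only allow letters, digits, and hyphens
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

# Create new endpoint config in order to 'refresh' loaded models to account for new deployments
# model_name is the SageMaker model behind the MME; instance_type matches the existing endpoint
create_endpoint_config_api_response = client.create_endpoint_config(
    EndpointConfigName=f"<endpoint name>-{timestamp}",
    ProductionVariants=[{
        'VariantName': model_name,
        'ModelName': model_name,
        'InitialInstanceCount': 1,
        'InstanceType': instance_type,
    }],
)

# Update endpoint with new config

response = client.update_endpoint(
    EndpointName="<endpoint name>",
    EndpointConfigName=f"<endpoint name>-{timestamp}",
)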

Once this code is run, you’ll see that the associated endpoint will have an ‘updating’ status when viewing it in the console. During this updating period, the previous endpoint will be available for use, and it will be swapped with the new endpoint once it’s ready, after which the status will adjust to ‘in service.’ The new models added will then be loaded upon their next invocation.

We’ve now built out the three notebooks required for the CI/CD solution — data preparation, training/evaluation, and endpoint updating. However, these files are currently only in the training AWS account. We need to adapt the third notebook to work in any deployment AWS account where a respective endpoint will be created/updated.

To do this, we can add conditional logic based on the AWS account ID. S3 buckets will also be required in the new AWS accounts to house model artifacts, and since S3 bucket names must be globally unique, the same conditional logic can select the right bucket for each account. It can also be used to adjust the endpoint instance type and the conditions for adding new models (e.g. approval status).

# Get AWS Account ID and Region

import boto3

aws_account_id = boto3.client("sts").get_caller_identity()["Account"]
region = boto3.Session().region_name

# Set Bucket & Instance Type Across Accounts (instance types can be set in the same branches)

if aws_account_id == '<insert AWS Account_ID 1>':
    bucket = '<insert s3 bucket name 1>'
elif aws_account_id == '<insert AWS Account_ID 2>':
    bucket = '<insert s3 bucket name 2>'
elif aws_account_id == '<insert AWS Account_ID 3>':
    bucket = '<insert s3 bucket name 3>'
else:
    raise ValueError(f"Unrecognized AWS account: {aws_account_id}")
training_account_bucket = '<insert training account bucket name>'
bucket_path = 'https://s3-{}.amazonaws.com/{}'.format(region, bucket)
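As the number of accounts grows, the if/elif chain can be replaced by a lookup table that also carries the per-account instance type and approval rule mentioned above. This is a sketch under stated assumptions: the account IDs, bucket names, and instance types are hypothetical placeholders, and the rule that sandbox accounts may deploy models still pending review reflects the article’s suggestion that the approval condition can be omitted there.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class AccountSettings:
    bucket: str
    instance_type: str
    require_approval: bool


# Hypothetical account IDs, bucket names, and instance types
ACCOUNT_SETTINGS: Dict[str, AccountSettings] = {
    "111111111111": AccountSettings("example-sandbox-bucket", "ml.m5.large", require_approval=False),
    "222222222222": AccountSettings("example-qa-bucket", "ml.m5.large", require_approval=True),
    "333333333333": AccountSettings("example-prod-bucket", "ml.m5.xlarge", require_approval=True),
}


def settings_for(account_id: str) -> AccountSettings:
    """Look up deployment settings, failing loudly for an unknown account."""
    try:
        return ACCOUNT_SETTINGS[account_id]
    except KeyError:
        raise ValueError(f"no deployment settings for AWS account {account_id}") from None


def should_add_model(settings: AccountSettings, approval_status: str) -> bool:
    """Approved models are always added; sandbox-style accounts also take pending ones."""
    if approval_status == "Approved":
        return True
    return not settings.require_approval and approval_status == "PendingManualApproval"
```

In the notebook this would be `settings = settings_for(aws_account_id)`, after which `settings.bucket` and `settings.instance_type` replace the per-branch assignments.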

The steps to initially create and deploy the MultiDataModel will need to be repeated in each new deployment account.

Now that we have one working notebook that references the AWS Account ID and can be run across different AWS accounts, we’ll want to set up a git repo that contains this notebook (and likely the other two for lineage tracking), then clone the repo in the SageMaker Studio domains of these accounts. Fortunately, with the Studio/Git integration these steps are straightforward/seamless and are outlined in the following document. Based on my experience, it’s recommended to create the repo outside of SageMaker Studio and clone it within each AWS account domain.

Any future changes to the notebooks can be made in the training account and pushed to the repo, then reflected in the other deployment accounts by pulling the changes. Make sure to create a .gitignore file so only the three notebooks are tracked rather than any of the log or other files; lineage for those files is tracked in S3. Also note that every time a notebook is run, its saved console output changes. To avoid conflicts when pulling the latest updates into the other deployment accounts, discard any local file changes made since the last pull in these accounts (e.g. by restoring the files) before pulling.

5. Schedule retrain/redeploy notebooks to run on a set cadence

Finally, we can schedule all three notebooks to run in sequence in the training account using the new SageMaker Studio notebook jobs feature. The schedules should be treated as environment/account dependent: in the deployment accounts we can create separate notebook jobs that only update endpoints with the latest models, and build in some lag time between when newly approved models are automatically deployed in the sandbox, QA, and production accounts. The beauty is that once the solution is released, the only manual part of the process is approving or rejecting models in the registry. And if anything goes wrong with a newly deployed model, the model can be rejected in the registry, after which the endpoint update notebook can be run manually to revert to the previous production model version, buying time for further investigation. In this case, we set the pipeline to run at set time intervals (e.g. monthly or quarterly), although the solution can be adapted to run when certain conditions are met (e.g. data drift or declining production model accuracy).
