Amazon SageMaker Pipelines is a robust workflow orchestration service that allows machine learning (ML) practitioners to build, execute, and monitor repeatable, end-to-end automated ML workflows. While SageMaker provides various built-in steps for different ML tasks such as data processing, model training, fine-tuning, evaluation, and deployment, in some scenarios we may want to leverage external AWS services to perform specialized tasks. For example, AWS Batch is better suited than SageMaker for large-scale batch processing tasks, offering more flexibility and control over compute resources, cost optimization, and job orchestration. By integrating it with SageMaker Pipelines, we can use AWS Batch's scalability and flexibility for processing tasks that can run asynchronously and independently.
In this guide, you will learn how to:
- Submit AWS Batch jobs from a SageMaker Pipeline
- Monitor AWS Batch job status with Lambda
- Process Amazon SQS messages with Lambda
- Handle the callback mechanism within pipelines
To follow along with this guide, you need a basic understanding of the following additional AWS services:
- Amazon SQS: A fully managed message queue service that lets you send, store, and receive messages between software components. It will serve as the key connecting component between AWS Batch and SageMaker Pipelines.
- AWS Lambda: A serverless compute service that runs code in response to events and automatically manages the compute resources. In this context, we use Lambda functions to perform several tasks such as triggering batch jobs, monitoring the status of the jobs, and handling messages from the SQS queue.
Please make sure that, in addition to the standard SageMaker permissions, your IAM role has permission to:
- create an SQS queue
- create a Lambda function
- submit Batch jobs
We will create another Lambda role that can:
- read from the queue and return the results to SageMaker
- fetch Batch job status
For the Batch job role, we need:
- read and write access to S3
⚠️ NOTE: In order for Lambda to properly assume your execution role, the role's trust policy must specify lambda.amazonaws.com as a trusted service.
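As a minimal sketch (the role name below is hypothetical, and it assumes a single role shared by SageMaker and Lambda as in this tutorial), such a role could be created programmatically with a trust policy that lets both services assume it:
import json

import boto3

iam_client = boto3.client("iam")

# Trust policy allowing both SageMaker and Lambda to assume the role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": ["lambda.amazonaws.com", "sagemaker.amazonaws.com"]},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam_client.create_role(
    RoleName="sagemaker-batch-tutorial-role",  # hypothetical role name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)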
⚠️ NOTE: In a production environment, we would want to create different IAM roles for the different tasks and follow the principle of least privilege by granting only the permissions that are needed, as mentioned above.
However, for the purpose of this tutorial, we can create an IAM role from the IAM console and attach the following AWS managed policies:
💡 Granting full access enables us to delete the resources later on.
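The exact set of managed policies is up to you; purely as an illustration (the policy selection and role name below are assumptions, not a recommendation), full-access policies for the services used in this tutorial could be attached like this:
import boto3

iam_client = boto3.client("iam")

# Illustrative full-access policies for the services used in this tutorial;
# adjust to your own requirements.
managed_policy_arns = [
    "arn:aws:iam::aws:policy/AWSBatchFullAccess",
    "arn:aws:iam::aws:policy/AmazonSQSFullAccess",
    "arn:aws:iam::aws:policy/AWSLambda_FullAccess",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
]
for policy_arn in managed_policy_arns:
    iam_client.attach_role_policy(
        RoleName="sagemaker-batch-tutorial-role",  # hypothetical role from above
        PolicyArn=policy_arn,
    )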
It is assumed that you already have an existing pipeline in SageMaker that needs to be extended to include AWS Batch jobs in the same orchestration. For a detailed tutorial on SageMaker Pipelines, you can refer to the documentation here.
For this tutorial, we will create a simple AWS Batch job that copies data from one location on S3 to another. To do that, follow the steps below:
1. Create a Compute Environment:
In the AWS Batch console, we first need to create a compute environment. We can choose between a managed environment, where AWS handles provisioning and scaling, or an unmanaged environment, where we manage the instances ourselves.
2. Set Up a Job Queue:
Next, we set up a job queue to which the jobs will be submitted and associate it with the compute environment. The job queue determines the priority and order of job execution.
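If you prefer to create these two resources with code instead of the console, a rough boto3 sketch could look like the following (the names, subnets, and security groups are placeholders):
import boto3

batch_client = boto3.client("batch")

# Managed Fargate compute environment; with no serviceRole given,
# the AWS Batch service-linked role is used.
batch_client.create_compute_environment(
    computeEnvironmentName="batch-tutorial-compute-env",
    type="MANAGED",
    state="ENABLED",
    computeResources={
        "type": "FARGATE",
        "maxvCpus": 4,
        "subnets": ["subnet-xxxxxxxx"],          # placeholder subnet IDs
        "securityGroupIds": ["sg-xxxxxxxx"],     # placeholder security group IDs
    },
)

# Job queue associated with the compute environment above
# (the compute environment may take a moment to become VALID first).
batch_client.create_job_queue(
    jobQueueName="batch-tutorial-job-queue",
    state="ENABLED",
    priority=1,
    computeEnvironmentOrder=[
        {"order": 1, "computeEnvironment": "batch-tutorial-compute-env"},
    ],
)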
3. Create Job Definitions:
The next step is to create job definitions that specify how the jobs should run. This includes the configuration of the Docker image, the commands run by the container, the required vCPUs and memory, data volumes, environment variables, and IAM roles.
You can also use the code snippet below to register the job definition.
import boto3

client = boto3.client("batch")

container_properties = {
    "image": "public.ecr.aws/aws-cli/aws-cli:latest",
    "command": [
        "s3", "cp", "Ref::s3Input", "Ref::s3Output",
    ],
    "jobRoleArn": "",        # IAM role assumed by the running container
    "executionRoleArn": "",  # IAM role used to launch the container
    "resourceRequirements": [
        {"value": "1.0", "type": "VCPU"},
        {"value": "2048", "type": "MEMORY"},
    ],
    "fargatePlatformConfiguration": {"platformVersion": "LATEST"},
    "runtimePlatform": {
        "operatingSystemFamily": "LINUX",
        "cpuArchitecture": "X86_64",
    },
    "networkConfiguration": {
        "assignPublicIp": "ENABLED",
    },
}

tags = {
    # Add tags you want to associate with the jobs.
    # The tags can include information on
    # resource owners, financial ids, project ids, etc.
}

result = client.register_job_definition(
    jobDefinitionName="",  # name for the job definition
    type="container",
    containerProperties=container_properties,
    tags=tags,
    propagateTags=True,
    platformCapabilities=["FARGATE"],
)
4. Submit Jobs:
Finally, we need to submit the jobs to the job queue. We can do so using the AWS Management Console, the AWS CLI, or the AWS SDKs. In this tutorial, job submission is done by a Lambda function, and we will learn more about it in the next section.
NOTE: For more complex batch jobs, you can refer to the detailed tutorial here.
The workings of the callback mechanism are illustrated in the figure below.
For the pipeline integration, we first need to create the following resources before we can include them in the pipeline:
1. Create a Lambda (to trigger AWS Batch Jobs)
To add the flexibility of connecting to services outside of SageMaker, we make use of a LambdaStep in the pipeline. A LambdaStep runs an AWS Lambda function that can be used to call any piece of custom code.
In our case, the Lambda function should:
- submit the AWS Batch jobs
- return the job IDs of all the triggered jobs
You can adapt the code snippet below for your Lambda.
import boto3


def submit_job(job_definition, s3_input_path, s3_output_path, job_name_prefix):
    client = boto3.client("batch")
    job_name = f"{job_name_prefix}_my_batch_job"
    response = client.submit_job(
        jobDefinition=job_definition,
        jobName=job_name,
        jobQueue="",  # name or ARN of your job queue
        parameters={
            "s3Input": s3_input_path,
            "s3Output": s3_output_path,
        },
    )
    return response["jobId"]


def lambda_handler(event, context):
    s3_input_uri = event["input_uri"]
    s3_output_uri = event["output_uri"]
    all_job_ids = []
    for i in range(1, event["max_jobs"] + 1):
        batch_num = "{:02d}".format(i)
        s3_base_input_path = f"{s3_input_uri}/file_{batch_num}.json"
        s3_base_output_path = f"{s3_output_uri}/output_{batch_num}.json"
        job_id = submit_job(
            job_definition="",  # name of the registered job definition
            s3_input_path=s3_base_input_path,
            s3_output_path=s3_base_output_path,
            job_name_prefix=f"sample-aws-batch-{batch_num}",
        )
        all_job_ids.append(job_id)
    return {"statusCode": 200, "jobIds": " ".join(all_job_ids)}
Next, we need functionality in the pipeline to pause until the batch jobs finish running and to either resume or stop the pipeline execution based on the status of the jobs. This is achieved by adding a CallbackStep to the pipeline. For the callback mechanism, we need to create some additional AWS resources.
2. Create an SQS Queue (for callback communication)
An SQS queue is used by the CallbackStep of the pipeline to send a message containing a unique token and a custom list of input parameters. The pipeline execution pauses until the message in the queue is consumed (by any service, e.g. a Lambda) and responded to.
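For orientation, and judging from how the monitoring Lambda later parses it, the body of that SQS message has roughly the following shape (values are illustrative):
# Approximate shape of the callback message body (illustrative values only)
callback_message_body = {
    "token": "abcd1234",  # unique callback token generated by the pipeline
    "arguments": {
        "job_ids": "job-id-1 job-id-2",  # the custom inputs defined on the CallbackStep
    },
}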
You can create an SQS queue from the console, or you can use the AWS SDK for Python (Boto3) as in the following code snippet.
import boto3

queue_name = ""  # name for the callback queue

sqs_client = boto3.client("sqs")
sqs_client.create_queue(
    QueueName=queue_name,
    Attributes={
        # 15 minutes waiting time for the message to be processed. Please edit as required.
        "VisibilityTimeout": "900",
    },
)
queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
queue_arn = sqs_client.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
print(f"Created queue arn: {queue_arn}")
We can set the visibility timeout of the queue to an appropriate value (0–12h) based on how long we expect the batch jobs to run. The visibility timeout parameter makes the message invisible to other consumers for the set time period. This effectively enforces a "waiting" period before the message is processed. You can read more about the different configuration settings for the queue here.
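If you need to adjust the waiting period later, the attribute can be changed on the existing queue, for example:
# Adjust the visibility timeout on the existing queue (value in seconds)
sqs_client.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={"VisibilityTimeout": "3600"},  # e.g. re-check the job status every hour
)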
3. Create a Lambda (to handle queue messages and monitor job status)
Next, we need to create another Lambda that reads the messages from the queue, fetches the status of all the batch job IDs received in the message, and takes one of the following actions based on the job status:
- In Progress: If any of the jobs is still running, the Lambda code raises an error, which causes the incoming SQS message to be sent back to the queue (as if the "consumer" had failed). This means that once the message becomes visible again (when the queue visibility timeout is reached), the same Lambda is triggered again and fetches the job status. This acts like an event-based monitoring loop that runs while there are jobs still "in progress".
- Failed: If any of the jobs is in a FAILED state, the Lambda code calls the SendPipelineExecutionStepFailure API, which makes the callback step fail and stops the pipeline execution.
- Completed: If all of the jobs succeed, the Lambda code calls the SendPipelineExecutionStepSuccess API; the callback step completes and the pipeline execution continues.
Read more about AWS Batch job states here.
import json

import boto3


def _check_job_status(event, sm_client, batch_client):
    for record in event["Records"]:
        payload = json.loads(record["body"])
        token = payload["token"]
        job_ids = payload["arguments"]["job_ids"].split()
        job_status_list = []
        print("Checking status of the following batch jobs with Ids:")
        print(", ".join(job_ids))
        response = batch_client.describe_jobs(jobs=job_ids)
        for job_obj in response["jobs"]:
            print(f"Status of batch job: {job_obj['jobName']} - {job_obj['status']}")
            if job_obj["status"] in ["FAILED"]:
                print(f"At least one job ({job_obj['jobName']}) failed.")
                # Stops the pipeline execution
                sm_client.send_pipeline_execution_step_failure(
                    CallbackToken=token,
                    FailureReason=job_obj.get("statusReason", "Batch job failed"),
                )
            if job_obj["status"] in ["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"]:
                # Jobs are still running
                # Put the token back to the queue by failing this invocation
                print("Jobs are not completed...")
                raise RuntimeError("Jobs are not completed...")
            else:
                job_status_list.append(job_obj["status"])
        if all(x == "SUCCEEDED" for x in job_status_list):  # sanity check
            # All jobs ran successfully
            print("All jobs are completed.")
            sm_client.send_pipeline_execution_step_success(
                CallbackToken=token,
                OutputParameters=[{"Name": "job_status", "Value": "Completed"}],
            )
    return response


def handler(event, context):
    """Main Lambda entrypoint."""
    sm_client = boto3.client("sagemaker")
    batch_client = boto3.client("batch")
    if "Records" in event:
        _check_job_status(event, sm_client, batch_client)
    return {"statusCode": 200}
4. Create an Event Source Mapping
Finally, we need to create a link between the Lambda and the SQS queue via an event source mapping so that the Lambda function monitoring the job status is triggered on the arrival of a new message in the SQS queue.
# Create an event source mapping between the SQS queue and the Lambda
# that checks job status and handles the SQS messages
lambda_fn_name = ""  # name of the monitoring Lambda function

lambda_client = boto3.client("lambda")
event_source_mapping = lambda_client.create_event_source_mapping(
    EventSourceArn=queue_arn,  # ARN of the SQS queue created earlier
    FunctionName=lambda_fn_name,
    Enabled=True,
)
print(
    f"Mapping Lambda function: {lambda_fn_name} and SQS queue: {queue_name} "
    f'via UUID: {event_source_mapping["UUID"]}'
)
You can read more about configuring an Amazon SQS queue to trigger an AWS Lambda function here.
5. Build and Execute the Pipeline
We now have the required components to build and extend a SageMaker Pipeline.
The following code snippet creates the LambdaStep and CallbackStep needed for the integration.
from sagemaker.workflow.callback_step import (
    CallbackOutput,
    CallbackOutputTypeEnum,
    CallbackStep,
)
from sagemaker.workflow.lambda_step import LambdaOutput, LambdaOutputTypeEnum, LambdaStep

status_code = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
job_ids = LambdaOutput(output_name="jobIds", output_type=LambdaOutputTypeEnum.String)

step_aws_batch_lambda = LambdaStep(
    name="",           # name for the Lambda step
    lambda_func=None,  # sagemaker.lambda_helper.Lambda object wrapping the submission Lambda
    inputs={
        "input_uri": "",   # S3 prefix with the input files
        "output_uri": "",  # S3 prefix for the outputs
        "max_jobs": 1,     # number of batch jobs to submit (read by the handler above)
    },
    outputs=[status_code, job_ids],
    depends_on=[],
)

callback_name = ""         # name for the callback step
sqs_queue_url = queue_url  # URL of the SQS queue created earlier

step_aws_batch_call_back = CallbackStep(
    name=callback_name,
    sqs_queue_url=sqs_queue_url,
    inputs={
        "job_ids": step_aws_batch_lambda.properties.Outputs["jobIds"],
    },
    outputs=[
        CallbackOutput(output_name="job_status", output_type=CallbackOutputTypeEnum.String),
    ],
)
Next, we can simply add the Lambda and callback steps to the pipeline. The order of execution can either be set explicitly when defining the steps or be determined by the data flow. In this case, the jobIds output is passed from step_aws_batch_lambda to step_aws_batch_call_back and hence determines the order of execution (an explicit alternative is sketched after the pipeline definition below).
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="",  # name for the pipeline
    steps=[
        # ... any existing pipeline steps ...
        step_aws_batch_lambda,
        step_aws_batch_call_back,
    ],
)
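As mentioned above, the ordering could also be made explicit instead of relying on the jobIds data dependency, for example:
# Explicitly make the callback step wait for the Lambda step
step_aws_batch_call_back.add_depends_on([step_aws_batch_lambda.name])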
Finally, we can deploy and start the pipeline execution as follows:
pipeline.upsert(role_arn=role)  # pass the IAM role for pipeline execution along with the additional policies
execution = pipeline.start()
If you head over to SageMaker Studio and click on Pipelines in the left menu, you can check the current execution status and monitor the progress of each step by clicking on it.
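You can also inspect the execution programmatically, for example:
# Inspect the running execution from the notebook
print(execution.describe()["PipelineExecutionStatus"])  # e.g. "Executing"
for step in execution.list_steps():                     # per-step status, including the callback step
    print(step["StepName"], step["StepStatus"])
execution.wait()  # optionally block until the pipeline finishes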
This integration adds complexity to the pipeline definition, since it requires additional components like Lambda and SQS on top of depending on external AWS Batch resources. Ensuring robust monitoring and failure handling for the AWS Batch jobs can be challenging.
While setting up retries for a LambdaStep is not possible in SageMaker, we can do so directly in the AWS Batch job definition itself and thereby mitigate some of the errors that can occur based on the exit code of the job script or due to instances shutting down.
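For example, a retry strategy can be added when registering the job definition shown earlier; the sketch below retries up to three times but only for infrastructure-related failures (the evaluateOnExit conditions are illustrative):
# Add an automatic retry strategy when registering the job definition
result = client.register_job_definition(
    jobDefinitionName="",  # same placeholder job definition name as before
    type="container",
    containerProperties=container_properties,
    platformCapabilities=["FARGATE"],
    retryStrategy={
        "attempts": 3,
        "evaluateOnExit": [
            # Retry when the host/capacity caused the failure...
            {"onStatusReason": "Host EC2*", "action": "RETRY"},
            # ...otherwise give up immediately
            {"onReason": "*", "action": "EXIT"},
        ],
    },
)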
Read more about automated job retries in AWS Batch here.
Integrating AWS Batch with SageMaker Pipelines using a combination of Lambda and Callback steps provides a flexible way to extend the capabilities of your machine learning workflows. While this approach introduces additional complexity, it is highly beneficial for tasks requiring heavy computational workloads or custom processing steps. By understanding the design paradigm, benefits, and common challenges, you can design a robust and scalable solution tailored to your specific needs. You can use the same design paradigm to integrate other external jobs and approval functionality with Pipelines.
💬 Let the Conversation Begin
Have you used similar techniques to enable distributed data processing? Let me know in the comments, or post over in our LinkedIn Group.