How to use SageMaker Pipelines and AWS Batch for Asynchronous Distributed Data Processing
By Urmi Ghosh | Thomson Reuters Labs | February 28, 2025

Amazon SageMaker Pipelines is a robust workflow orchestration service that allows machine learning (ML) practitioners to build, execute, and monitor repeatable, end-to-end automated ML workflows. While SageMaker provides various built-in steps for different ML tasks such as data processing, model training, fine-tuning, evaluation, and deployment, in some scenarios we may want to leverage external AWS services to perform specialised tasks. For example, AWS Batch is better suited than SageMaker for large-scale batch processing tasks, offering more flexibility and control over compute resources, cost optimisation, and job orchestration. By integrating it with SageMaker Pipelines, we can use AWS Batch's scalability and flexibility for processing tasks that can be run asynchronously and independently.

In this guide, you will learn how to:

    • Submit AWS Batch Jobs from a SageMaker Pipeline
    • Monitor AWS Batch Job Status with Lambda
    • Process Amazon SQS messages with Lambda
    • Handle the Callback mechanism within pipelines
    Figure 1: Architecture for SageMaker Pipelines integration with AWS Batch

To follow along with this guide, you need a basic understanding of the following additional AWS services:

    • Amazon SQS: A fully managed message queue service that lets you send, store, and receive messages between software components. It will be used as the key connecting component between AWS Batch and SageMaker Pipelines.
    • AWS Lambda: A serverless compute service that runs code in response to events and can automatically manage the compute resources. In this context, we use Lambda functions to perform several tasks such as triggering batch jobs, monitoring the status of the jobs, and handling messages from the SQS queue.

Please make sure that, in addition to the standard SageMaker permissions, your IAM Role has the following permissions:

    • to create an SQS queue
    • to create a Lambda function
    • to submit Batch Jobs

We will create another Lambda role that will:

    • read from the queue and then return the results to SageMaker
    • fetch Batch Job status

For the Batch Job role, we need:

    • to be able to read and write to S3

⚠️ NOTE: For Lambda to properly assume your execution role, the role's trust policy must specify lambda.amazonaws.com as a trusted service.
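
For illustration, a minimal sketch of creating such a role with boto3 is shown below; the role name is a hypothetical placeholder and you would still need to attach the permissions listed above.

import json

import boto3

iam_client = boto3.client("iam")

# Trust policy that allows the Lambda service to assume the role
lambda_trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam_client.create_role(
    RoleName="my-batch-trigger-lambda-role",  # hypothetical role name
    AssumeRolePolicyDocument=json.dumps(lambda_trust_policy),
)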

⚠️ NOTE: In a production setting, we will want to create different IAM Roles for the different tasks and follow the principle of least privilege by granting only the permissions that are needed, as mentioned above.

However, for the purpose of this tutorial, we can create an IAM Role from the IAM console and add the following AWS Managed policies:

💡 Granting Full Access allows us to delete the resources later on.

It is assumed that you already have an existing pipeline in SageMaker that needs to be extended to include AWS Batch jobs in the same orchestration. For a detailed tutorial on SageMaker Pipelines, you can refer to the documentation here.

For the tutorial, we will create a simple AWS Batch job to copy data from one location on S3 to another. To do that, follow the steps below:

    1. Create a Compute Environment:

In the AWS Batch console, we first need to create a compute environment. We can choose between a managed environment, where AWS handles provisioning and scaling, or an unmanaged environment, where we manage the instances.
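
If you prefer to script this step, the following sketch creates a managed Fargate compute environment with boto3; the environment name, subnet, and security group values are placeholders you would replace with your own.

import boto3

batch_client = boto3.client("batch")

# Managed Fargate compute environment: AWS handles provisioning and scaling.
batch_client.create_compute_environment(
    computeEnvironmentName="my-fargate-compute-env",  # hypothetical name
    type="MANAGED",
    state="ENABLED",
    computeResources={
        "type": "FARGATE",
        "maxvCpus": 16,
        "subnets": ["<subnet-id>"],                   # placeholder
        "securityGroupIds": ["<security-group-id>"],  # placeholder
    },
)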

    2. Create a Job Queue:

Next, we set up a job queue to submit the jobs to and associate this queue with the compute environment. The job queue determines the priority and order of job execution.
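
As a sketch, the same can be done with boto3; the queue name is a placeholder and the compute environment name refers to the hypothetical one from the previous step.

import boto3

batch_client = boto3.client("batch")

# Job queue attached to the compute environment created above.
batch_client.create_job_queue(
    jobQueueName="my-batch-job-queue",  # hypothetical name
    state="ENABLED",
    priority=1,  # relative priority when multiple queues share a compute environment
    computeEnvironmentOrder=[
        {"order": 1, "computeEnvironment": "my-fargate-compute-env"},  # hypothetical name
    ],
)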

    3. Define Job Definitions:

The next step is to create job definitions that specify how the jobs should run. This includes configuration of the Docker image, commands run by the container, required vCPUs and memory, data volumes, environment variables, and IAM roles.
    You can also use the code snippet below to register the job definition.

import boto3

client = boto3.client("batch")

container_properties = {
    "image": "public.ecr.aws/aws-cli/aws-cli:latest",
    "command": [
        "s3", "cp", "Ref::s3Input", "Ref::s3Output",
    ],
    "jobRoleArn": "<job-role-arn>",              # replace with the ARN of your Batch job role
    "executionRoleArn": "<execution-role-arn>",  # replace with the ARN of your execution role
    "resourceRequirements": [
        {"value": "1.0", "type": "VCPU"},
        {"value": "2048", "type": "MEMORY"},
    ],
    "fargatePlatformConfiguration": {"platformVersion": "LATEST"},
    "runtimePlatform": {
        "operatingSystemFamily": "LINUX",
        "cpuArchitecture": "X86_64",
    },
    "networkConfiguration": {
        "assignPublicIp": "ENABLED",
    },
}

tags = {
    # Add tags you want to associate with the jobs.
    # The tags can include information on
    # resource owners, financial ids, project ids, etc.
}

result = client.register_job_definition(
    jobDefinitionName="<job-definition-name>",  # replace with your job definition name
    type="container",
    containerProperties=container_properties,
    tags=tags,
    propagateTags=True,
    platformCapabilities=["FARGATE"],
)

    4. Submit Jobs:

Finally, we need to submit the jobs to the job queue. We can do so by using the AWS Management Console, the AWS CLI, or the AWS SDKs. In this tutorial, job submission is done by a Lambda function, and we will learn more about it in the next section.

NOTE: For more complex batch jobs, you can refer to the detailed tutorial here.

The working of the CallBack mechanism is illustrated in the figure below.

    Figure 2: CallBack mechanism used for AWS Batch integration in SageMaker Pipelines

For the pipeline integration, we first need to create the following resources before we can include them in the pipeline:

1. Create a Lambda (to trigger AWS Batch Jobs)

To add the flexibility of connecting to services outside of SageMaker, we make use of a LambdaStep in the pipeline. A LambdaStep runs an AWS Lambda function that can be used to call any piece of custom code.

In our case, the Lambda function should:

    • submit the AWS Batch jobs
    • return the job IDs of all of the triggered jobs

You can adapt the code snippet below for your Lambda.

import boto3


def submit_job(job_definition, s3_input_path, s3_output_path, job_name_prefix):
    client = boto3.client("batch")
    job_name = f"{job_name_prefix}_my_batch_job"
    response = client.submit_job(
        jobDefinition=job_definition,
        jobName=job_name,
        jobQueue="<job-queue-name>",  # replace with the name or ARN of your job queue
        parameters={
            "s3Input": s3_input_path,
            "s3Output": s3_output_path,
        },
    )
    return response["jobId"]


def lambda_handler(event, context):
    s3_input_uri = event["input_uri"]
    s3_output_uri = event["output_uri"]
    all_job_ids = []
    for i in range(1, event["max_jobs"] + 1):
        batch_num = "{:02d}".format(i)
        s3_base_input_path = f"{s3_input_uri}/file_{batch_num}.json"
        s3_base_output_path = f"{s3_output_uri}/output_{batch_num}.json"

        job_id = submit_job(
            job_definition="<job-definition-name>",  # replace with the registered job definition
            s3_input_path=s3_base_input_path,
            s3_output_path=s3_base_output_path,
            job_name_prefix=f"sample-aws-batch-{batch_num}",
        )
        all_job_ids.append(job_id)

    return {"statusCode": 200, "jobIds": " ".join(all_job_ids)}

Next, we need functionality in the pipeline to pause until the batch jobs finish running and to either resume or stop the pipeline execution based on the status of the jobs. This is achieved by adding a CallbackStep to the pipeline. For the Callback mechanism, we need to create some additional AWS resources.

    2. Create SQS Queue (for CallBack communication)

An SQS queue is used by the CallbackStep of the pipeline to send a message containing a unique token and a custom list of input parameters. The pipeline execution pauses until the message in the queue is consumed (by any service, e.g. a Lambda) and responded to.

You can create an SQS queue from the console, or you can use the AWS SDK for Python (Boto3) as in the following code snippet.

import boto3

sqs_client = boto3.client("sqs")
queue_name = "<queue-name>"  # replace with your queue name
sqs_client.create_queue(
    QueueName=queue_name,
    Attributes={
        # 15 minutes waiting time for the message to be processed. Please edit as required.
        "VisibilityTimeout": "900",
    },
)
queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
queue_arn = sqs_client.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
print(f"Created queue arn: {queue_arn}")

We can set the visibility timeout of the queue to an appropriate value (0–12h) based on how long we expect the batch job to run. The visibility timeout parameter makes the message invisible to other consumers for the set time period. This effectively enforces a "waiting" period before the message is processed. You can read more about the different configuration settings for the queue here.
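
If the batch jobs turn out to run longer than expected, the timeout can also be adjusted on an existing queue; a minimal sketch, reusing the sqs_client and queue_url from the snippet above:

# Raise the visibility timeout to 1 hour (the maximum is 12 hours, i.e. 43200 seconds).
sqs_client.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={"VisibilityTimeout": "3600"},
)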

3. Create a Lambda (to handle messages in the queue and monitor job status)

Next, we need to create another Lambda that will read the messages from the queue, fetch the status of all the batch job IDs received in the message, and take one of the following actions based on the job status:

    • In Progress: If any of the jobs is still running, the Lambda code raises an error, which causes the incoming SQS message to be sent back to the queue (as if the "consumer" had failed). This means that once the message becomes visible again (which happens when the queue visibility timeout is reached), the same Lambda will be triggered again and fetch the job status. This acts like an event-based monitoring loop that runs while there are jobs still "in progress".
    • Failed: If any of the jobs is in a Failed state, the Lambda code calls the SendPipelineExecutionStepFailure API, which makes the callback step fail and stops the pipeline execution.
    • Completed: If all the jobs are successful, the Lambda code calls the SendPipelineExecutionStepSuccess API; the callback step execution is completed and the pipeline execution continues.

Read more about AWS Batch job states here.

import json

import boto3


def _check_job_status(event, sm_client, batch_client):
    for record in event["Records"]:
        payload = json.loads(record["body"])
        token = payload["token"]
        job_ids = payload["arguments"]["job_ids"].split()
        job_status_list = []
        print("Checking status of the following batch jobs with Ids:")
        print(", ".join(job_ids))
        response = batch_client.describe_jobs(jobs=job_ids)
        for job_obj in response["jobs"]:
            print(f"Status of batch job: {job_obj['jobName']} - {job_obj['status']}")
            if job_obj["status"] in ["FAILED"]:
                print(f"At least one job ({job_obj['jobName']}) failed.")
                # Stops Pipeline Execution
                sm_client.send_pipeline_execution_step_failure(
                    CallbackToken=token,
                    FailureReason=job_obj.get("statusReason", "Batch job failed"),
                )
            if job_obj["status"] in ["SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"]:
                # Jobs are still running
                # Put the token back to the queue
                print("Jobs are not completed...")
                raise RuntimeError("Jobs are not completed...")
            else:
                job_status_list.append(job_obj["status"])
        if all(x == "SUCCEEDED" for x in job_status_list):  # sanity check
            # All Jobs ran successfully
            print("All jobs are completed.")
            sm_client.send_pipeline_execution_step_success(
                CallbackToken=token,
                OutputParameters=[{"Name": "job_status", "Value": "Completed"}],
            )
    return response


def handler(event, context):
    """Main lambda entrypoint."""
    sm_client = boto3.client("sagemaker")
    batch_client = boto3.client("batch")
    if "Records" in event:
        _check_job_status(event, sm_client, batch_client)
    return {"statusCode": 200}

4. Create Event Source Mapping

Finally, we need to create a link between the Lambda and the SQS queue via an event source mapping, so that the Lambda function that monitors job status is triggered on arrival of a new message in the SQS queue.

import boto3

# Create Event Source Mapping with Lambda to check job status and handle SQS messages
lambda_client = boto3.client("lambda")
lambda_fn_name = "<monitoring-lambda-name>"  # replace with the name of the monitoring Lambda
event_source_mapping = lambda_client.create_event_source_mapping(
    EventSourceArn=queue_arn,  # ARN of the SQS queue created earlier
    FunctionName=lambda_fn_name,
    Enabled=True,
)
print(f'Mapping Lambda function: {lambda_fn_name} and SQS queue: {queue_name} via UUID: {event_source_mapping["UUID"]}')

You can read more about configuring an Amazon SQS queue to trigger an AWS Lambda function here.

5. Build and Execute Pipeline

We now have the required components to build and extend a SageMaker Pipeline.

The following code snippet creates the LambdaStep and CallbackStep needed for the integration.

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.callback_step import CallbackOutput, CallbackOutputTypeEnum, CallbackStep
from sagemaker.workflow.lambda_step import LambdaOutput, LambdaOutputTypeEnum, LambdaStep

status_code = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
job_ids = LambdaOutput(output_name="jobIds", output_type=LambdaOutputTypeEnum.String)

step_aws_batch_lambda = LambdaStep(
    name="<lambda-step-name>",  # replace with a step name
    lambda_func=Lambda(function_arn="<lambda-arn>"),  # ARN of the Lambda that submits the Batch jobs
    inputs={
        "input_uri": "<s3-input-uri>",    # replace with the S3 input location
        "output_uri": "<s3-output-uri>",  # replace with the S3 output location
        "max_jobs": 10,  # the submit Lambda reads event["max_jobs"]; adjust as required
    },
    outputs=[status_code, job_ids],
    depends_on=[],
)

step_aws_batch_call_back = CallbackStep(
    name="<callback-step-name>",  # replace with a step name
    sqs_queue_url=queue_url,  # URL of the SQS queue created earlier
    inputs={
        "job_ids": step_aws_batch_lambda.properties.Outputs["jobIds"],
    },
    outputs=[
        CallbackOutput(output_name="job_status", output_type=CallbackOutputTypeEnum.String),
    ],
)

Next, we can simply add the Lambda and callback steps to the pipeline. The order of execution can be explicitly set when defining the steps, or it can be determined by the data flow. In this case, the jobIds are passed from step_aws_batch_lambda to step_aws_batch_call_back, which therefore determines the order of execution.

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",  # replace with your pipeline name
    steps=[
        # ... any existing steps of your pipeline ...
        step_aws_batch_lambda,
        step_aws_batch_call_back,
    ],
)

Finally, we can deploy and start the pipeline execution as follows:

pipeline.upsert(role_arn=role)  # Pass the IAM role for pipeline execution along with the additional policies
execution = pipeline.start()

If you head over to SageMaker Studio and click on Pipelines in the left menu, you can check the current execution status as well as monitor the progress of each step by clicking on it.
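
Alternatively, the execution object returned by pipeline.start() can be queried programmatically; a small sketch:

# Overall pipeline execution status (Executing, Succeeded, Failed, ...)
print(execution.describe()["PipelineExecutionStatus"])

# Status of each individual step, including the Lambda and Callback steps
for step in execution.list_steps():
    print(step["StepName"], step["StepStatus"])

# Optionally block until the execution finishes
execution.wait()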

    Figure 3: SageMaker Studio UI for Pipelines

This integration adds complexity to the pipeline definition, as it requires additional components like Lambda and SQS, apart from relying on external AWS Batch resources. Ensuring robust monitoring and failure handling for the AWS Batch jobs can be challenging.

While setting up retries for a LambdaStep is not possible in SageMaker, we can do so directly in the AWS Batch job definition itself and mitigate some of the errors that can occur based on the exit code of the job script or due to instances shutting down.

Read more about automated job retries on AWS Batch here.
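
For example, a retryStrategy block can be added to the register_job_definition call shown earlier; the values below are only an illustrative sketch.

# Hypothetical retry configuration: re-run a failed job up to 3 times in total.
# An "evaluateOnExit" list can optionally refine this based on exit codes or status reasons.
retry_strategy = {"attempts": 3}

result = client.register_job_definition(
    jobDefinitionName="<job-definition-name>",  # placeholder, as in the earlier snippet
    type="container",
    containerProperties=container_properties,
    retryStrategy=retry_strategy,
    platformCapabilities=["FARGATE"],
)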

Integrating AWS Batch with SageMaker Pipelines using a combination of Lambda and Callback steps provides a flexible way to extend the capabilities of your machine learning workflows. While this approach introduces additional complexity, it is highly beneficial for tasks requiring heavy computational workloads or custom processing steps. By understanding the design paradigm, benefits, and common challenges, you can design a robust and scalable solution tailored to your specific needs. You can use the same design paradigm to integrate other external jobs and approval functionality with Pipelines.

💬 Let the Conversation Begin

Have you used similar techniques to enable distributed data processing? Let me know in the comments, or post over in our LinkedIn Group.


