Linee guida e modelli di progettazione per la creazione di flussi di lavoro di machine learning con Amazon SageMaker Pipelines

Linee guida e modelli per flussi di lavoro di machine learning con Amazon SageMaker Pipelines

Amazon SageMaker Pipelines è un servizio completamente gestito di AWS per la creazione e l’orchestrazione di flussi di lavoro di apprendimento automatico (ML). SageMaker Pipelines offre agli sviluppatori di applicazioni ML la possibilità di orchestrare diversi passaggi del flusso di lavoro ML, inclusi il caricamento dei dati, la trasformazione dei dati, l’addestramento, l’ottimizzazione e il deployment. È possibile utilizzare SageMaker Pipelines per orchestrare i job ML in SageMaker e la sua integrazione con l’ecosistema più ampio di AWS consente anche di utilizzare risorse come le funzioni AWS Lambda, i job Amazon EMR e altro ancora. Ciò consente di creare una pipeline personalizzata e riproducibile per requisiti specifici nei flussi di lavoro ML.

In questo post, forniamo alcune best practice per massimizzare il valore di SageMaker Pipelines e rendere l’esperienza di sviluppo senza interruzioni. Discutiamo anche alcuni scenari e modelli di progettazione comuni durante la creazione di SageMaker Pipelines e forniamo esempi per affrontarli.

Best practice per SageMaker Pipelines

In questa sezione, discutiamo alcune best practice che possono essere seguite durante la progettazione di flussi di lavoro utilizzando SageMaker Pipelines. Adottarle può migliorare il processo di sviluppo e semplificare la gestione operativa di SageMaker Pipelines.

Utilizzare la Pipeline Session per il caricamento ritardato della pipeline

La Pipeline Session consente l’inizializzazione ritardata delle risorse della pipeline (i job non vengono avviati fino all’esecuzione della pipeline). Il contesto PipelineSession eredita la Session di SageMaker e implementa metodi convenienti per interagire con altre entità e risorse di SageMaker, come i job di addestramento, gli endpoint, i dataset di input in Amazon Simple Storage Service (Amazon S3) e così via. Quando si definiscono le SageMaker Pipelines, è necessario utilizzare PipelineSession anziché la Session regolare di SageMaker:

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge’,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

Eseguire le pipeline in modalità locale per iterazioni rapide ed economiche durante lo sviluppo

È possibile eseguire una pipeline in modalità locale utilizzando il contesto LocalPipelineSession. In questa modalità, la pipeline e i job vengono eseguiti in locale utilizzando le risorse della macchina locale, anziché le risorse gestite da SageMaker. La modalità locale offre un modo economico per iterare sul codice della pipeline con un subset più piccolo di dati. Dopo aver testato localmente la pipeline, è possibile ridimensionarla per l’esecuzione utilizzando il contesto PipelineSession.

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()
role = sagemaker.get_execution_role()
sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge',
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=local_pipeline_session,
)

Gestire una pipeline di SageMaker tramite versioning

La versioning degli artefatti e delle definizioni delle pipeline è una richiesta comune nel ciclo di sviluppo. È possibile creare più versioni della pipeline nominando gli oggetti della pipeline con un prefisso o un suffisso univoco, il più comune dei quali è un timestamp, come mostrato nel seguente codice:

from sagemaker.workflow.pipeline_context import PipelineSession
import time

current_time = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
pipeline_name = "pipeline_" + current_time
pipeline_session = PipelineSession()
pipeline = Pipeline(
    name=pipeline_name,
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

Organizzare e tracciare l’esecuzione delle pipeline di SageMaker integrandole con SageMaker Experiments

Le SageMaker Pipelines possono essere facilmente integrate con SageMaker Experiments per organizzare e tracciare l’esecuzione delle pipeline. Ciò viene realizzato specificando PipelineExperimentConfig al momento della creazione di un oggetto pipeline. Con questo oggetto di configurazione, è possibile specificare un nome di esperimento e un nome di prova. I dettagli dell’esecuzione di una pipeline SageMaker vengono organizzati nell’esperimento e nella prova specificati. Se non si specifica esplicitamente un nome di esperimento, viene utilizzato un nome di pipeline come nome dell’esperimento. Allo stesso modo, se non si specifica esplicitamente un nome di prova, viene utilizzato un ID di esecuzione della pipeline come nome di prova o di gruppo di esecuzioni. Vedere il seguente codice:

Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
        experiment_name = ExecutionVariables.PIPELINE_NAME,
        trial_name = ExecutionVariables.PIPELINE_EXECUTION_ID
        ),
    steps=[...]
)

Esegui in modo sicuro le pipeline di SageMaker all’interno di una VPC privata

Per proteggere i carichi di lavoro di ML, è una buona pratica distribuire i lavori orchestrati da SageMaker Pipelines in una configurazione di rete sicura all’interno di una VPC privata, sotto-reti private e gruppi di sicurezza. Per garantire e imporre l’uso di questo ambiente sicuro, è possibile implementare la seguente politica di AWS Identity and Access Management (IAM) per il ruolo di esecuzione di SageMaker (questo è il ruolo assunto dalla pipeline durante la sua esecuzione). È anche possibile aggiungere la politica per eseguire i lavori orchestrati da SageMaker Pipelines in modalità di isolamento di rete.

# Politica IAM per imporre l'esecuzione all'interno di una VPC privata

{

    "Action": [

        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:CreateModel"
    ],

    "Resource": "*",
    "Effect": "Deny",
    "Condition": {
        "Null": {
            "sagemaker:VpcSubnets": "true"
        }
    }
}

# Politica IAM per imporre l'esecuzione in modalità di isolamento di rete
{

    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Deny",
            "Action": [
                "sagemaker:Create*"
            ],
            "Resource": "*",
            "Condition": {
                "StringNotEqualsIfExists": {
                    "sagemaker:NetworkIsolation": "true"
                }
            }
        }
    ]
}

Per un esempio di implementazione della pipeline con questi controlli di sicurezza in atto, fare riferimento a Orchestrating Jobs, Model Registration e Continuous Deployment con Amazon SageMaker in un ambiente sicuro.

Monitorare i costi delle esecuzioni delle pipeline utilizzando i tag

L’utilizzo di SageMaker Pipelines di per sé è gratuito; si paga per le risorse di calcolo e archiviazione che si creano come parte dei singoli passaggi delle pipeline come il processing, l’addestramento e l’inferenza batch. Per aggregare i costi per ogni esecuzione della pipeline, è possibile includere dei tag in ogni passaggio della pipeline che crea una risorsa. Questi tag possono quindi essere utilizzati nell’esploratore dei costi per filtrare e aggregare il costo totale dell’esecuzione della pipeline, come mostrato nell’esempio seguente:

sklearn_processor = SKLearnProcessor(
    framework_version=’0.20.0’,
    instance_type=’ml.m5.xlarge,
    instance_count=1,
    base_job_name="sklearn-abalone-process",
    role=role,
    tags=[{'Key':'pipeline-cost-tag', 'Value':'<<tag_parameter>>'}]
)

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    ...
)

Dall’esploratore dei costi, è ora possibile ottenere i costi filtrati dal tag:

response = client.get_cost_and_usage(
    TimePeriod={
        'Start': '2023-07-01',
        'End': '2023-07-15'
        },
    Metrics=['BLENDED_COST','USAGE_QUANTITY','UNBLENDED_COST'],
    Granularity='MONTHLY',
    Filter={
        'Dimensions': {
            'Key':'USAGE_TYPE',
            'Values': [
                ‘SageMaker:Pipeline’
            ]
        },
        'Tags': {
            'Key': 'keyName',
            'Values': [
                'keyValue',
                ]
        }
    }
)

Pattern di progettazione per alcuni scenari comuni

In questa sezione, discutiamo dei pattern di progettazione per alcuni casi d’uso comuni con SageMaker Pipelines.

Esegui una funzione Python leggera utilizzando un passaggio Lambda

Le funzioni Python sono onnipresenti nei flussi di lavoro di ML; vengono utilizzate per il preprocessing, il postprocessing, la valutazione e altro ancora. Lambda è un servizio di calcolo serverless che consente di eseguire il codice senza dover provisionare o gestire server. Con Lambda, è possibile eseguire il codice nel proprio linguaggio preferito, incluso Python. È possibile utilizzarlo per eseguire il codice Python personalizzato come parte della propria pipeline. Un passaggio Lambda consente di eseguire funzioni Lambda come parte della pipeline di SageMaker. Inizia con il seguente codice:

%%writefile lambdafunc.py

import json

def lambda_handler(event, context):
    str1 = event["str1"]
    str2 = event["str2"]
    str3 = str1 + str2
    return {
        "str3": str3
    }

Crea la funzione Lambda utilizzando l’aiutante Lambda del SDK Python di SageMaker:

from sagemaker.lambda_helper import Lambda

def create_lambda(nome_funzione, script, handler):
    response = Lambda(
        function_name=nome_funzione,
        execution_role_arn=ruolo,
        script=script,
        handler=handler,
        timeout=600,
        memory_size=10240,
    ).upsert()

    arn_funzione = response['FunctionArn']
    return arn_funzione

fn_arn = create_Lambda("func", "lambdafunc.py", handler="lambdafunc.lambda_handler")

Chiama il passaggio Lambda:

from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)

str3 = LambdaOutput(output_name="str3", output_type=LambdaOutputTypeEnum.String)

# Passaggio Lambda
step_lambda1 = LambdaStep(
    name="PassaggioLambda1",
    lambda_func=Lambda(
        function_arn=fn_arn
    ),
    inputs={
        "str1": "Ciao",
        "str2": " Mondo"
    },
    outputs=[str3],
)

Passa i dati tra i passaggi

I dati di input per un passaggio di un flusso di lavoro possono essere una posizione di dati accessibile o dati generati da uno dei passaggi precedenti nel flusso di lavoro. Puoi fornire queste informazioni come parametro ProcessingInput. Vediamo alcuni scenari in cui puoi utilizzare ProcessingInput.

Scenario 1: Passa l’output (tipi di dati primitivi) di un passaggio Lambda a un passaggio di elaborazione

I tipi di dati primitivi si riferiscono a tipi di dati scalari come stringhe, interi, booleani e float.

Il seguente frammento di codice definisce una funzione Lambda che restituisce un dizionario di variabili con tipi di dati primitivi. Il codice della tua funzione Lambda restituirà un JSON di coppie chiave-valore quando viene invocato dal passaggio Lambda all’interno del flusso di lavoro di SageMaker.

def handler(event, context):
    ...
    return {
        "output1": "valore_stringa",
        "output2": 1,
        "output3": True,
        "output4": 2.0,
    }

Nella definizione del flusso di lavoro, puoi quindi definire i parametri del flusso di lavoro di SageMaker che hanno un tipo di dati specifico e impostare la variabile sull’output della funzione Lambda:

from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn.processing import SKLearnProcessor

ruolo = sagemaker.get_execution_role()
sessione_flusso_di_lavoro = PipelineSession()

# 1. Definisci i parametri di output del passaggio Lambda

param_output_stringa = LambdaOutput(output_name="output1", output_type=LambdaOutputTypeEnum.String)
param_output_intero = LambdaOutput(output_name="output2", output_type=LambdaOutputTypeEnum.Integer)
param_output_booleano = LambdaOutput(output_name="output3", output_type=LambdaOutputTypeEnum.Boolean)
param_output_float = LambdaOutput(output_name="output4", output_type=LambdaOutputTypeEnum.Float)

# 2. Passaggio Lambda che invoca la funzione lambda e restituisce l'Output

passaggio_lambda = LambdaStep(
    name="MioPassaggioLambda",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:123456789012:function:sagemaker_test_lambda",
        session=sessione_flusso_di_lavoro,
        ),
    inputs={"arg1": "valore1", "arg2": "valore2"},
    outputs=[
        param_output_stringa, param_output_intero, param_output_booleano, param_output_float
        ],
)

# 3. Estrai l'output del Lambda

param_output_stringa = passaggio_lambda.properties.Outputs["output1"]

# 4. Usalo in un passaggio successivo. Ad esempio, il passaggio di elaborazione

processore_sklearn = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=sessione_flusso_di_lavoro,
    role=ruolo
)

argomenti_processore = processore_sklearn.run(
    code="code/preprocess.py", # script Python da eseguire
    arguments=["--input-args", param_output_stringa]
)

passaggio_elaborazione = ProcessingStep(
    name="passaggio_elaborazione1",
    step_args=argomenti_processore,
)

Scenario 2: Passa l’output (tipi di dati non primitivi) di un passaggio Lambda a un passaggio di elaborazione

I tipi di dati non primitivi si riferiscono a tipi di dati non scalari (ad esempio, NamedTuple). Potrebbe capitare che tu debba restituire un tipo di dato non primitivo da una funzione Lambda. Per farlo, devi convertire il tuo tipo di dato non primitivo in una stringa:

# Codice della funzione Lambda che restituisce un tipo di dato non primitivo

from collections import namedtuple

def lambda_handler(event, context):
    Outputs = namedtuple("Outputs", "sample_output")
    named_tuple = Outputs(
                    [
                        {'output1': 1, 'output2': 2},
                        {'output3': 'foo', 'output4': 'foo1'}
                    ]
                )
    return {
        "named_tuple_string": str(named_tuple)
    }

# Passaggio del flusso di lavoro che utilizza l'output della funzione Lambda come "Input del parametro"

output_ref = step_lambda.properties.Outputs["named_tuple_string"]

Quindi puoi utilizzare questa stringa come input per un passaggio successivo nel flusso di lavoro. Per utilizzare la named tuple nel codice, utilizza eval() per analizzare l’espressione Python nella stringa:

# Decifrare la stringa nel tuo codice di logica di elaborazione

import argparse
from collections import namedtuple

Outputs = namedtuple("Outputs", "sample_output")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--named_tuple_string", type=str, required=True)
    args = parser.parse_args()
    # utilizza eval per ottenere la named tuple dalla stringa
    named_tuple = eval(args.named_tuple_string)

Scenario 3: Passare l’output di un passaggio attraverso un file di proprietà

Puoi anche memorizzare l’output di un passaggio di elaborazione in un file JSON di proprietà per il consumo successivo in un ConditionStep o un altro ProcessingStep. Puoi utilizzare la funzione JSONGet per interrogare un file di proprietà. Ecco il codice seguente:

# 1. Definire un Processor con una ProcessingOutput
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="sklearn-abalone-preprocess",
    sagemaker_session=session,
    role=sagemaker.get_execution_role(),
)

step_args = sklearn_processor.run(

                outputs=[
                    ProcessingOutput(
                        output_name="hyperparam",
                        source="/opt/ml/processing/evaluation"
                    ),
                ],
            code="./local/preprocess.py",
            arguments=["--input-data", "s3://my-input"],
)

# 2. Definire un PropertyFile in cui il nome dell'output corrisponde a quello utilizzato nel Processor

hyperparam_report = PropertyFile(
    name="AbaloneHyperparamReport",
    output_name="hyperparam",
    path="hyperparam.json",
)

Supponiamo che il contenuto del file di proprietà sia il seguente:

{
    "hyperparam": {
        "eta": {
            "value": 0.6
        }
    }
}

In questo caso, è possibile interrogarlo per ottenere un valore specifico e utilizzarlo in passaggi successivi utilizzando la funzione JsonGet:

# 3. Interrogare il file di proprietà
eta = JsonGet(
    step_name=step_process.name,
    property_file=hyperparam_report,
    json_path="hyperparam.eta.value",
)

Parametrizzare una variabile nella definizione del flusso di lavoro

La parametrizzazione delle variabili in modo che possano essere utilizzate in fase di esecuzione è spesso desiderabile, ad esempio per costruire un URI S3. È possibile parametrizzare una stringa in modo che venga valutata in fase di esecuzione utilizzando la funzione Join. Il frammento di codice seguente mostra come definire la variabile utilizzando la funzione Join e utilizzarla per impostare la posizione di output in un passaggio di elaborazione:

# Definisci la variabile per memorizzare l'URI S3
s3_location = Join(
    on="/", 
    values=[
        "s3:/",
        ParameterString(
            name="MyBucket", 
            default_value=""
        ),
        "training",
        ExecutionVariables.PIPELINE_EXECUTION_ID
    ]
)

# Definisci il passaggio di elaborazione
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
    sagemaker_session=pipeline_session,
    role=role,
)

# Utilizza l's3uri come posizione di output nel passaggio di elaborazione
processor_run_args = sklearn_processor.run(
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_location,
        ),
    ],
    code="code/preprocess.py"
)

step_process = ProcessingStep(
    name="PreprocessingJob”,
    step_args=processor_run_args,
)

Eseguire codice parallelo su un iterabile

Alcuni flussi di lavoro di Machine Learning eseguono codice in parallelo in cicli for su un insieme statico di elementi (un iterabile). Può essere lo stesso codice eseguito su dati diversi o un pezzo di codice diverso che deve essere eseguito per ogni elemento. Ad esempio, se si dispone di un numero molto grande di righe in un file e si desidera accelerare il tempo di elaborazione, è possibile fare affidamento sul primo pattern. Se si desidera eseguire diverse trasformazioni su sottogruppi specifici dei dati, potrebbe essere necessario eseguire un pezzo di codice diverso per ogni sottogruppo dei dati. I due scenari seguenti illustrano come è possibile progettare pipeline SageMaker a questo scopo.

Scenario 1: Implementare una logica di elaborazione su diverse porzioni di dati

È possibile eseguire un job di elaborazione con più istanze (impostando instance_count su un valore maggiore di 1). Questo distribuisce i dati di input da Amazon S3 su tutte le istanze di elaborazione. È quindi possibile utilizzare uno script (process.py) per lavorare su una porzione specifica dei dati in base al numero di istanza e all’elemento corrispondente nella lista di elementi. La logica di programmazione in process.py può essere scritta in modo che venga eseguito un modulo o un pezzo di codice diverso a seconda della lista di elementi che elabora. L’esempio seguente definisce un processore che può essere utilizzato in un ProcessingStep:

sklearn_processor = FrameworkProcessor(
    estimator_cls=sagemaker.sklearn.estimator.SKLearn,
    framework_version="0.23-1",
    instance_type='ml.m5.4xlarge',
    instance_count=4, #numero di esecuzioni parallele / istanze
    base_job_name="parallel-step",
    sagemaker_session=session,
    role=role,
)

step_args = sklearn_processor.run(
    code='process.py',
    arguments=[
        "--items", 
        list_of_items, #struttura dati contenente una lista di elementi
        inputs=[
            ProcessingInput(source="s3://sagemaker-us-east-1-xxxxxxxxxxxx/abalone/abalone-dataset.csv",
                    destination="/opt/ml/processing/input"
            )
        ],
    ]
)

Scenario 2: Eseguire una sequenza di passaggi

Quando si ha una sequenza di passaggi da eseguire in parallelo, è possibile definire ogni sequenza come una pipeline SageMaker indipendente. L’esecuzione di queste pipeline SageMaker può quindi essere attivata da una funzione Lambda che fa parte di un LambdaStep nella pipeline principale. Il seguente pezzo di codice illustra lo scenario in cui vengono attivate due diverse esecuzioni di pipeline SageMaker:

import boto3
def lambda_handler(event, context):
    items = [1, 2]
    #client SageMaker
    sm_client = boto3.client("sagemaker")
    
    #nome della pipeline che deve essere attivata.
    #se ce ne sono più di una, è possibile recuperare le pipeline disponibili utilizzando l'API di boto3
    #e attivare quella appropriata in base alla logica.
    pipeline_name = 'child-pipeline-1'

    #attiva la pipeline per ogni elemento
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
    pipeline_name = 'child-pipeline-2'
    response_ppl = sm_client.start_pipeline_execution(
                        PipelineName=pipeline_name,
                        PipelineExecutionDisplayName=pipeline_name+'-item-%d' %(s),
                    )
return

Conclusioni

In questo post, abbiamo discusso alcune best practice per l’uso efficiente e la manutenzione delle pipeline SageMaker. Abbiamo inoltre fornito alcuni modelli che è possibile adottare durante la progettazione di flussi di lavoro con le pipeline SageMaker, che si stiano creando nuove pipeline o si stiano migrando flussi di lavoro di Machine Learning da altri strumenti di orchestrazione. Per iniziare con le pipeline SageMaker per l’orchestrazione dei flussi di lavoro di Machine Learning, fare riferimento agli esempi di codice su GitHub e alle SageMaker Model Building Pipelines di Amazon.