AWS Machine Learning Blog
Efficiently build and tune custom log anomaly detection models with Amazon SageMaker

In this post, we walk you through the process of building an automated mechanism using Amazon SageMaker to process your log data, run training iterations over it to obtain the best-performing anomaly detection model, and register it with the Amazon SageMaker Model Registry for your customers to use.

Log-based anomaly detection involves identifying anomalous data points in log datasets for discovering execution anomalies, as well as suspicious activities. It usually comprises parsing log data into vectors or machine-understandable tokens, which you can then use to train custom machine learning (ML) algorithms for determining anomalies.

You can adjust the inputs or hyperparameters for an ML algorithm to obtain a combination that yields the best-performing model. This process is called hyperparameter tuning and is an essential part of machine learning. Choosing appropriate hyperparameter values is crucial for success, and it's usually performed iteratively by experts, which can be time-consuming. Added to this are the general data-related processes, such as loading data from appropriate sources, parsing and processing it with custom logic, storing the parsed data back in Amazon S3, and loading it again for training custom models. Moreover, these tasks need to be done repetitively for each combination of hyperparameters, which doesn't scale well with increasing data and new supplementary steps. You can use Amazon SageMaker Pipelines to automate all these steps into a single execution flow. In this post, we demonstrate how to set up this entire workflow.

Solution overview

Contemporary log anomaly detection techniques such as Drain-based detection [1] or DeepLog [2] follow the same general approach: perform custom processing on logs, train an anomaly detection model with custom training code, and obtain the best-performing model with an optimal set of hyperparameters. To build an anomaly detection system using such techniques, you need to write custom scripts for processing as well as for training. SageMaker provides support for developing scripts by extending built-in algorithm containers, or by building your own custom containers. Moreover, you can combine these steps as a series of interconnected stages using SageMaker Pipelines. The following figure shows an example architecture:

The workflow consists of the following steps:

    1. The log training data is initially stored in an Amazon Simple Storage Service (Amazon S3) bucket, from where it's picked up by the SageMaker processing step of the SageMaker pipeline.
    2. After the pipeline is started, the processing step loads the Amazon S3 data into SageMaker containers and runs custom processing scripts that parse and process the logs before uploading them to a specified Amazon S3 destination. This processing can either be decentralized, with a single script running on one or more instances, or run in parallel over multiple instances using a distributed framework like Apache Spark. We discuss both approaches in this post.
    3. After processing, the data is automatically picked up by the SageMaker tuning step, where multiple training iterations with unique hyperparameter combinations are run for the custom training script.
    4. Finally, the SageMaker model step creates a SageMaker model using the best-trained model obtained from the tuning step and registers it to the SageMaker Model Registry for consumers to use. These consumers, for example, could be testers who use models trained on different datasets by different pipelines to compare their effectiveness and generality before deploying them to a public endpoint.

We walk through implementing the solution with the following high-level steps:

    1. Perform custom data processing, using either a decentralized or distributed approach.
    2. Write custom SageMaker training scripts that automatically tune the resulting models with a range of hyperparameters.
    3. Select the best-tuned model, create a custom SageMaker model from it, and register it to the SageMaker Model Registry.
    4. Combine all the steps in a SageMaker pipeline and run it.

Prerequisites

You should have the following prerequisites:

Process the data

To start, upload the log dataset to an S3 bucket in your AWS account. You can use the AWS Command Line Interface (AWS CLI) with Amazon S3 commands, or use the AWS Management Console. To process the data, you use a SageMaker processing step as the first stage in your SageMaker pipeline. This step spins up a SageMaker container and runs a script that you provide for custom processing. There are two ways to do this: decentralized or distributed processing. SageMaker provides Processor classes for both approaches. You can choose either approach for your custom processing depending on your use case.
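
For the upload itself, a minimal boto3 sketch looks like the following; the local file name is an illustrative placeholder, and the bucket and prefix match the ones used in the later code examples:

import boto3

s3 = boto3.client("s3")
# Upload a local log dataset to the bucket the processing step will read from.
# The file name here is a placeholder for your own dataset.
s3.upload_file(
    Filename="HDFS.log",
    Bucket="amzn-s3-demo-bucket-pca-detect",
    Key="processing_input/HDFS.log",
)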

Decentralized processing with ScriptProcessor

In the decentralized approach, a single custom script runs on one or more standalone instances and processes the input data. The SageMaker Python SDK provides the ScriptProcessor class, which you can use to run your custom processing script in a SageMaker processing step. For small datasets, a single instance usually suffices for performing data processing. Increasing the number of instances is recommended if your dataset is large and can be split into multiple independent components that can each be processed separately (this is done using the ShardedByS3Key setting, which we discuss shortly).

If you have custom dependencies (which can often be the case during R&D processes), you can extend an existing container and customize it with your dependencies before providing it to the ScriptProcessor class. For example, if you’re using the Drain technique, you need the logparser Python library for log parsing, in which case you write a simple Dockerfile that installs it along with the usual Python ML libraries:

FROM python:3.7-slim-buster
RUN pip3 install pandas==0.25.3 scikit-learn==0.21.3 logparser3 boto3
ENV PYTHONUNBUFFERED=TRUE
ENTRYPOINT ["python3"]

You can use a Python SageMaker notebook instance in your AWS account to create such a Dockerfile and save it to an appropriate folder, such as docker. To build a container using this Dockerfile, enter the following code into a main driver program in a Jupyter notebook on your notebook instance:

import boto3
from sagemaker import get_execution_role

region = boto3.session.Session().region_name
role = get_execution_role()
account_id = boto3.client("sts").get_caller_identity().get("Account")
ecr_repository = "sagemaker-processing-my-container"
tag = ":latest"
uri_suffix = "amazonaws.com"
if region in ["cn-north-1", "cn-northwest-1"]:
    uri_suffix = "amazonaws.com.cn"
processing_repository_uri = "{}.dkr.ecr.{}.{}/{}".format(
    account_id, region, uri_suffix, ecr_repository + tag
)

# Create the ECR repository and push the Docker image
!docker build -t $ecr_repository docker
# Note: `aws ecr get-login` is AWS CLI v1 syntax; with CLI v2, use
# `aws ecr get-login-password | docker login --username AWS --password-stdin <registry>`
!$(aws ecr get-login --region $region --registry-ids $account_id --no-include-email)
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

This code creates an Amazon Elastic Container Registry (Amazon ECR) repository where your custom container image will be stored (the repository will be created if it’s not already present). The container image is then built, tagged with the repository name (and :latest), and pushed to the ECR repository.

The next step is writing your actual processing script. For more information on writing a processing script using ScriptProcessor, refer to Amazon SageMaker Processing – Fully Managed Data Processing and Model Evaluation. The following are a few key points to remember:

When your script is ready, you can instantiate the SageMaker ScriptProcessor class for running it on your custom container (created in the previous step) by adding the following code to your driver program:

from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

script_processor = ScriptProcessor(
    command=["python3"],
    image_uri=processing_repository_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=pipeline_session,
)

script_processor_run_args = script_processor.run(
    code="preprocessing.py",
    inputs=[
        ProcessingInput(
            source="s3://amzn-s3-demo-bucket-pca-detect/processing_input/",
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="training", source="/opt/ml/processing/train")
    ],
)

step_processing = ProcessingStep(
    name="PreprocessStep",
    step_args=script_processor_run_args,
)

In the preceding code, a ScriptProcessor instance is created to run the python3 command for your custom Python script. You provide the following information:

You can also control how the input data is distributed across instances: each ProcessingInput accepts an s3_data_distribution_type parameter, which can be either ShardedByS3Key or FullyReplicated, depending on whether you're splitting your S3 dataset across multiple ScriptProcessor instances or copying it in full to each one. You can specify the number of instances in the instance_count parameter of your ScriptProcessor class.
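
For example, a minimal sketch of a sharded input, reusing the same input location as before and assuming the dataset is split across multiple objects under the prefix:

sharded_input = ProcessingInput(
    source="s3://amzn-s3-demo-bucket-pca-detect/processing_input/",
    destination="/opt/ml/processing/input",
    # Each instance receives a disjoint subset of the objects under the prefix
    s3_data_distribution_type="ShardedByS3Key",
)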

Once instantiated, you can pass the ScriptProcessor run arguments to the SageMaker processing step along with an appropriate name.

Distributed processing with PySparkProcessor

An alternative to the decentralized processing is distributed processing. Distributed processing is particularly effective when you need to process large amounts of log data. Apache Spark is a popular engine for distributed data processing. It uses in-memory caching and optimized query execution for fast analytic queries against datasets of all sizes. SageMaker provides the PySparkProcessor class within the SageMaker Python SDK for running Spark jobs. For an example of performing distributed processing with PySparkProcessor on SageMaker processing, see Distributed Data Processing using Apache Spark and SageMaker Processing. The following are a few key points to note:

# Install custom dependencies at runtime from inside your Spark processing script
import subprocess

subprocess.run(["pip3", "install", "scipy", "scikit-learn", "logparser3"])

When your Spark processing script is ready, you can instantiate the SageMaker PySparkProcessor class for running it by adding the following lines to your driver program:

from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

spark_processor = PySparkProcessor(
    base_job_name="hdfs-spark-job",
    framework_version="3.1",
    role=role,
    sagemaker_session=pipeline_session,
    instance_count=3,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=6000,
)

spark_processor_run_args = spark_processor.run(
    submit_app="./sagemaker_spark_processing.py",
    spark_event_logs_s3_uri="s3://amzn-s3-demo-bucket-pca-detect/logs/spark_event_logs",
    logs=True,
)

step_processing = ProcessingStep(
    name="SparkPreprocessStep",
    step_args=spark_processor_run_args,
)

The preceding code instantiates a PySparkProcessor with three nodes in the SageMaker cluster and Spark v3.1 installed on them. You submit your Spark processing code to it along with the Amazon S3 location where your event logs will be uploaded. These logs can be useful for debugging.

In the run() method invocation, you don't need to specify your inputs and outputs if these are fixed Amazon S3 destinations already known to your processing code. Otherwise, you can specify them using the inputs and outputs parameters with ProcessingInput and ProcessingOutput objects, just like in the decentralized example.
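
A minimal sketch of the explicit variant, reusing the ProcessingInput and ProcessingOutput classes (the S3 paths are illustrative):

spark_processor_run_args = spark_processor.run(
    submit_app="./sagemaker_spark_processing.py",
    inputs=[
        ProcessingInput(
            source="s3://amzn-s3-demo-bucket-pca-detect/processing_input/",
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="training", source="/opt/ml/processing/train")
    ],
    spark_event_logs_s3_uri="s3://amzn-s3-demo-bucket-pca-detect/logs/spark_event_logs",
    logs=True,
)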

After instantiation, the PySparkProcessor run arguments are passed to a SageMaker processing step with an appropriate name. Its execution won't be triggered until the pipeline is started.

Train and tune the model

Now that your processing steps are complete, you can proceed to the model training step. The training algorithm could either be a classical anomaly detection model like Drain-based detection or a neural network-based model like DeepLog. Every model takes in certain hyperparameters that influence how the model is trained. To obtain the best-performing model, the model is usually trained and validated multiple times over a wide range of hyperparameter values. This can be a time-consuming manual process and can instead be automated using SageMaker hyperparameter tuning jobs. Tuning jobs perform hyperparameter optimization by running your training script with a specified range of hyperparameter values and obtaining the best model based on the metrics you specify. You can use predefined metrics with built-in SageMaker algorithms or define them for your custom training algorithm.

You first need to write your training script for your anomaly detection model. Keep the following in mind:
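
Although the specifics depend on your algorithm, a minimal training script sketch illustrates the usual SageMaker conventions: hyperparameters arrive as command-line arguments, and the data and model locations come from the SM_CHANNEL_TRAINING and SM_MODEL_DIR environment variables (set by containers that use the SageMaker training toolkit). The input file name, PCA model, and metric computation below are placeholders for illustration:

import argparse
import os

import joblib
import pandas as pd
from sklearn.decomposition import PCA

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # Hyperparameters are passed by SageMaker as command-line arguments
    parser.add_argument("--max_components", type=int, default=10)
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAINING"))
    parser.add_argument("--model-dir", default=os.environ.get("SM_MODEL_DIR"))
    args = parser.parse_args()

    # Load the parsed log vectors produced by the processing step
    # ("train.csv" is a placeholder file name)
    train_df = pd.read_csv(os.path.join(args.train, "train.csv"))

    model = PCA(n_components=args.max_components)
    model.fit(train_df)

    # Placeholder metric; log it so the tuner's regex can capture it from CloudWatch
    custom_metric_value = model.explained_variance_ratio_.sum()
    print(f"custom_metric_value: {custom_metric_value}")

    joblib.dump(model, os.path.join(args.model_dir, "model.joblib"))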

When your training script is ready, you can use it inside a SageMaker container. SageMaker provides a wide range of built-in algorithm containers that you can use to run your training code. However, there might be cases when you need to build your own training containers. This could be the case when you need custom libraries installed or if you plan to use a new algorithm not built in by SageMaker. In such a case, you can build your own containers in two ways:

After you create your training container image, you need to define the hyperparameter ranges for your tuning job. For example, if you’re using a custom adaptation of the PCA algorithm (like in Drain-based detection), you add the following lines to your driver program:

from sagemaker.tuner import IntegerParameter

hyperparameter_ranges = {
    "max_components": IntegerParameter(1, 30, scaling_type="Auto")
}

The preceding code indicates that your hyperparameter max_components is an integer that ranges from 1 to 30. The Auto scaling type indicates that SageMaker will choose the best scale for hyperparameter changes. For more details on other scaling options, see Hyperparameter scaling types.
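
For instance, a continuous hyperparameter searched on a log scale and a categorical one would look like the following sketch; learning_rate and optimizer are hypothetical names used only for illustration:

from sagemaker.tuner import CategoricalParameter, ContinuousParameter

# Hypothetical additional ranges, for illustration only
more_ranges = {
    "learning_rate": ContinuousParameter(1e-5, 1e-1, scaling_type="Logarithmic"),
    "optimizer": CategoricalParameter(["sgd", "adam"]),
}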

Then you can use the following code to fully configure your training and tuning steps in the driver program:

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import HyperparameterTuner
from sagemaker.workflow.steps import TuningStep

estimator = Estimator(
    image_uri=training_image_uri,
    role=role,
    base_job_name="new-training-job",
    sagemaker_session=pipeline_session,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path="s3://amzn-s3-demo-bucket-pca-detect/models/",
    metric_definitions=[
        {"Name": "custom_metric", "Regex": "custom_metric_value: ([0-9\\.]+)"}
    ],
)

parameter_tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="custom_metric",
    hyperparameter_ranges=hyperparameter_ranges,
    metric_definitions=[
        {"Name": "custom_metric", "Regex": "custom_metric_value: ([0-9\\.]+)"}
    ],
    max_jobs=30,
    max_parallel_jobs=5,
    strategy="Bayesian",
    objective_type="Maximize",
    early_stopping_type="Auto",
)

hpo_args = parameter_tuner.fit(
    inputs={
        "training": TrainingInput(
            s3_data=step_processing.properties.ProcessingOutputConfig.Outputs[
                "training"
            ].S3Output.S3Uri,
            s3_data_type="S3Prefix",
            distribution="FullyReplicated",
        )
    }
)

step_tuning = TuningStep(
    name="AnomalyDetectionTuning",
    step_args=hpo_args,
)

In the preceding code, a SageMaker Estimator instance is created using your custom training image’s ECR URI. SageMaker Estimators help in training your models and orchestrating their training lifecycles. The Estimator is provided with a suitable role and the PipelineSession is designated as its SageMaker session.

You provide the Estimator with the location where your trained model should be stored (the output_path parameter) and supply it with the custom metric definitions that you created. For an example metric logged as custom_metric_value: 91, the definition given to the Estimator includes the metric's name along with a regex. The regex informs SageMaker how to pick up the metric's values from training logs in Amazon CloudWatch. The tuning job uses these values to find the best-performing model.

You then use this Estimator to instantiate your HyperparameterTuner. Its parameters include the total and maximum parallel number of training jobs, search strategy (for more details on strategies, see Understand the hyperparameter tuning strategies available in Amazon SageMaker AI), and whether you want to use early stopping. Early stopping can be set to Auto so that SageMaker automatically stops model training when it doesn’t see improvements in your custom logged metric.

After the HyperparameterTuner is instantiated, you can call its fit() method. In its input parameter, you specify the output Amazon S3 URI from the processing step as the input location for obtaining training data in your tuning step. This way, you don't need to specify the Amazon S3 URI yourself; it's passed between steps implicitly. You then specify the S3Prefix data type and the distribution setting depending on whether you're using multiple instances or not.

Once instantiated, the HyperparameterTuner is passed to the tuning step, where it becomes part of your SageMaker pipeline. The training configuration is now complete!

Register the model

You can now choose the best model from the tuning step to create a SageMaker model and publish it to the SageMaker Model Registry. You can use the following driver program code:

import sagemaker
from sagemaker import PipelineModel
from sagemaker.workflow.model_step import ModelStep

best_model = sagemaker.model.Model(
    image_uri=training_image_uri,
    model_data=step_tuning.get_top_model_s3_uri(
        top_k=0,
        s3_bucket="amzn-s3-demo-bucket-pca-detect",
        prefix="models",
    ),
)

pipeline_model = PipelineModel(
    models=[best_model],
    role=role,
    sagemaker_session=pipeline_session,
)

register_model_step_args = pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    model_package_group_name="PCAAnomalyDetection",
)

step_model_registration = ModelStep(
    name="NewRegistry",
    step_args=register_model_step_args,
)

The code instantiates a SageMaker model using the Amazon S3 URI of the best model obtained from the tuning step. The top_k=0 argument of the get_top_model_s3_uri() method indicates that you're interested in only the best-trained model.

After the model is instantiated, you can use it to create a SageMaker PipelineModel so that your pipeline can work directly with your model. You then call the register() method of PipelineModel to register your model to the SageMaker Model Registry. In the register() call, you specify the name of the new model package group where your model will be registered, along with the content types it accepts in prediction requests and returns in responses.

Finally, a SageMaker ModelStep is invoked with the instantiated PipelineModel to carry out the model registration process.

Create and run a pipeline

You’ve now reached the final step where all your steps will be tied together in a SageMaker pipeline. Add the following code to your driver program to complete your pipeline creation steps:

from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="Anomaly-Detection-Pipeline",
    steps=[
        step_processing,
        step_tuning,
        step_model_registration,
    ],
    sagemaker_session=pipeline_session,
)

pipeline.upsert(role_arn=role)
pipeline.start()

This code instantiates the SageMaker Pipeline construct and provides it with all the steps defined so far: processing, tuning, and model registration. The pipeline definition is upserted with the provided role and then invoked with the start() method.

The pipeline invocation could be on-demand using code (using pipeline.start() as shown earlier) or it could be event-driven using Amazon EventBridge rules. For example, you can create an EventBridge rule that triggers when new training data is uploaded to your S3 buckets and specify your SageMaker pipeline as the target for this rule. This makes sure that when new data is uploaded to your training bucket, your SageMaker pipeline is automatically invoked. For more details on SageMaker and EventBridge integration, refer to Schedule Pipeline Runs.
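
As a sketch, such a rule could be created with boto3 as follows, assuming EventBridge notifications are enabled on the training bucket; the rule name, pipeline ARN, and role ARN are placeholders:

import json

import boto3

events = boto3.client("events")

# Fire when a new object lands in the training bucket
# (requires EventBridge notifications enabled on the bucket)
events.put_rule(
    Name="StartAnomalyPipelineOnUpload",
    EventPattern=json.dumps({
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {"bucket": {"name": ["amzn-s3-demo-bucket-pca-detect"]}},
    }),
    State="ENABLED",
)

# Target the SageMaker pipeline; the role must allow sagemaker:StartPipelineExecution
events.put_targets(
    Rule="StartAnomalyPipelineOnUpload",
    Targets=[{
        "Id": "anomaly-pipeline",
        "Arn": "arn:aws:sagemaker:<region>:<account-id>:pipeline/anomaly-detection-pipeline",
        "RoleArn": "arn:aws:iam::<account-id>:role/<eventbridge-invoke-role>",
        "SageMakerPipelineParameters": {"PipelineParameterList": []},
    }],
)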

On invocation, your SageMaker pipeline runs your custom processing script in the processing step and uploads the processed data to your specified Amazon S3 destination. It then starts a tuning job with your custom training code and iteratively trains multiple models with your supplied hyperparameters and selects the best model based on your custom provided metric. The following screenshot shows that it selected the best model when tuning was complete:

Finally, the best model is selected and a model package resource is created with it in your model registry. Your customers can use it to deploy your model:

You have now completed all the steps in processing, training, tuning, and registering your custom anomaly detection model automatically with the aid of a SageMaker pipeline that was initiated using your driver program.

Clean up

To avoid incurring future charges, complete the following steps:

    1. Delete the SageMaker notebook instance used for this post.
    2. Delete the model package resource that was created using the best-tuned model (a deletion sketch follows this list).
    3. Delete any Amazon S3 data that was used for this post.
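
For example, the registered model packages could be removed with boto3; this sketch assumes the model package group name used earlier in this post:

import boto3

sm = boto3.client("sagemaker")

# Delete every model package version in the group, then the group itself
response = sm.list_model_packages(ModelPackageGroupName="PCAAnomalyDetection")
for package in response["ModelPackageSummaryList"]:
    sm.delete_model_package(ModelPackageName=package["ModelPackageArn"])
sm.delete_model_package_group(ModelPackageGroupName="PCAAnomalyDetection")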

Conclusion

In this post, we demonstrated the building, training, tuning, and registering of an anomaly detection system with custom processing code, custom training code, and custom training metrics. We ran these steps automatically with the aid of a SageMaker pipeline, which was run by invoking a single main driver program. We also discussed the different ways of processing our data, and how it could be done using the various constructs and tools that SageMaker provides in a user-friendly and straightforward manner.

Try this approach for building your own custom anomaly detection model, and share your feedback in the comments.

References

[1] P. He, J. Zhu, Z. Zheng, and M. R. Lyu, "Drain: An Online Log Parsing Approach with Fixed Depth Tree," IEEE ICWS 2017. https://ieeexplore.ieee.org/document/8029742

[2] M. Du, F. Li, G. Zheng, and V. Srikumar, "DeepLog: Anomaly Detection and Diagnosis from System Logs through Deep Learning," ACM CCS 2017. https://dl.acm.org/doi/pdf/10.1145/3133956.3134015


About the Author

Nitesh Sehwani is an SDE with the EC2 Threat Detection team where he’s involved in building large-scale systems that provide security to our customers. In his free time, he reads about art history and enjoys listening to mystery thrillers.
