AWS Machine Learning Blog 2024年09月04日
Use LangChain with PySpark to process documents at massive scale with Amazon SageMaker Studio and Amazon EMR Serverless


Harnessing the power of big data has become increasingly critical for businesses looking to gain a competitive edge. From deriving insights to powering generative artificial intelligence (AI)-driven applications, the ability to efficiently process and analyze large datasets is a vital capability. However, managing the complex infrastructure required for big data workloads has traditionally been a significant challenge, often requiring specialized expertise. That’s where the new Amazon EMR Serverless application integration in Amazon SageMaker Studio can help.

With the introduction of EMR Serverless support for Apache Livy endpoints, SageMaker Studio users can now seamlessly integrate their Jupyter notebooks running sparkmagic kernels with the powerful data processing capabilities of EMR Serverless. This allows SageMaker Studio users to perform petabyte-scale interactive data preparation, exploration, and machine learning (ML) directly within their familiar Studio notebooks, without the need to manage the underlying compute infrastructure. By using the Livy REST APIs, SageMaker Studio users can also extend their interactive analytics workflows beyond just notebook-based scenarios, enabling a more comprehensive and streamlined data science experience within the Amazon SageMaker ecosystem.

In this post, we demonstrate how to leverage the new EMR Serverless integration with SageMaker Studio to streamline your data processing and machine learning workflows.

Benefits of integrating EMR Serverless with SageMaker Studio

The EMR Serverless application integration in SageMaker Studio offers several key benefits that can transform the way your organization approaches big data:

Solution overview

SageMaker Studio is a fully integrated development environment (IDE) for ML that enables data scientists and developers to build, train, debug, deploy, and monitor models within a single web-based interface. SageMaker Studio runs inside an AWS managed virtual private cloud (VPC), with network access for SageMaker Studio domains, in this setup configured as VPC-only. SageMaker Studio automatically creates an elastic network interface within your VPC’s private subnet, which connects to the required AWS services through VPC endpoints. This same interface is also used for provisioning EMR clusters. The following diagram illustrates this solution.

An ML platform administrator can manage permissioning for the EMR Serverless integration in SageMaker Studio. The administrator can configure the appropriate privileges by updating the runtime role with an inline policy, allowing SageMaker Studio users to interactively create, update, list, start, stop, and delete EMR Serverless clusters. SageMaker Studio users are presented with built-in forms within the SageMaker Studio UI that don’t require additional configuration to interact with both EMR Serverless and Amazon Elastic Compute Cloud (Amazon EC2) based clusters.

Apache Spark and its Python API, PySpark, empower users to process massive datasets effortlessly by using distributed computing across multiple nodes. These powerful frameworks simplify the complexities of parallel processing, enabling you to write code in a familiar syntax while the underlying engine manages data partitioning, task distribution, and fault tolerance. With scalability as a core strength, Spark and PySpark allow you to handle datasets of virtually any size, eliminating the constraints of a single machine.

Empowering knowledge retrieval and generation with scalable Retrieval Augmented Generation (RAG) architecture is increasingly important in today’s era of ever-growing information. Effectively using data to provide contextual and informative responses has become a crucial challenge. This is where RAG systems excel, combining the strengths of information retrieval and text generation to deliver comprehensive and accurate results. In this post, we explore how to build a scalable and efficient RAG system using the new EMR Serverless integration, Spark’s distributed processing, and an Amazon OpenSearch Service vector database powered by the LangChain orchestration framework. This solution enables you to process massive volumes of textual data, generate relevant embeddings, and store them in a powerful vector database for seamless retrieval and generation.

Authentication mechanism

When integrating EMR Serverless in SageMaker Studio, you can use runtime roles. Runtime roles are AWS Identity and Access Management (IAM) roles that you can specify when submitting a job or query to an EMR Serverless application. These runtime roles provide the necessary permissions for your workloads to access AWS resources, such as Amazon Simple Storage Service (Amazon S3) buckets. When integrating EMR Serverless in SageMaker Studio, you can configure the IAM role to be used by SageMaker Studio. By using EMR runtime roles, you can make sure your workloads have the minimum set of permissions required to access the necessary resources, following the principle of least privilege. This enhances the overall security of your data processing pipelines and helps you maintain better control over the access to your AWS resources.
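As an illustrative sketch of how a runtime role is attached to a job (the application ID, role ARN, and script location below are placeholders, not values from this post), the EMR Serverless StartJobRun API takes the role in its executionRoleArn parameter:

```python
def build_job_run_request(application_id, runtime_role_arn, script_s3_uri):
    """Assemble StartJobRun parameters that attach a runtime role to the job.

    The job run assumes runtime_role_arn when it calls other AWS services,
    so the role's policies bound what the workload can access.
    """
    return {
        "applicationId": application_id,
        "executionRoleArn": runtime_role_arn,  # runtime role assumed by the job
        "jobDriver": {
            "sparkSubmit": {"entryPoint": script_s3_uri}
        },
    }

request = build_job_run_request(
    "00example1234",  # hypothetical application ID
    "arn:aws:iam::111122223333:role/SM-EMRServerless-RunTime-role",
    "s3://amzn-s3-demo-bucket/scripts/process.py",  # hypothetical script location
)

# With AWS credentials configured, the request would be submitted as:
# import boto3
# boto3.client("emr-serverless").start_job_run(**request)
```

Because the role is supplied per job run, different teams can share one application while each workload keeps its own least-privilege permissions.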

Cost attribution of EMR Serverless clusters

EMR Serverless clusters created within SageMaker Studio are automatically tagged with system default tags, specifically the domain-arn and user-profile-arn tags. These system-generated tags simplify cost allocation and attribution of Amazon EMR resources. See the following code:

# domain tag
sagemaker:domain-arn: arn:aws:sagemaker:<region>:<account-id>:domain/<domain-id>
# user profile tag
sagemaker:user-profile-arn: arn:aws:sagemaker:<region>:<account-id>:user-profile/<domain-id>/<user-profile-name>

To learn more about enterprise-level cost allocation for ML environments, refer to Set up enterprise-level cost allocation for ML environments and workloads using resource tagging in Amazon SageMaker.
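These tags can also drive per-team reporting outside the AWS Billing console. As a sketch (the ARN values below are made up, following the tag format shown above), a small helper can pull the domain ID and user profile name out of the tag set returned for an application, for example by the EMR Serverless ListTagsForResource API:

```python
def cost_allocation_keys(tags):
    """Extract domain and user-profile identifiers from the system default
    tags that SageMaker Studio applies to EMR Serverless applications.

    The identifier is the last path segment of each ARN.
    """
    domain_arn = tags.get("sagemaker:domain-arn", "")
    profile_arn = tags.get("sagemaker:user-profile-arn", "")
    return {
        "domain_id": domain_arn.rsplit("/", 1)[-1],
        "user_profile": profile_arn.rsplit("/", 1)[-1],
    }

tags = {  # shape follows the tag format above; values are invented
    "sagemaker:domain-arn": "arn:aws:sagemaker:us-east-1:111122223333:domain/d-abc123",
    "sagemaker:user-profile-arn": "arn:aws:sagemaker:us-east-1:111122223333:user-profile/d-abc123/alice",
}
print(cost_allocation_keys(tags))  # {'domain_id': 'd-abc123', 'user_profile': 'alice'}
```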

Prerequisites

Before you get started, complete the prerequisite steps in this section.

Create a SageMaker Studio domain

This post walks you through the integration between SageMaker Studio and EMR Serverless using an interactive SageMaker Studio notebook. We assume you already have a SageMaker Studio domain provisioned with a UserProfile and an ExecutionRole. If you don’t have a SageMaker Studio domain available, refer to Quick setup to Amazon SageMaker to provision one.

Create an EMR Serverless job runtime role

EMR Serverless allows you to specify IAM role permissions that an EMR Serverless job run can assume when calling other services on your behalf. This includes access to Amazon S3 for data sources and targets, as well as other AWS resources like Amazon Redshift clusters and Amazon DynamoDB tables. To learn more about creating a role, refer to Create a job runtime role.

The following sample IAM inline policy, attached to a runtime role, provides access to an S3 bucket and AWS Glue. You can modify the role to include any additional services that EMR Serverless needs to access at runtime. Additionally, make sure you scope down the resources in the runtime policies to adhere to the principle of least privilege.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadAccessForEMRSamples",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::*.elasticmapreduce",
        "arn:aws:s3:::*.elasticmapreduce/*"
      ]
    },
    {
      "Sid": "FullAccessToOutputBucket",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::<emrs-sample-s3-bucket-name>",
        "arn:aws:s3:::<emrs-sample-s3-bucket-name>/*"
      ]
    },
    {
      "Sid": "GlueCreateAndReadDataCatalog",
      "Effect": "Allow",
      "Action": [
        "glue:GetDatabase",
        "glue:CreateDatabase",
        "glue:GetDatabases",
        "glue:CreateTable",
        "glue:GetTable",
        "glue:UpdateTable",
        "glue:DeleteTable",
        "glue:GetTables",
        "glue:GetPartition",
        "glue:GetPartitions",
        "glue:CreatePartition",
        "glue:BatchCreatePartition",
        "glue:GetUserDefinedFunctions"
      ],
      "Resource": ["*"]
    }
  ]
}

Lastly, make sure your role has a trust relationship with EMR Serverless:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "emr-serverless.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}

Optionally, you can create a runtime role and policy using infrastructure as code (IaC), such as with AWS CloudFormation or Terraform, or using the AWS Command Line Interface (AWS CLI).
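For example, the same trust policy shown earlier can be reused when creating the role programmatically. The sketch below builds the CreateRole parameters (the role name is a placeholder); the actual IAM call is commented out because it requires AWS credentials:

```python
import json

# Trust policy from the preceding snippet, allowing EMR Serverless to assume the role.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "emr-serverless.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

def create_role_request(role_name):
    """Build CreateRole parameters; IAM expects the trust policy as a JSON string."""
    return {
        "RoleName": role_name,
        "AssumeRolePolicyDocument": json.dumps(TRUST_POLICY),
    }

params = create_role_request("SM-EMRServerless-RunTime-role")
# With AWS credentials configured:
# import boto3
# boto3.client("iam").create_role(**params)
```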

Update the SageMaker role to allow EMR Serverless access

This one-time task enables SageMaker Studio users to create, update, list, start, stop, and delete EMR Serverless clusters. We begin by creating an inline policy that grants the necessary permissions for these actions on EMR Serverless clusters, then attach the policy to the Studio domain or user profile role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EMRServerlessUnTaggedActions",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:ListApplications"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/*"
    },
    {
      "Sid": "EMRServerlessPassRole",
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::<aws-account-id>:role/SM-EMRServerless-RunTime-role",
      "Condition": {
        "StringLike": {
          "iam:PassedToService": "emr-serverless.amazonaws.com"
        }
      }
    },
    {
      "Sid": "EMRServerlessCreateApplicationAction",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:CreateApplication",
        "emr-serverless:TagResource"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/*",
      "Condition": {
        "ForAllValues:StringEquals": {
          "aws:TagKeys": [
            "sagemaker:domain-arn",
            "sagemaker:user-profile-arn",
            "sagemaker:space-arn"
          ]
        },
        "Null": {
          "aws:RequestTag/sagemaker:domain-arn": "false",
          "aws:RequestTag/sagemaker:user-profile-arn": "false",
          "aws:RequestTag/sagemaker:space-arn": "false"
        }
      }
    },
    {
      "Sid": "EMRServerlessDenyPermissiveTaggingAction",
      "Effect": "Deny",
      "Action": [
        "emr-serverless:TagResource",
        "emr-serverless:UntagResource"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/*",
      "Condition": {
        "Null": {
          "aws:ResourceTag/sagemaker:domain-arn": "true",
          "aws:ResourceTag/sagemaker:user-profile-arn": "true",
          "aws:ResourceTag/sagemaker:space-arn": "true"
        }
      }
    },
    {
      "Sid": "EMRServerlessActions",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:StartApplication",
        "emr-serverless:StopApplication",
        "emr-serverless:GetApplication",
        "emr-serverless:DeleteApplication",
        "emr-serverless:AccessLivyEndpoints",
        "emr-serverless:GetDashboardForJobRun"
      ],
      "Resource": "arn:aws:emr-serverless:<region>:<aws-account-id>:/applications/*",
      "Condition": {
        "Null": {
          "aws:ResourceTag/sagemaker:domain-arn": "false",
          "aws:ResourceTag/sagemaker:user-profile-arn": "false",
          "aws:ResourceTag/sagemaker:space-arn": "false"
        }
      }
    }
  ]
}

Update the domain with EMR Serverless runtime roles

SageMaker Studio supports access to EMR Serverless clusters in two ways: in the same account as the SageMaker Studio domain or across accounts.

To interact with EMR Serverless clusters created in the same account as the SageMaker Studio domain, create a file named same-account-update-domain.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "DefaultUserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": {
                "ExecutionRoleArns": [
                    "arn:aws:iam::<aws-account-id>:role/<same-account-emr-runtime-role>"
                ]
            }
        }
    }
}

Then run an update-domain command to allow all users in the domain to use the runtime role:

aws --region <region> \
sagemaker update-domain \
--cli-input-json file://same-account-update-domain.json

For EMR Serverless clusters created in a different account, create a file named cross-account-update-domain.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "DefaultUserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": {
                "AssumableRoleArns": [
                    "arn:aws:iam::<aws-account-id>:role/<cross-account-emr-runtime-role>"
                ]
            }
        }
    }
}

Then run an update-domain command to allow all users in the domain to use the runtime role:

aws --region <region> \
sagemaker update-domain \
--cli-input-json file://cross-account-update-domain.json

Update the user profile with EMR Serverless runtime roles

Optionally, this update can be applied more granularly at the user profile level instead of the domain level. Similar to domain update, to interact with EMR Serverless clusters created in the same account as the SageMaker Studio domain, create a file named same-account-update-user-profile.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "UserProfileName": "<emr-s-sm-studio-user-profile-name>",
    "UserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": {
                "ExecutionRoleArns": [
                    "arn:aws:iam::<aws-account-id>:role/<same-account-emr-runtime-role>"
                ]
            }
        }
    }
}

Then run an update-user-profile command to allow this user profile to use the runtime role:

aws --region <region> \
sagemaker update-user-profile \
--cli-input-json file://same-account-update-user-profile.json

For EMR Serverless clusters created in a different account, create a file named cross-account-update-user-profile.json:

{
    "DomainId": "<emr-s-sm-studio-domain-id>",
    "UserProfileName": "<emr-s-sm-studio-user-profile-name>",
    "UserSettings": {
        "JupyterLabAppSettings": {
            "EmrSettings": {
                "AssumableRoleArns": [
                    "arn:aws:iam::<aws-account-id>:role/<cross-account-emr-runtime-role>"
                ]
            }
        }
    }
}

Then run an update-user-profile command to allow this user profile to use the runtime role:

aws --region <region> \
sagemaker update-user-profile \
--cli-input-json file://cross-account-update-user-profile.json

Grant access to the Amazon ECR repository

The recommended way to customize environments within EMR Serverless clusters is by using custom Docker images.

Make sure you have an Amazon ECR repository in the same AWS Region where you launch EMR Serverless applications. To create an ECR private repository, refer to Creating an Amazon ECR private repository to store images.
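Image references later in this post follow the standard private registry URI format, account.dkr.ecr.region.amazonaws.com/repository:tag. As a small sketch (the account ID and repository name are examples matching the placeholders used below), a helper can compose that URI:

```python
def ecr_image_uri(account_id, region, repository, tag="latest"):
    """Compose a private Amazon ECR image URI from its parts.

    The repository's Region must match the Region where the EMR Serverless
    application is launched.
    """
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"

uri = ecr_image_uri("123456789012", "us-east-1", "emr-serverless-langchain")
print(uri)  # 123456789012.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-langchain:latest
```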

To grant users access to your ECR repository, add the following policies to the users and roles that create or update EMR Serverless applications using images from this repository:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ECRRepositoryListGetPolicy",
            "Effect": "Allow",
            "Action": [
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "ecr:DescribeImages"
            ],
            "Resource": "ecr-repository-arn"
        }
    ]
}

Customize the runtime environment in EMR Serverless clusters

Customizing cluster runtimes in advance is crucial for a seamless experience. As mentioned earlier, we use custom-built Docker images from an ECR repository to optimize our cluster environment, including the necessary packages and binaries. The simplest way to build these images is by using the SageMaker Studio built-in Docker functionality, as discussed in Accelerate ML workflows with Amazon SageMaker Studio Local Mode and Docker support. In this post, we build a Docker image that includes the Python 3.11 runtime and essential packages for a typical RAG workflow, such as langchain, sagemaker, opensearch-py, PyPDF2, and more.

Complete the following steps:

    Start by launching a SageMaker Studio JupyterLab notebook.
    Install Docker in your JupyterLab environment. For instructions, refer to Accelerate ML workflows with Amazon SageMaker Studio Local Mode and Docker support.
    Open a new terminal within your JupyterLab environment and verify the Docker installation by running the following:
    docker --version
    # OR
    docker info

    Create a Dockerfile (refer to Using custom images with EMR Serverless) and publish the image to an ECR repository:
    # example Dockerfile for EMR Serverless
    FROM --platform=linux/amd64 public.ecr.aws/emr-serverless/spark/emr-7.0.0:latest
    USER root
    RUN dnf install -y python3.11 python3.11-pip
    WORKDIR /tmp
    RUN jar xf /usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar fake_shell.py && \
        sed -ie 's/version < \"3\.8\"/version_info < \(3,8\)/' fake_shell.py && \
        jar uvf /usr/lib/livy/repl_2.12-jars/livy-repl_2.12-0.7.1-incubating.jar fake_shell.py
    WORKDIR /home/hadoop
    ENV PYSPARK_PYTHON=/usr/bin/python3.11
    RUN python3.11 -m pip install cython numpy matplotlib requests boto3 pandas PyPDF2 pikepdf pycryptodome langchain==0.0.310 opensearch-py seaborn plotly dash
    USER hadoop:hadoop
    From your JupyterLab terminal, run the following command to log in to the ECR repository:
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
    Run the following set of Docker commands to build, tag, and push the Docker image to the ECR repository:
    docker build --network sagemaker -t emr-serverless-langchain .
    docker tag emr-serverless-langchain:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-langchain:latest
    docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/emr-serverless-langchain:latest

Use the EMR Serverless integration with SageMaker Studio

In this section, we demonstrate the integration of EMR Serverless into SageMaker Studio and how you can effortlessly interact with your clusters, whether they are in the same account or across different accounts. To access SageMaker Studio, complete the following steps:

    On the SageMaker console, open SageMaker Studio. Depending on your organization’s setup, you can log in to Studio either through the IAM console or using AWS IAM Identity Center.

The new Studio experience is a serverless web UI, which makes sure any updates occur seamlessly and asynchronously, without interrupting your development experience.

    Under Data in the navigation pane, choose EMR Clusters.

You can navigate to two different tabs: EMR Serverless Applications or EMR Clusters (on Amazon EC2). For this post, we focus on EMR Serverless.

Create an EMR Serverless cluster

To create a new EMR Serverless cluster, complete the following steps:

    On the EMR Serverless Applications tab, choose Create.
    In the Network connections section, you can optionally select Connect to your VPC to nest your EMR Serverless cluster within a VPC and private subnet.
    To customize your cluster runtime, choose a compatible custom image from your ECR repository and make sure your user profile role has the necessary permissions to pull from this repository.

Interact with EMR Serverless clusters

EMR Serverless clusters can automatically scale down to zero when not in use, eliminating costs associated with idling resources. This feature makes EMR Serverless clusters highly flexible and cost-effective. You can list, view, create, start, stop, and delete all your EMR Serverless clusters directly within SageMaker Studio.
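The same lifecycle operations are also available programmatically through the EMR Serverless API. As a hedged sketch (the application names and states below are invented), a helper can pick out applications that have scaled down, given a response shaped like the ListApplications API output:

```python
def stopped_applications(list_applications_response):
    """Return the names of applications in STOPPED state, given a response
    shaped like the EMR Serverless ListApplications API output."""
    return [
        app["name"]
        for app in list_applications_response.get("applications", [])
        if app.get("state") == "STOPPED"
    ]

# With AWS credentials configured, the real response would come from:
# import boto3
# response = boto3.client("emr-serverless").list_applications()
response = {  # hypothetical sample mirroring the API's response shape
    "applications": [
        {"name": "rag-cluster", "state": "STOPPED"},
        {"name": "etl-cluster", "state": "STARTED"},
    ]
}
print(stopped_applications(response))  # ['rag-cluster']
```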

You can also interactively attach an existing cluster to a notebook by choosing Attach to new notebook.

Build a RAG document processing engine using PySpark

In this section, we use the SageMaker Studio cluster integration to parallelize data processing at a massive scale. A typical RAG framework consists of two main components: an offline process that generates embeddings for a document corpus and ingests them into a vector store, and an online process that retrieves relevant context and generates responses.

In the following sections, we focus on the offline document embedding generation process and explore how to use PySpark on EMR Serverless using an interactive SageMaker Studio JupyterLab notebook to efficiently parallel process PDF documents.

Deploy an embeddings model

For this use case, we use the Hugging Face All MiniLM L6 v2 embeddings model from Amazon SageMaker JumpStart. To quickly deploy this embedding model, complete the following steps:

    In SageMaker Studio, choose JumpStart in the navigation pane.
    Search for and choose All MiniLM L6 v2.
    On the model card, choose Deploy.

Your model will be ready within a few minutes. Alternatively, you can choose any other embedding models from SageMaker JumpStart by filtering Task type to Text embedding.

Interactively build an offline document embedding generator

In this section, we use code from the following GitHub repo and interactively build a document processing engine using LangChain and PySpark. Complete the following steps:

    Create a SageMaker Studio JupyterLab development environment. For more details, see Boost productivity on Amazon SageMaker Studio: Introducing JupyterLab Spaces and generative AI tools.
    Choose an appropriate instance type and EBS storage volume for your development environment.

You can change the instance type at any time by stopping and restarting the space.

    Clone the sample code from the following GitHub repository and use the notebook available under use-cases/pyspark-langchain-rag-processor/Offline_RAG_Processor_on_SageMaker_Studio_using_EMR-Serverless.ipynb.
    In SageMaker Studio, under Data in the navigation pane, choose EMR Clusters.
    On the EMR Serverless Applications tab, choose Create to create a cluster.
    Select your cluster and choose Attach to new notebook.
    Attach this cluster to a JupyterLab notebook running inside a space.

Alternatively, you can attach your cluster to any notebook within your JupyterLab space by choosing Cluster and selecting the EMR Serverless cluster you want to attach to the notebook.

Make sure you choose the SparkMagic PySpark kernel when interactively running PySpark workloads.

A successful cluster connection to a notebook results in a usable Spark session and links to the Spark UI and driver logs.

When a notebook cell is run within a SparkMagic PySpark kernel, the operations are, by default, run inside a Spark cluster. However, if you decorate the cell with %%local, it allows the code to be run on the local compute where the JupyterLab notebook is hosted. We begin by reading a list of PDF documents from Amazon S3 directly into the cluster memory, as illustrated in the following diagram.

    Use the following code to read the documents:
    default_bucket = sess.default_bucket()
    destination_prefix = "test/raw-pdfs"

    # send default bucket context to Spark using the send_to_spark command
    %%send_to_spark -i default_bucket -t str -n SRC_BUCKET_NAME
    %%send_to_spark -i destination_prefix -t str -n SRC_FILE_PREFIX
    ...
    def list_files_in_s3_bucket_prefix(bucket_name, prefix):
        s3 = boto3.client('s3')
        # Paginate through the objects in the specified bucket and prefix, and collect all keys (file paths)
        paginator = s3.get_paginator('list_objects_v2')
        page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix)
        file_paths = []
        for page in page_iterator:
            if "Contents" in page:
                for obj in page["Contents"]:
                    if os.path.basename(obj["Key"]):
                        file_paths.append(obj["Key"])
        return file_paths

    def load_pdf_from_s3_into_memory(row):
        """
        Load a PDF file from an S3 bucket directly into memory.
        """
        try:
            src_bucket_name, src_file_key = row
            s3 = boto3.client('s3')
            pdf_file = io.BytesIO()
            s3.download_fileobj(src_bucket_name, src_file_key, pdf_file)
            pdf_file.seek(0)
            pdf_reader = PdfReader(pdf_file)
            return (src_file_key, pdf_reader, len(pdf_reader.pages))
        except Exception as e:
            return (os.path.basename(src_file_key), str(e))

    # create a list of file references in S3
    all_pdf_files = list_files_in_s3_bucket_prefix(
        bucket_name=SRC_BUCKET_NAME,
        prefix=SRC_FILE_PREFIX
    )
    print(f"Found {len(all_pdf_files)} files ---> {all_pdf_files}")
    # Found 3 files ---> ['Lab03/raw-pdfs/AmazonSageMakerDeveloperGuide.pdf', 'Lab03/raw-pdfs/EC2DeveloperGuide.pdf', 'Lab03/raw-pdfs/S3DeveloperGuide.pdf']

    # load documents into memory and return a single list of text documents - map-reduce op
    pdfs_in_memory = pdfs_rdd.map(load_pdf_from_s3_into_memory).collect()

Next, you can visualize the size of each document to understand the volume of data you’re processing.

    You can generate charts and visualize your data within your PySpark notebook cell using static visualization tools like matplotlib and seaborn. See the following code:
    import numpy as np
    import matplotlib.pyplot as plt

    x_labels = [pdfx.split('/')[-1] for pdfx, _, _ in pdfs_in_memory]
    y_values = [pages_count for _, _, pages_count in pdfs_in_memory]
    x = range(len(y_values))
    ...
    # Adjust the layout
    plt.tight_layout()
    # Show the plot
    plt.show()
    %matplot plt

Every PDF document contains multiple pages to process, and this task can be run in parallel using Spark. Each document is split page by page, with each page referencing the global in-memory PDFs. We achieve parallelism at the page level by creating a list of pages and processing each one in parallel. The following diagram provides a visual representation of this process.
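Stripped of the Spark machinery, the page-level fan-out described above can be sketched in plain Python (the document names and page counts below are invented). In the notebook, the resulting task list would instead be distributed with spark.sparkContext.parallelize and processed with map:

```python
def fan_out_pages(pdfs_in_memory):
    """Turn (doc_key, reader, page_count) records into one task per page.

    Each task references its source document and a page number, so pages
    from all documents can be processed independently and in parallel.
    """
    return [
        (doc_key, page_num)
        for doc_key, _reader, page_count in pdfs_in_memory
        for page_num in range(page_count)
    ]

docs = [("a.pdf", None, 3), ("b.pdf", None, 2)]  # invented documents
tasks = fan_out_pages(docs)
print(len(tasks))  # 5 page-level tasks for 2 documents
print(tasks[:3])   # [('a.pdf', 0), ('a.pdf', 1), ('a.pdf', 2)]
```

Because each task carries only a key and a page index, Spark can balance pages across workers even when a few documents are much longer than the rest.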

The extracted text from each page of multiple documents is converted into a LangChain-friendly Document class.

    The CustomDocument class, shown in the following code, is a custom implementation of the Document class that allows you to convert custom text blobs into a format recognized by LangChain. After conversion, the documents are split into chunks and prepared for embedding.
    class CustomDocument:
        def __init__(self, text, path, number):
            ...

    documents_custom = [
        CustomDocument(text=text, path=doc_source, number=page_num)
        for text, doc_source, page_num in documents
    ]

    global_text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50
    )
    docs = global_text_splitter.split_documents(documents_custom)
    print(f"Total number of docs pre-split {len(documents_custom)} | after split {len(docs)}")
    Next, you can use LangChain’s built-in OpenSearchVectorSearch to create text embeddings. However, we use a custom EmbeddingsGenerator class that parallelizes (using PySpark) the embeddings generation process using a load-balanced SageMaker hosted embeddings model endpoint:
    import time
    from langchain.vectorstores import OpenSearchVectorSearch

    endpoint_name = 'jumpstart-all-MiniLM-L6-v2-endpoint'
    interface_component = 'jumpstart-all-MiniLM-L6-v2-endpoint-comp'
    client = boto3.client('runtime.sagemaker', region_name=REGION)

    def generate_embeddings(input):
        body = input.encode('utf-8')
        response = client.invoke_endpoint(
            ...

    class EmbeddingsGenerator:
        @staticmethod
        def embed_documents(input_text, normalize=True):
            assert isinstance(input_text, list), "Input type must be list to embed_documents function"
            input_text_rdd = spark.sparkContext.parallelize(input_text)
            embeddings_generated = input_text_rdd.map(generate_embeddings).collect()
            ...

        @staticmethod
        def embed_query(input_text):
            status_code, embedding = generate_embeddings(input_text)
            if status_code == 200:
                return embedding
            else:
                return None

    start = time.time()
    docsearch = OpenSearchVectorSearch.from_documents(
        docs,
        EmbeddingsGenerator,
        opensearch_url=OPENSEARCH_DOMAIN_URL,
        bulk_size=len(docs),
        http_auth=(user, pwd),
        index_name=INDEX_NAME_OSE,
        engine="faiss"
    )
    end = time.time()
    print(f"Total Time for ingestion: {round(end - start, 2)} secs")

The custom EmbeddingsGenerator class can generate embeddings for approximately 2,500 pages (12,000 chunks) of documents in under 180 seconds using just two concurrent load-balanced SageMaker embedding model endpoints and 10 PySpark worker nodes. This process can be further accelerated by increasing the number of load-balanced embedding endpoints and worker nodes in the cluster.

Conclusion

The integration of EMR Serverless with SageMaker Studio represents a significant leap forward in simplifying and enhancing big data processing and ML workflows. By eliminating the complexities of infrastructure management, enabling seamless scalability, and optimizing costs, this powerful combination empowers organizations to use petabyte-scale data processing without the overhead typically associated with managing Spark clusters. The streamlined experience within SageMaker Studio enables data scientists and engineers to focus on what truly matters—driving insights and innovation from their data. Whether you’re processing massive datasets, building RAG systems, or exploring other advanced analytics, this integration opens up new possibilities for efficiency and scale, all within the familiar and user-friendly environment of SageMaker Studio.

As data continues to grow in volume and complexity, adopting tools like EMR Serverless and SageMaker Studio will be key to maintaining a competitive edge in the ever-evolving landscape of data-driven decision-making. We encourage you to try this feature today by setting up SageMaker Studio using the SageMaker quick setup guide. To learn more about the EMR Serverless integration with SageMaker Studio, refer to Prepare data using EMR Serverless. You can explore more generative AI samples and use cases in the GitHub repository.


About the authors

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Pranav Murthy is an AI/ML Specialist Solutions Architect at AWS. He focuses on helping customers build, train, deploy and migrate machine learning (ML) workloads to SageMaker. He previously worked in the semiconductor industry developing large computer vision (CV) and natural language processing (NLP) models to improve semiconductor processes using state of the art ML techniques. In his free time, he enjoys playing chess and traveling. You can find Pranav on LinkedIn.

Naufal Mir is a Senior GenAI/ML Specialist Solutions Architect at AWS. He focuses on helping customers build, train, deploy and migrate machine learning (ML) workloads to SageMaker. He previously worked at financial services institutes developing and operating systems at scale. He enjoys ultra endurance running and cycling.

Kunal Jha is a Senior Product Manager at AWS. He is focused on building Amazon SageMaker Studio as the best-in-class choice for end-to-end ML development. In his spare time, Kunal enjoys skiing and exploring the Pacific Northwest. You can find him on LinkedIn.

Ashwin Krishna is a Senior SDE working for SageMaker Studio at Amazon Web Services. He is focused on building interactive ML solutions for AWS enterprise customers to achieve their business needs. He is a big supporter of Arsenal football club and spends spare time playing and watching soccer.

Harini Narayanan is a software engineer at AWS, where she’s excited to build cutting-edge data preparation technology for machine learning at SageMaker Studio. With a keen interest in sustainability, interior design, and a love for all things green, Harini brings a thoughtful approach to innovation, blending technology with her diverse passions.
