AWS Machine Learning Blog
Using Amazon OpenSearch ML connector APIs

 

This post describes how to integrate Amazon OpenSearch with third-party machine learning (ML) services to enhance data processing. It focuses on two ML connectors: Amazon Comprehend and Amazon Bedrock. With the Amazon Comprehend connector, you can automatically detect the language of ingested documents. With the Amazon Bedrock connector, you can invoke the Amazon Titan Text Embeddings v2 model to create embeddings from ingested documents and enable semantic search. The post provides detailed configuration steps, code samples, and a CloudFormation template so you can quickly set up and use these capabilities.

🗣️ Describes using the Amazon Comprehend ML connector to detect the language of ingested documents by calling the LangDetect API. The post explains how to configure OpenSearch to access Amazon Comprehend, including creating an IAM role, setting up the OpenSearch ML connector, and registering the Amazon Comprehend API connector.

🔑 Explains how to create an OpenSearch ingest pipeline that calls the Amazon Comprehend API and adds the results (the detected language and its confidence score) to the documents being indexed. The input_map and output_map settings give you flexible control over what data is sent to the API and how the response is handled.

🛏️ Describes using the Amazon Bedrock ML connector to invoke the Amazon Titan Text Embeddings v2 model, creating embeddings from ingested documents to enable semantic search. The post provides JSON files containing English, French, and German sentences to demonstrate semantic search across different languages.

When ingesting data into Amazon OpenSearch, customers often need to augment data before putting it into their indexes. For instance, you might be ingesting log files with an IP address and want to get a geographic location for the IP address, or you might be ingesting customer comments and want to identify the language they are in. Traditionally, this requires an external process that complicates data ingest pipelines and can cause a pipeline to fail. OpenSearch offers a wide range of third-party machine learning (ML) connectors to support this augmentation.

This post highlights two of these third-party ML connectors. The first connector we demonstrate is the Amazon Comprehend connector. In this post, we show you how to use this connector to invoke the LangDetect API to detect the languages of ingested documents.

The second connector we demonstrate is the Amazon Bedrock connector to invoke the Amazon Titan Text Embeddings v2 model so that you can create embeddings from ingested documents and perform semantic search.

Solution overview

We use Amazon OpenSearch with Amazon Comprehend to demonstrate the language detection feature. To help you replicate this setup, we’ve provided the necessary source code, an Amazon SageMaker notebook, and an AWS CloudFormation template. You can find these resources in the sample-opensearch-ml-rest-api GitHub repo.

The reference architecture shown in the preceding figure shows the components used in this solution. A SageMaker notebook provides a convenient way to run the code from the GitHub repository mentioned above.

Prerequisites

To run the full demo using the sample-opensearch-ml-rest-api, make sure you have an AWS account with access to the services used in this post: Amazon OpenSearch Service, Amazon Comprehend, Amazon Bedrock (including the Amazon Titan Text Embeddings v2 model), Amazon SageMaker, and AWS CloudFormation.

Part 1: The Amazon Comprehend ML connector

Set up OpenSearch to access Amazon Comprehend

Before you can use Amazon Comprehend, you need to make sure that OpenSearch can call Amazon Comprehend. You do this by supplying OpenSearch with an IAM role that has access to invoke the DetectDominantLanguage API. This requires the OpenSearch cluster to have fine-grained access control enabled. The CloudFormation template creates a role for this called <Your Region>-<Your Account Id>-SageMaker-OpenSearch-demo-role. Use the following steps to attach this role to the OpenSearch cluster.

    Open the OpenSearch Dashboards console—you can find the URL in the output of the CloudFormation template—and sign in using the username and password you provided.
    Choose Security in the left-hand menu (if you don’t see the menu, choose the three horizontal lines icon at the top left of the dashboard). From the security menu, select Roles to manage the OpenSearch roles.
    In the search box, enter ml_full_access and select the role. Select the Mapped users link to map the IAM role to this OpenSearch role.
    On the Mapped users screen, choose Manage mapping to edit the current mappings. Add the IAM role mentioned previously to map it to the ml_full_access role; this allows OpenSearch to access the needed AWS resources from the ml-commons plugin. Enter your IAM role Amazon Resource Name (ARN) (arn:aws:iam::<your account id>:role/<your region>-<your account id>-SageMaker-OpenSearch-demo-role) in the backend roles field and choose Map. If you prefer to script this mapping instead of using the dashboard, see the sketch after these steps.
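The following is a minimal sketch of the equivalent call against the OpenSearch Security REST API. It assumes the same awsauth and host request helpers set up in the next section, and that the calling identity is allowed to manage role mappings; note that a PUT replaces any existing mapping for the role.

# Map the IAM role to the ml_full_access OpenSearch role via the Security API
# (sketch only; PUT overwrites the existing mapping for this role)
path = '/_plugins/_security/api/rolesmapping/ml_full_access'
url = host + path
payload = {
    "backend_roles": [
        "arn:aws:iam::<your account id>:role/<your region>-<your account id>-SageMaker-OpenSearch-demo-role"
    ]
}
response = requests.put(url, auth=awsauth, json=payload, headers={"Content-Type": "application/json"})
print(response.json())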

Set up the OpenSearch ML connector to Amazon Comprehend

In this step, you set up the ML connector to connect Amazon Comprehend to OpenSearch.

    Get an authorization token to use when making the call to OpenSearch from the SageMaker notebook. The token uses an IAM role attached to the notebook by the CloudFormation template that has permissions to call OpenSearch. That same role is mapped to the OpenSearch admin role in the same way you just mapped the role to access Amazon Comprehend. Use the following code to set this up:
awsauth = AWS4Auth(credentials.access_key,credentials.secret_key,region,'es',session_token=credentials.token)
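For context, the credentials, region, and host variables used throughout these calls can be set up along the following lines. This is a minimal sketch assuming the boto3 and requests-aws4auth packages are installed; the domain endpoint placeholder would come from the CloudFormation outputs.

# Minimal request setup sketch (assumes boto3 and requests-aws4auth are installed)
import boto3
import requests
from requests_aws4auth import AWS4Auth

region = 'us-east-1'                                # Region used throughout this post
credentials = boto3.Session().get_credentials()     # picks up the notebook's IAM role
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, 'es',
                   session_token=credentials.token)
host = 'https://<your OpenSearch domain endpoint>'  # from the CloudFormation outputs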
    Create the connector. It needs a few pieces of information:
      It needs a protocol. For this example, use aws_sigv4, which allows OpenSearch to use an IAM role to call Amazon Comprehend.
      Provide the ARN for this role, which is the same role you used to set up permissions for the ml_full_access role.
      Provide comprehend as the service_name and DetectDominantLanguage as the api_name.
      Provide the URL to Amazon Comprehend and set up how to call the API and what data to pass to it.

The final call looks like:

comprehend = boto3.client('comprehend', region_name='us-east-1')

path = '/_plugins/_ml/connectors/_create'
url = host + path

payload = {
  "name": "Comprehend lang identification",
  "description": "comprehend model",
  "version": 1,
  "protocol": "aws_sigv4",
  "credential": {
    "roleArn": sageMakerOpenSearchRoleArn
  },
  "parameters": {
    "region": "us-east-1",
    "service_name": "comprehend",
    "api_version": "20171127",
    "api_name": "DetectDominantLanguage",
    "api": "Comprehend_${parameters.api_version}.${parameters.api_name}",
    "response_filter": "$"
  },
  "actions": [
    {
      "action_type": "predict",
      "method": "POST",
      "url": "https://${parameters.service_name}.${parameters.region}.amazonaws.com",
      "headers": {
        "content-type": "application/x-amz-json-1.1",
        "X-Amz-Target": "${parameters.api}"
      },
      "request_body": "{\"Text\": \"${parameters.Text}\"}"
    }
  ]
}

comprehend_connector_response = requests.post(url, auth=awsauth, json=payload)
comprehend_connector = comprehend_connector_response.json()["connector_id"]

Register the Amazon Comprehend API connector

The next step is to register the Amazon Comprehend API connector with OpenSearch using the Register Model API from OpenSearch.

path = '/_plugins/_ml/models/_register'
url = host + path

payload = {
    "name": "comprehend lang id API",
    "function_name": "remote",
    "description": "API to detect the language of text",
    "connector_id": comprehend_connector
}
headers = {"Content-Type": "application/json"}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
comprehend_model_id = response.json()['model_id']

As of OpenSearch 2.13, when the model is first invoked, it’s automatically deployed. Prior to 2.13, you had to deploy the model manually within OpenSearch, as sketched below.
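If you’re running a version earlier than 2.13, a manual deploy call would be needed. The following is a minimal sketch using the ml-commons deploy API and the same request helpers as above.

# Manually deploy the registered model (only needed before OpenSearch 2.13)
path = '/_plugins/_ml/models/' + comprehend_model_id + '/_deploy'
url = host + path
response = requests.post(url, auth=awsauth, headers={"Content-Type": "application/json"})
print(response.json())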

Test the Amazon Comprehend API in OpenSearch

With the connector in place, you need to test the API to make sure it was set up and configured correctly.

    Make the following call to OpenSearch.
path = '/_plugins/_ml/models/' + comprehend_model_id + '/_predict'
url = host + path

headers = {"Content-Type": "application/json"}
payload = {
    "parameters": {
        "Text": "你知道厕所在哪里吗"
    }
}

response = requests.post(url, auth=awsauth, json=payload, headers=headers)
print(response.json())
    You should get the following result from the call, showing the language code as zh with a score of 1.0:
{   "inference_results":[      {         "output":[            {               "name":"response",               "dataAsMap":{                  "response":{                     "Languages":[                        {                           "LanguageCode":"zh",                           "Score":1.0                        }                     ]                  }               }            }         ],         "status_code":200      }   ]}

Create an ingest pipeline that uses the Amazon Comprehend API to annotate the language

The next step is to create a pipeline in OpenSearch that calls the Amazon Comprehend API and adds the results of the call to the document being indexed. To do this, you provide both an input_map and an output_map. You use these to tell OpenSearch what to send to the API and how to handle what comes back from the call.

path = '/_ingest/pipeline/comprehend_language_identification_pipeline'
url = host + path

payload = {
  "description": "ingest identify lang with the comprehend API",
  "processors": [
    {
      "ml_inference": {
        "model_id": comprehend_model_id,
        "input_map": [
            {
               "Text": "Text"
            }
        ],
        "output_map": [
            {
               "detected_language": "response.Languages[0].LanguageCode",
               "language_score": "response.Languages[0].Score"
            }
        ]
      }
    }
  ]
}
headers = {"Content-Type": "application/json"}

response = requests.put(url, auth=awsauth, json=payload, headers=headers)

You can see from the preceding code that you are pulling back both the top language result and its score from Amazon Comprehend and adding those fields to the document.
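To verify the pipeline end to end, you can run a document through it with the ingest simulate API before indexing anything. This is a minimal sketch; the French sentence is just an illustrative input, and the enriched fields should appear in each returned document’s _source.

# Simulate the pipeline to preview the enrichment without writing to an index
sim_url = host + '/_ingest/pipeline/comprehend_language_identification_pipeline/_simulate'
sim_payload = {
    "docs": [
        {"_source": {"Text": "Il fait beau aujourd'hui"}}
    ]
}
response = requests.post(sim_url, auth=awsauth, json=sim_payload,
                         headers={"Content-Type": "application/json"})
# Each simulated document should now carry detected_language and language_score
print(response.json())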

Part 2: The Amazon Bedrock ML connector

In this section, you use Amazon OpenSearch with Amazon Bedrock through the ml-commons plugin to perform a multilingual semantic search. Make sure that you have the solution prerequisites in place before attempting this section.

In the SageMaker instance that was deployed for you, you can see the following files: english.json, french.json, german.json.

These documents have sentences in their respective languages that talk about the term spring in different contexts. These contexts include spring as a verb meaning to move suddenly, as a noun meaning the season of spring, and finally spring as a noun meaning a mechanical part. In this section, you deploy the Amazon Titan Text Embeddings v2 model using the ML connector for Amazon Bedrock. You then use this embeddings model to create vectors of text in three languages by ingesting the different language JSON files. Finally, these vectors are stored in Amazon OpenSearch to enable semantic searches to be used across the language sets.

Amazon Bedrock provides streamlined access to various powerful AI foundation models through a single API interface. This managed service includes models from Amazon and other leading AI companies. You can test different models to find the ideal match for your specific needs, while maintaining security, privacy, and responsible AI practices. The service enables you to customize these models with your own data through methods such as fine-tuning and Retrieval Augmented Generation (RAG). Additionally, you can use Amazon Bedrock to create AI agents that can interact with enterprise systems and data, making it a comprehensive solution for developing generative AI applications.

The reference architecture in the preceding figure shows the components used in this solution.

(1) First, we create the OpenSearch ML connector by running code within the Amazon SageMaker notebook. The connector essentially defines a REST API call to an external model; here, we specifically want a connector that calls the Titan Embeddings model in Amazon Bedrock.

(2) Next, we create an index that will later hold our language documents. When creating an index, you can specify its mappings, settings, and aliases.

(3) After creating an index within Amazon OpenSearch, we create an OpenSearch ingest pipeline that streamlines data processing and preparation for indexing, making the data easier to manage and use. (4) With an index and a pipeline in place, we can start indexing our documents through the pipeline.

(5 – 6) The pipeline in OpenSearch calls the Titan Embeddings model API: we send our language documents to the Titan Embeddings model, and the model returns vector embeddings of the sentences.

(7) We store the vector embeddings within our index and perform vector semantic search.

While this post highlights only specific areas of the overall solution, the SageMaker notebook has the code and instructions to run the full demo yourself.

Before you can use Amazon Bedrock, you need to make sure that OpenSearch can call Amazon Bedrock. As with Amazon Comprehend, you do this through an IAM role that OpenSearch can assume—the same role you mapped to the ml_full_access role earlier.
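The exact permissions are created by the CloudFormation template. For orientation only, a policy statement allowing the role to invoke the Titan embeddings model would look roughly like the following sketch; the Region and model ARN are assumptions that match the connector configuration later in this post.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "bedrock:InvokeModel",
      "Resource": "arn:aws:bedrock:us-east-1::foundation-model/amazon.titan-embed-text-v2:0"
    }
  ]
}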

Load sentences from the JSON documents into dataframes

Start by loading the JSON document sentences into dataframes for more structured organization. Each row can contain the text, embeddings, and additional contextual information:

import json
import pandas as pd

def load_sentences(file_name):
    sentences = []
    with open(file_name, 'r', encoding='utf-8') as file:
        for line in file:
            try:
                data = json.loads(line)
                if 'sentence' in data and 'sentence_english' in data:
                    sentences.append({
                        'sentence': data['sentence'],
                        'sentence_english': data['sentence_english']
                    })
            except json.JSONDecodeError:
                # Skip lines that are not valid JSON (like the index lines)
                continue
    return pd.DataFrame(sentences)

# Usage
german_df = load_sentences('german.json')
english_df = load_sentences('english.json')
french_df = load_sentences('french.json')
# print(french_df.head())
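For reference, the loader above expects newline-delimited JSON, one object per line with sentence and sentence_english keys; lines that don’t parse or lack those keys are skipped. A hypothetical line, illustrative rather than copied from the repository, might look like this:

{"sentence": "Les fleurs fleurissent au printemps.", "sentence_english": "The flowers bloom in spring."}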

Create the OpenSearch ML connector to Amazon Bedrock

After loading the JSON documents into dataframes, you’re ready to set up the OpenSearch ML connector to connect Amazon Bedrock to OpenSearch.

    The connector needs the following information.
      It needs a protocol. For this solution, use aws_sigv4, which allows OpenSearch to use an IAM role to call Amazon Bedrock.
      Provide the same role used earlier to set up permissions for the ml_full_access role.
      Provide the service_name, model, dimensions of the model, and embedding type.

The final call looks like the following:

payload = {  "name": "Amazon Bedrock Connector: embedding",  "description": "The connector to bedrock Titan embedding model",  "version": 1,  "protocol": "aws_sigv4",  "parameters": {    "region": "us-east-1",    "service_name": "bedrock",    "model": "amazon.titan-embed-text-v2:0",    "dimensions": 1024,    "normalize": True,    "embeddingTypes": ["float"]  },  "credential": {    "roleArn": sageMakerOpenSearchRoleArn  },  "actions": [    {      "action_type": "predict",      "method": "POST",      "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",      "headers": {        "content-type": "application/json",        "x-amz-content-sha256": "required"      },      "request_body": "{ \"inputText\": \"${parameters.inputText}\", \"dimensions\": ${parameters.dimensions}, \"normalize\": ${parameters.normalize}, \"embeddingTypes\": ${parameters.embeddingTypes} }",      "pre_process_function": "connector.pre_process.bedrock.embedding",      "post_process_function": "connector.post_process.bedrock.embedding"    }  ]}bedrock_connector_response = requests.post(url, auth=awsauth, json=payload, headers=headers)bedrock_connector_3 = bedrock_connector_response.json()["connector_id"]print('Connector id: ' + bedrock_connector_3)

Test the Amazon Titan Embeddings model in OpenSearch

After registering and deploying the Amazon Titan Embeddings model using the Amazon Bedrock connector, you can test the API to verify that it was set up and configured correctly. To do this, make the following call to OpenSearch:

headers = {"Content-Type": "application/json"}payload = {  "parameters": {    "inputText": "It's nice to see the flowers bloom and hear the birds sing in the spring"  }}response = requests.post(url, auth=awsauth, json=payload, headers=headers)print(response.json())

You should get a formatted result, similar to the following, from the call that shows the generated embedding from the Amazon Titan Embeddings model:

{'inference_results': [{'output': [{'name': 'sentence_embedding', 'data_type': 'FLOAT32', 'shape': [1024], 'data': [-0.04092199727892876, 0.052057236433029175, -0.03354490175843239, 0.04398418962955475, -0.001235315459780395, -0.03284895047545433, -0.014197427779436111, 0.0098129278048…

The preceding result is significantly shortened compared to the actual embedding result you might receive. The purpose of this snippet is to show you the format.

Create the index pipeline that uses the Amazon Titan Embeddings model

Create a pipeline in OpenSearch. You use this pipeline to tell OpenSearch to send the fields you want embeddings for to the embeddings model.

pipeline_name = "titan_embedding_pipeline_v2"url = f"{host}/_ingest/pipeline/{pipeline_name}"pipeline_body = {    "description": "Titan embedding pipeline",    "processors": [        {            "text_embedding": {                "model_id": bedrock_model_id,                "field_map": {                    "sentence": "sentence_vector"                }            }        }    ]}response = requests.put(url, auth=awsauth, json=pipeline_body, headers={"Content-Type": "application/json"})print(response.text)

Create an index

With the pipeline in place, the next step is to create an index that will use the pipeline. There are three fields in the index: sentence_vector, sentence, and sentence_english.

index_name = 'bedrock-knn-index-v2'
url = f'{host}/{index_name}'

mapping = {
    "mappings": {
        "properties": {
            "sentence_vector": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {
                    "name": "hnsw",
                    "space_type": "l2",
                    "engine": "nmslib"
                },
                "store": True
            },
            "sentence": {
                "type": "text",
                "store": True
            },
            "sentence_english": {
                "type": "text",
                "store": True
            }
        }
    },
    "settings": {
        "index": {
            "knn": True,
            "knn.space_type": "cosinesimil",
            "default_pipeline": pipeline_name
        }
    }
}

response = requests.put(url, auth=awsauth, json=mapping, headers={"Content-Type": "application/json"})
print(f"Index creation response: {response.text}")

Load dataframes into the index

Earlier in this section, you loaded the sentences from the JSON documents into dataframes. Now, you can index the documents and generate embeddings for them using the Amazon Titan Text Embeddings Model v2. The embeddings will be stored in the sentence_vector field.

index_name = "bedrock-knn-index-v2"def index_documents(df, batch_size=100):    total = len(df)    for start in range(0, total, batch_size):        end = min(start + batch_size, total)        batch = df.iloc[start:end]        bulk_data = []        for _, row in batch.iterrows():            # Prepare the action metadata            action = {                "index": {                    "_index": index_name                }            }            # Prepare the document data            doc = {                "sentence": row['sentence'],                "sentence_english": row['sentence_english']            }                        # Add the action and document to the bulk data            bulk_data.append(json.dumps(action))            bulk_data.append(json.dumps(doc))        # Join the bulk data with newlines        bulk_body = "\n".join(bulk_data) + "\n"        # Send the bulk request        bulk_url = f"{host}/_bulk"        response = requests.post(bulk_url, auth=awsauth, data=bulk_body, headers={"Content-Type": "application/x-ndjson"})        if response.status_code == 200:            print(f"Successfully indexed batch {start}-{end} of {total}")        else:            print(f"Error indexing batch {start}-{end} of {total}: {response.text}")        # Optional: add a small delay to avoid overwhelming the cluster        time.sleep(1)# Index your documentsprint("Indexing German documents:")index_documents(german_df)print("\nIndexing English documents:")index_documents(english_df)print("\nIndexing French documents:")index_documents(french_df)

Perform semantic k-NN across the documents

The final step is to perform a k-nearest neighbor (k-NN) search across the documents.

# Define your OpenSearch host and index name
index_name = "bedrock-knn-index-v2"

def semantic_search(query_text, k=5):
    search_url = f"{host}/{index_name}/_search"

    # First, index the query to generate its embedding
    index_doc = {
        "sentence": query_text,
        "sentence_english": query_text  # Assuming the query is in English
    }
    index_url = f"{host}/{index_name}/_doc"
    index_response = requests.post(index_url, auth=awsauth, json=index_doc, headers={"Content-Type": "application/json"})

    if index_response.status_code != 201:
        print(f"Failed to index query document: {index_response.text}")
        return []

    # Retrieve the indexed query document to get its vector
    doc_id = index_response.json()['_id']
    get_url = f"{host}/{index_name}/_doc/{doc_id}"
    get_response = requests.get(get_url, auth=awsauth)
    query_vector = get_response.json()['_source']['sentence_vector']

    # Now perform the KNN search
    search_query = {
        "size": 30,
        "query": {
            "knn": {
                "sentence_vector": {
                    "vector": query_vector,
                    "k": 30
                }
            }
        },
        "_source": ["sentence", "sentence_english"]
    }
    search_response = requests.post(search_url, auth=awsauth, json=search_query, headers={"Content-Type": "application/json"})

    if search_response.status_code != 200:
        print(f"Search failed with status code {search_response.status_code}")
        print(search_response.text)
        return []

    # Clean up - delete the temporary query document
    delete_url = f"{host}/{index_name}/_doc/{doc_id}"
    requests.delete(delete_url, auth=awsauth)

    return search_response.json()['hits']['hits']

# Example usage
query = "le soleil brille"
results = semantic_search(query)

if results:
    print(f"Search results for: '{query}'")
    for result in results:
        print(f"Score: {result['_score']}")
        print(f"Sentence: {result['_source']['sentence']}")
        print(f"English: {result['_source']['sentence_english']}")
        print()
else:
    print("No results found or search failed.")

The example query is in French and can be translated to the sun is shining. Keeping in mind that the JSON documents have sentences that use spring in different contexts, you’re looking for query results and vector matches of sentences that use spring in the context of the season of spring.

Here are some of the results from this query:

Search results for: 'le soleil brille'
Score: 0.40515712
Sentence: Les premiers rayons de soleil au printemps réchauffent la terre.
English: The first rays of spring sunshine warm the earth.

Score: 0.40117615
Sentence: Die ersten warmen Sonnenstrahlen kitzeln auf der Haut im Frühling.
English: The first warm sun rays tickle the skin in spring.

Score: 0.3999985
Sentence: Die ersten Sonnenstrahlen im Frühling wecken die Lebensgeister.
English: The first rays of sunshine in spring awaken the spirits.

This shows that the model can provide results across all three languages. It is important to note that the confidence scores for these results might be low because you’ve only ingested a couple of documents with a handful of sentences in each for this demo. To increase confidence scores and accuracy, ingest a robust dataset with multiple languages and plenty of sentences for reference.

Clean up

To avoid incurring future charges, go to the AWS CloudFormation console and delete the stack you deployed. This terminates the resources used in this solution.

Benefits of using the ML connector for machine learning model integration with OpenSearch

There are many ways you can perform k-NN semantic vector searches; a popular method is to deploy external Hugging Face sentence transformer models to a SageMaker endpoint. The following are the benefits of using the ML connector approach we showed in this post, and why you should use it instead of deploying models to a SageMaker endpoint:

Conclusion

Now that you’ve seen how you can use the OpenSearch ML connector to augment your data with external REST calls, we recommend that you visit the GitHub repo if you haven’t already and walk through the full demo yourself. The full demo shows how you can use Amazon Comprehend for language detection and how to use Amazon Bedrock for multilingual semantic vector search, using the ML connector for both use cases. It also has sample text and JSON documents to ingest so you can see how the pipeline works.


About the Authors

John Trollinger is a Principal Solutions Architect supporting the World Wide Public Sector with a focus on OpenSearch and Data Analytics. John has been working with public sector customers over the past 25 years helping them deliver mission capabilities. Outside of work, John likes to collect AWS certifications and compete in triathlons.

Shwetha Radhakrishnan is a Solutions Architect for Amazon Web Services (AWS) with a focus in Data Analytics & Machine Learning. She has been building solutions that drive cloud adoption and help empower organizations to make data-driven decisions within the public sector. Outside of work, she loves dancing, spending time with friends and family, and traveling.
