掘金 人工智能 前天 19:28
技术文档 | 使用 Pulsar Functions 构建实时 AI Pipeline
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了如何利用Apache Pulsar的Serverless计算框架Pulsar Functions,结合Python函数实现机器学习模型的实时推测。通过一个鸢尾花数据集的案例,详细阐述了如何安装配置Pulsar,定义Python函数进行模型训练与推测,并演示了如何部署和触发Pulsar Function以实现单次或批量推测。Pulsar Functions为实时AI应用提供了灵活且强大的解决方案,能够无缝处理大量实时数据,并将推理结果直接用于下游任务,打破了传统机器学习工作流的局限性。

🚀 Pulsar Functions作为Apache Pulsar的Serverless计算框架,能够通过Python函数实现机器学习模型的实时推测,打破了传统机器学习工作流的局限性,为需要实时AI的场景提供了解决方案。

🌸 文章以经典的鸢尾花数据集为例,展示了如何构建一个实时的推理引擎。首先需要安装和配置Pulsar实例,然后定义用于机器学习推测的Python函数,该函数能够加载或训练模型,并根据输入的特征数据进行预测。

🧠 在模型训练部分,使用了`pickle`模块将训练好的决策树分类器序列化保存,以实现模型的复用和高效加载,避免了每次函数调用都重新训练模型。在生产环境中,模型文件的存储可以考虑使用Pulsar Topic或云存储等方式。

💡 通过继承Pulsar的`Function`类并实现`process`方法,可以构建Pulsar Function来处理输入数据并返回推测结果。文章还提供了部署和触发Function的命令示例,包括单次推测和批量推测的实现方式,展示了其灵活性。

🌟 Pulsar Functions能够赋能具有大量实时数据的发布/订阅系统,实现复杂推理模型的无缝处理,并将推理结果直接消费或用于下游任务,是实现实时AI Pipeline的有效途径。

通过 Pulsar Functions ,可以使用 Python 函数发布与订阅消息,实现实时的机器学习推测。

各行各业在决策过程中都有运用 AI 和机器学习的需求。尽管已有强大的算力和实时事件数据支撑,考虑到实时重新训练及评估的难度,实时 AI 这个方向尚未取得长足进展。

Pulsar Functions 是基于 Apache Pulsar 的 Serverless  计算框架,它提供了一个方便而强大的解决方案,打破了传统机器学习工作流的局限性。

利用 Pulsar 固有的 pub/sub 特性,Pulsar Functions 可以为真正的实时 AI 提供一个框架。

Pulsar Functions 有广阔的使用场景,本文将重点关注其灵活性,并探索它如何为需要实时推测和结果的机器学习 pipeline 提供解决方案。

概述

我们的目标是构建一个通过 Pulsar Functions 实现的实时推理引擎,该引擎可以一次性或批量检索低延迟的推测信息。这一目标的实现过程可以拆解为以下两步:

    安装、配置和启动 Pulsar。

    定义构成推理引擎的 Python 函数。

下文将通过一个示例介绍实现过程,重点聚焦 Python 开发以及注册/触发 Pulsar Functions 的调用接口。

启动独立的 Apache Pulsar 实例

通常,Pulsar 会被部署在一个集群中,而非本地独立实例;但独立实例的形式更便于我们在实时 AI 的场景下进行实践。

启动实例步骤可参考 Pulsar 官方文档;输入以下命令:

bin/pulsar standalone

数据集配置

实例运行后,我们需要定义一个 Pulsar Function 来为机器学习提供示例。这里用到的是经典的 Iris Dataset 鸢尾花数据集。该数据集由 Edgar Anderson 收集创建,并由 Ronald Fisher 推广,包含 3 种共 50 朵花的数据,包括:

萼片长度(sepallength):萼片的测量长度

萼片宽度(sepalwidth):萼片的测量宽度

花瓣长度(petallength):花瓣的测量长度

花瓣宽度(petalwidth):花瓣的测量宽度

种类:花的种类(山鸢尾 Setosa、变色鸢尾 Versicolor或维吉尼亚鸢尾 Virginica)

下面附上相关数据的部分概览。

我们的目标是根据输入特性准确推测花的种类。模型 pipeline 的两个核心组成部分是训练和推测,后者正是本文 Pulsar 实践的落脚点。

构建 Pulsar Function

Pulsar Function 可以通过独立的 Python 脚本创建,其中包含将要部署的函数。如前所述,模型推测 routine 将是 Pulsar Function 的基础。在这个框架中,函数本身大部分都是标准的 Python 函数,只有极少为 Pulsar 专用;这一点极大地减少了代码的部署时间以及专业 Python 开发人员的时间。

让我们从拉取鸢尾花数据、训练模型并将其写入 Pulsar Topic 这一基础流程入手。

import osimport pickleimport pandas as pdfrom pulsar import Functionfrom sklearn.model_selection import train_test_splitfrom sklearn.tree import DecisionTreeClassifierdef train_iris_model():    # If we already have an existing model file, we can load it right away    if os.path.exists("model.pkl"):        print("We are loading a pre-existing model")        return pickle.load(open("model.pkl", 'rb'))    # Read in the iris data, split into a 20% test component    iris = pd.read_csv("https://datahub.io/machine-learning/iris/r/iris.csv")    train, test = train_test_split(iris, test_size=0.2, stratify=iris['class'])    # Get the training data    X_train = train[['sepalwidth', 'sepallength', 'petalwidth', 'petallength']]    y_train = train['class']    # Get the test data    X_test = test[['sepalwidth', 'sepallength', 'petalwidth', 'petallength']]    y_test = test['class']    # Train the model    model = DecisionTreeClassifier(max_depth=3, random_state=1)    model.fit(X_train, y_train)    # Dump the model object to a file    pickle.dump(model, open("model.pkl", 'wb'))    return model

这组代码训练出一个决策树分类器(decision tree classifier),会根据萼片和花瓣的宽度和长度来推测花的种类。决策树分类器可以直观地表示为基于特征值的一系列决策,最终在到达树的叶节点(leaf node)时给出结论。下面是一个衍生出的示例决策树。

需要注意的一点是:我们在训练中使用 pickle 模块对模型进行序列化。这将把模型转储到工作目录中的文件中。如果序列化后的模型可用,后续对函数的调用将直接读取模型,而非经由重新训练。这一步对接下来的动作非常关键,因为它实现了一个单独的 routine,负责在收集新数据时对模型进行持续评估、增强和重新训练。

然而,在生产环境中,例如使用 Kubernetes 容器运行 Pulsar 时,工作目录可能是临时的,反而需要引入一些其他解决方案。几种可能性包括:

训练决策树分类器的代码完成后,就可以构建代表 Pulsar Function 的 routine 了。我们将在 Pulsar 中创建 Function 类的 IrisPredictionFunction 子类,并执行两种方法:一种是没有特定功能的 init() 方法,另一种是 process() 方法,负责在给定输入和用户上下文时,从模型中返回推测结果。

class IrisPredictionFunction(Function):    # No initialization code needed    def __init__(self):        pass    def process(self, input, context):        # Convert the input ratio to a float, if it isn't already        flower_parameters = [float(x) for x in input.split(",")]        # Get the prediction        model = train_iris_model()        return model.predict([flower_parameters])[0]

Function 不依赖于用户上下文,而是依赖输入。由于模型训练是基于萼片长度、萼片宽度、花瓣长度和花瓣宽度四个特征数据实现的,我们需要按照顺序提供每个特征数据才能得到推测结果。方便起见,我们使用以逗号分隔的字符串输入信息。

1.8,2.1,4.0,1.4

这代表一朵尺寸如下的花:

萼片长度:1.8

萼片宽度:2.1

花瓣长度:4.0

花瓣宽度:1.4

我们的 Pulsar Function 将使用此字符串,在逗号处拆分并将值转换为浮点,然后将它们传递给模型推测 routine。

部署 Pulsar Function

Pulsar独立客户端运行时,部署Function只需要创建和触发两步。创建:

bin/pulsar-admin functions create                  --tenant public   --namespace default   --name iris_prediction_1   --py iris_prediction.py   --timeout-ms 10000   --classname iris_prediction.IrisPredictionFunction   --inputs persistent://public/default/in   --output persistent://public/default/out

注意一些参数:

--name 定义了 Fucntion 任务的名(唯一即可)以便查询和管理。

--py 是上文编写的 Python 代码脚本文件的名称。

--classname 是 Python 脚本中的类名称。

触发 Function 并传递前文示例参数:

bin/pulsar-admin functions trigger   --tenant public   --namespace default   --name iris_prediction_1   --trigger-value 1.8,2.1,4.0,1.4

批量推测

简单修改上述 Function 即可实现批量推测。添加下述类,用于创建一个新的 Pulsar Function:

class IrisPredictionFunctionBulk(Function):    # No initialization code needed    def __init__(self):        pass    def process(self, input, context):        # Convert the input parameters to floats, if they aren't already        flower_parameters_str = input.split(":")        flower_parameters_split = [x.split(",") for x in flower_parameters_str]        flower_parameters_float = [[float(x) for x in y] for y in flower_parameters_split]        # Get the prediction        model = train_iris_model()        return ", ".join(model.predict(flower_parameters_float))

用于批量推测的 Function 有以下特点:

    定义了一个新的类名,以便在注册函数时区分。

    涵盖可能有不止一组花的测量值的场景,使用 : 字符分割。

    将推测结果输出为以逗号分隔的字符串。

和之前一样,我们注册 Function、定义一个新名称,并引用刚创建的正确类名称:

bin/pulsar-admin functions create   --tenant public   --namespace default   --name iris_prediction_bulk_1   --py iris_prediction.py   --timeout-ms 10000   --classname iris_prediction.IrisPredictionFunctionBulk   --inputs persistent://public/default/in   --output persistent://public/default/out

触发 Function 并一次性传递三组花的测量值:

bin/pulsar-admin functions trigger   --tenant public   --namespace default   --name iris_prediction_bulk_1   --trigger-value 1.8,2.1,4.0,1.4:0.1,0.1,0.1,0.1:1.8,2.5,0.5,5.0

以上是一个简单的案例,在 Python 中使用 Pulsar Functions 并基于测量值对鸢尾花种类进行实时推测。

这个案例仅触及到 Pulsar Functions 的浅层应用,可以当作对基于 Pulsar 实现实时AI Pipeline 的投石问路。

通过 Pulsar 核心框架赋能,具有大量实时数据复杂发布/订阅系统可以实现无缝处理; 推理模型的输出结果可以直接被消费,甚至用于下游任务。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Pulsar Functions 实时AI 机器学习 Python 推测
相关文章