掘金 人工智能 07月09日 16:04
LangGraph + MCP + Ollama:构建强大代理 AI 的关键(二)
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入探讨了基于MCP框架构建AI代理的实践过程。文章首先介绍了如何使用异步流程创建AI代理,包括与MCP服务器的通信、工具的获取以及对话状态的管理。随后,详细阐述了异步工具执行器的设计,该执行器能够动态处理工具调用,并支持异步和同步工具的执行。此外,还介绍了结构化对话流程的构建,包括节点的添加、路由函数的定义以及边的连接。文章还涉及了系统提示的生成、聊天机器人的创建以及工具的实现,如图像生成、数据可视化和Python代码执行。最后,文章总结了MCP作为通用开放标准的优势,以及其在简化AI集成流程方面的作用。

🤖 **Agent.py**: 通过`create_agent`函数,使用异步流程构建AI代理,集成`MultiServerMCPClient`进行服务器通信,并利用`StateGraph`管理对话状态。

🛠️ **异步工具执行器**: 设计`async_tool_executor`动态处理工具调用,支持不同格式的工具调用,并根据`tool_name`匹配工具,执行异步或同步函数,处理错误并提供详细错误信息。

💬 **对话流程构建**: 通过添加异步工具执行器节点和路由函数,构建结构化对话流程。路由函数根据消息中的工具调用,导向工具或结束对话,实现多次工具交互。

💡 **Node.py**: 设计`get_system_prompt`函数动态生成AI助手系统提示,定义助手角色和能力,并结合`create_chatbot`函数,处理用户输入并生成AI响应,维护结构化的对话历史。

🖼️ **Server.py**: 开发生成图像、可视化数据和执行Python代码的工具,包括`generate_image`、`data_visualization`和`python_repl`。`get_tools`函数返回可用工具列表,确保仅包含功能正常的工具。

接着上一次未完结的内容,我们将深入代码中,继续进行探索。

Agent.py

我将create_agent函数设计为异步流程以构建 AI 代理。设置了一个async函数,它接受可选参数docs_info,用于向聊天机器人提供相关数据。

通过async with上下文管理器集成MultiServerMCPClient,确保使用服务器发送事件(SSE)与 MCP 服务器http://localhost:8000/sse进行通信,超时时间为 30 秒。

通过client.get_tools()调用获取必要的 MCP 工具,实现高级功能。为管理对话状态,使用MessagesState构建了StateGraph

最后通过create_chatbot(docs_info)创建聊天机器人节点,使其能够处理和交互提供的文档:

async def create_agent(docs_info=None):    async with MultiServerMCPClient(        {            "server":{                "url":"http://localhost:8000/sse",                "transport":"sse",                "timeout": 30            }        }    ) as client:        # 获取MCP工具        tools = client.get_tools()                # 创建图构建器        graph_builder = StateGraph(MessagesState)                # 创建节点        chatbot_node = create_chatbot(docs_info)        graph_builder.add_node("chatbot", chatbot_node)

异步工具执行器设计

async_tool_executor被设计为动态处理工具调用,接收包含对话中交换消息列表的state作为输入。提取最后一条消息(messages[-1])检查工具调用,直接从tool_callsadditional_kwargs中获取。

若未找到工具调用,则原样返回消息;若找到,则将消息复制到new_messages并遍历每个工具调用,提取tool_nametool_argstool_id,支持字典和对象格式以保证灵活性。通过匹配tool_name查找对应工具,若未找到则生成包含有效工具列表的错误消息。

若工具存在,使用asyncio.iscoroutinefunction()判断是否为异步函数:异步函数通过await tool.coroutine(**tool_args)执行,否则调用tool.func(**tool_args)tool(**tool_args)。通过捕获异常处理错误,并将详细错误消息附加到new_messages

        async def async_tool_executor(state):            messages = state["messages"]            last_message = messages[-1]                        # 检查是否存在工具调用            tool_calls = None            if hasattr(last_message, "tool_calls"):                tool_calls = last_message.tool_calls            elif hasattr(last_message, "additional_kwargs") and "tool_calls" in last_message.additional_kwargs:                tool_calls = last_message.additional_kwargs["tool_calls"]                            if not tool_calls:                return {"messages": messages}                        # 处理每个工具调用            new_messages = messages.copy()                        for tool_call in tool_calls:                # 处理不同格式的tool_call                if isinstance(tool_call, dict):                    tool_name = tool_call.get("name")                    tool_args = tool_call.get("args", {})                    tool_id = tool_call.get("id", "tool-call-id")                else:                    tool_name = tool_call.name                    tool_args = tool_call.args if hasattr(tool_call, "args") else {}                    tool_id = getattr(tool_call, "id", "tool-call-id")                                # 打印调试信息                print(f"执行工具: {tool_name}")                print(f"工具参数: {tool_args}")                                # 查找匹配的工具                tool = next((t for t in tools if t.name == tool_name), None)                                if not tool:                    # 工具未找到                    tool_error = f"错误: {tool_name} 不是有效工具,请尝试以下工具: {[t.name for t in tools]}。"                    new_messages.append(AIMessage(content=tool_error))                else:                    try:                        # 执行异步工具                        if asyncio.iscoroutinefunction(tool.coroutine):                            result = await tool.coroutine(**tool_args)                        else:                            # 必要时回退到同步执行                            result = tool.func(**tool_args) if hasattr(tool, 'func') else tool(**tool_args)                                                print(f"工具结果: {result}")                                                # 添加工具结果                        new_messages.append(ToolMessage(                            content=str(result),                            tool_call_id=tool_id,                            name=tool_name                        ))                    except Exception as e:                        # 处理错误                        error_msg = f"错误: {str(e)}\n 请修复错误。"                        print(f"工具错误: {error_msg}")                        new_messages.append(AIMessage(content=error_msg))                        return {"messages": new_messages}

对话流程构建

通过将异步工具执行器节点和路由函数集成到聊天机器人的图中,设计结构化对话流程:

    添加async_tool_executor作为名为"tools"的节点,允许系统动态处理工具调用。

    创建路由函数,根据对话中最后一条消息确定下一步:若 AI 发送的最后一条消息包含工具调用,则导向"tools",否则结束对话。

    向图中添加边:从"chatbot"开始,有条件地路由到"tools""end",并将"tools"循环回"chatbot"以支持多次工具交互。

        # 添加异步工具执行器节点    graph_builder.add_node("tools", async_tool_executor)        # 定义处理工具调用的路由函数    def router(state):        messages = state["messages"]        last_message = messages[-1]                has_tool_calls = False        if isinstance(last_message, AIMessage):            if hasattr(last_message, "tool_calls") and last_message.tool_calls:                has_tool_calls = True            elif hasattr(last_message, "additional_kwargs") and last_message.additional_kwargs.get("tool_calls"):                has_tool_calls = True                return "tools" if has_tool_calls else "end"        # 添加边    graph_builder.add_edge(START, "chatbot")    graph_builder.add_conditional_edges(        "chatbot",        router,        {            "tools": "tools",            "end": END        }    )    graph_builder.add_edge("tools", "chatbot")        # 编译图    graph = graph_builder.compile()    return graph, client  # 返回client以保持其存活状态

Node.py

我设计了get_system_prompt函数来动态生成 AI 助手的系统提示,确保其在清晰的指导原则和上下文感知下运行。首先,使用datetime.now().strftime("%Y-%m-%d")格式化当前日期,并嵌入提示中以供实时参考。

然后,定义助手的角色和能力,列出三个主要工具:generate_image(使用 DALL-E 生成图像)、data_visualization(使用 matplotlib 创建图表)和python_repl(Python 执行环境)。

def get_system_prompt(docs_info=None):    system_prompt = f"""    Today is {datetime.now().strftime("%Y-%m-%d")}    You are a helpful AI Assistant that can use these tools:    - generate_image: Generate an image using DALL-E based on a prompt    - data_visualization: Create charts with Python and matplotlib    - python_repl: Execute Python code        When you call image generation or data visualization tool, only answer the fact that you generated, not base64 code or url.    Once you generated image by a tool, then do not call it again in one answer.    """    if docs_info:        docs_context = "\n\nYou have access to these documents:\n"        for doc in docs_info:            docs_context += f"- {doc['name']}: {doc['type']}\n"        system_prompt += docs_context            system_prompt += "\nYou should always answer in same language as user's ask."    return system_prompt

开发了create_chatbot函数来处理用户输入并生成 AI 响应。它使用ChatPromptTemplate技术将系统指令与用户消息结合,其中系统指令来自get_system_prompt(docs_info)

通过将提示通过管道(|)传递给 LLM,创建无缝的处理链。该函数通过将字符串转换为HumanMessage对象来确保消息格式的一致性。聊天机器人使用用户消息调用 LLM 并附加响应,从而维护结构化的对话历史。

助手遵循预定义规则,同时适应查询需求。它保持与工具无关的特性,确保响应的灵活性。这种设置可提供交互式和动态的聊天机器人体验。

def create_chatbot(docs_info=None):    prompt = ChatPromptTemplate.from_messages([        SystemMessagePromptTemplate.from_template(get_system_prompt(docs_info)),        HumanMessagePromptTemplate.from_template("{input}")    ])        # Use the LLM without binding tools    chain = prompt | llm        def chatbot(state: MessagesState):        # Ensure messages are in the right format        if isinstance(state["messages"], str):            from langchain_core.messages import HumanMessage            messages = [HumanMessage(content=state["messages"])]        else:            messages = state["messages"]                    response = chain.invoke(messages)        return {"messages": messages + [response]}        return chatbot

Server.py

我开发了一组工具,用于在多工具系统中生成图像、可视化数据和执行 Python 代码。generate_image工具使用 DALL-E 基于提示词生成图像:它会先确保提示词有效,然后异步调用 OpenAI API 生成图像。成功时返回生成图像的 URL,出错时提供错误信息。

data_visualization工具执行包含 matplotlib 的 Python 代码以创建图表,并将结果保存为 base64 编码的 PNG 图像。python_repl工具使用 REPL 环境运行 Python 代码,支持动态执行用户输入。每个工具均设计为处理错误并向用户返回有意义的响应。

@mcp.tool()async def generate_image(prompt: str) -> str:    """    使用DALL-E基于给定提示词生成图像。    """    if not isinstance(prompt, str) or not prompt.strip():        raise ValueError("无效的提示词")        try:        # 由于这是异步函数,需正确处理同步的OpenAI调用        loop = asyncio.get_event_loop()        response = await loop.run_in_executor(            None,             lambda: client.images.generate(                model="dall-e-3",                prompt=prompt,                size="1024x1024",                quality="standard",                n=1            )        )                # 返回成功消息和URL        return f"成功生成了{prompt}的图像!URL如下:{response.data[0].url}"    except Exception as e:        return f"图像生成错误:{str(e)}"repl = PythonREPL()@mcp.tool()def data_visualization(code: str):    """执行Python代码,使用matplotlib进行可视化。"""    try:        repl.run(code)        buf = io.BytesIO()        plt.savefig(buf, format='png')        buf.seek(0)        img_str = base64.b64encode(buf.getvalue()).decode()        return f"data:image/png;base64,{img_str}"    except Exception as e:        return f"图表创建错误:{str(e)}"        @mcp.tool()def python_repl(code: str):    """执行Python代码。"""    return repl.run(code)

我创建了get_tools函数以返回可用工具列表,确保仅包含功能正常的工具。该函数从基础工具集开始:generate_imagepython_repldata_visualization。若提供了额外的retriever_tool,则将其添加到工具列表中,最后返回完整的可用工具列表。

最后,脚本通过调用mcp.run(transport="sse")使用服务器发送事件(SSE)作为传输方式运行 MCP 服务器,确保工具在应用程序执行时就绪。

def get_tools(retriever_tool=None):    # 仅包含可用的工具    base_tools = [generate_image, python_repl, data_visualization]        if retriever_tool:        base_tools.append(retriever_tool)        return base_toolsif __name__ == "__main__":    mcp.run(transport="sse")

main.py

我设计了main函数来运行一个异步代理,动态处理用户输入并与工具交互。该函数首先通过await create_agent()创建代理和客户端,然后通过命令行提示用户输入,并使用用户请求构建初始消息(HumanMessage)。

函数通过agent.ainvoke()异步调用代理,处理请求并处理响应:若响应来自用户,则打印用户输入;若来自工具,则打印结果并检查是否为图像生成,提取图像 URL。处理过程中捕获并打印任何异常。

最后,保持客户端存活以确保操作持续。脚本通过asyncio.run(main())异步执行。

async def main():    # 创建代理    agent, client = await create_agent()        # 从命令行获取用户输入    user_input = input("What would you like to ask? ")        # 创建合适的初始消息    initial_message = HumanMessage(content=user_input)        try:        # 异步使用代理        print("Processing your request...")        result = await agent.ainvoke({"messages": [initial_message]})                # 打印结果        for message in result["messages"]:            if hasattr(message, "type") and message.type == "human":                print(f"User: {message.content}")            elif hasattr(message, "type") and message.type == "tool":                print(f"Tool Result: {message.content}")                # 如果是图像生成结果,提取URL                if "image" in message.content.lower() and "url" in message.content.lower():                    print("Image Generated Successfully!")            else:                print(f"AI: {message.content}")    except Exception as e:        print(f"Error: {str(e)}")        # 保持客户端存活直到所有操作完成    # 在实际应用中,您需要根据需要保持客户端活跃if __name__ == "__main__":    asyncio.run(main())

结论

MCP 不仅是一个简单的升级工具调用协议,更是一次重大的范式升级。

MCP 提供了通用开放标准,使 AI 系统能够以标准化方式连接各种数据源、工具和服务 —— 这减少了为每个数据源构建独立连接器的需求,简化了 AI 集成流程。

当前非智能服务可通过 MCP 将自身功能暴露为 “工具” 供 LLM(大型语言模型)调用,这使 LLM 能够与现有系统交互并执行任务,而无需对现有系统进行重大修改。

学习是一个过程,只要坚持学习,就会面临挑战。天道酬勤,越努力,越优秀。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

AI代理 MCP框架 工具调用 异步编程
相关文章