接着上一次未完结的内容,我们将深入代码中,继续进行探索。
Agent.py
我将create_agent
函数设计为异步流程以构建 AI 代理。设置了一个async
函数,它接受可选参数docs_info
,用于向聊天机器人提供相关数据。
通过async with
上下文管理器集成MultiServerMCPClient
,确保使用服务器发送事件(SSE)与 MCP 服务器http://localhost:8000/sse
进行通信,超时时间为 30 秒。
通过client.get_tools()
调用获取必要的 MCP 工具,实现高级功能。为管理对话状态,使用MessagesState
构建了StateGraph
。
最后通过create_chatbot(docs_info)
创建聊天机器人节点,使其能够处理和交互提供的文档:
async def create_agent(docs_info=None): async with MultiServerMCPClient( { "server":{ "url":"http://localhost:8000/sse", "transport":"sse", "timeout": 30 } } ) as client: # 获取MCP工具 tools = client.get_tools() # 创建图构建器 graph_builder = StateGraph(MessagesState) # 创建节点 chatbot_node = create_chatbot(docs_info) graph_builder.add_node("chatbot", chatbot_node)
异步工具执行器设计
async_tool_executor
被设计为动态处理工具调用,接收包含对话中交换消息列表的state
作为输入。提取最后一条消息(messages[-1]
)检查工具调用,直接从tool_calls
或additional_kwargs
中获取。
若未找到工具调用,则原样返回消息;若找到,则将消息复制到new_messages
并遍历每个工具调用,提取tool_name
、tool_args
和tool_id
,支持字典和对象格式以保证灵活性。通过匹配tool_name
查找对应工具,若未找到则生成包含有效工具列表的错误消息。
若工具存在,使用asyncio.iscoroutinefunction()
判断是否为异步函数:异步函数通过await tool.coroutine(**tool_args)
执行,否则调用tool.func(**tool_args)
或tool(**tool_args)
。通过捕获异常处理错误,并将详细错误消息附加到new_messages
:
async def async_tool_executor(state): messages = state["messages"] last_message = messages[-1] # 检查是否存在工具调用 tool_calls = None if hasattr(last_message, "tool_calls"): tool_calls = last_message.tool_calls elif hasattr(last_message, "additional_kwargs") and "tool_calls" in last_message.additional_kwargs: tool_calls = last_message.additional_kwargs["tool_calls"] if not tool_calls: return {"messages": messages} # 处理每个工具调用 new_messages = messages.copy() for tool_call in tool_calls: # 处理不同格式的tool_call if isinstance(tool_call, dict): tool_name = tool_call.get("name") tool_args = tool_call.get("args", {}) tool_id = tool_call.get("id", "tool-call-id") else: tool_name = tool_call.name tool_args = tool_call.args if hasattr(tool_call, "args") else {} tool_id = getattr(tool_call, "id", "tool-call-id") # 打印调试信息 print(f"执行工具: {tool_name}") print(f"工具参数: {tool_args}") # 查找匹配的工具 tool = next((t for t in tools if t.name == tool_name), None) if not tool: # 工具未找到 tool_error = f"错误: {tool_name} 不是有效工具,请尝试以下工具: {[t.name for t in tools]}。" new_messages.append(AIMessage(content=tool_error)) else: try: # 执行异步工具 if asyncio.iscoroutinefunction(tool.coroutine): result = await tool.coroutine(**tool_args) else: # 必要时回退到同步执行 result = tool.func(**tool_args) if hasattr(tool, 'func') else tool(**tool_args) print(f"工具结果: {result}") # 添加工具结果 new_messages.append(ToolMessage( content=str(result), tool_call_id=tool_id, name=tool_name )) except Exception as e: # 处理错误 error_msg = f"错误: {str(e)}\n 请修复错误。" print(f"工具错误: {error_msg}") new_messages.append(AIMessage(content=error_msg)) return {"messages": new_messages}
对话流程构建
通过将异步工具执行器节点和路由函数集成到聊天机器人的图中,设计结构化对话流程:
添加async_tool_executor
作为名为"tools"
的节点,允许系统动态处理工具调用。
创建路由函数,根据对话中最后一条消息确定下一步:若 AI 发送的最后一条消息包含工具调用,则导向"tools"
,否则结束对话。
向图中添加边:从"chatbot"
开始,有条件地路由到"tools"
或"end"
,并将"tools"
循环回"chatbot"
以支持多次工具交互。
# 添加异步工具执行器节点 graph_builder.add_node("tools", async_tool_executor) # 定义处理工具调用的路由函数 def router(state): messages = state["messages"] last_message = messages[-1] has_tool_calls = False if isinstance(last_message, AIMessage): if hasattr(last_message, "tool_calls") and last_message.tool_calls: has_tool_calls = True elif hasattr(last_message, "additional_kwargs") and last_message.additional_kwargs.get("tool_calls"): has_tool_calls = True return "tools" if has_tool_calls else "end" # 添加边 graph_builder.add_edge(START, "chatbot") graph_builder.add_conditional_edges( "chatbot", router, { "tools": "tools", "end": END } ) graph_builder.add_edge("tools", "chatbot") # 编译图 graph = graph_builder.compile() return graph, client # 返回client以保持其存活状态
Node.py
我设计了get_system_prompt
函数来动态生成 AI 助手的系统提示,确保其在清晰的指导原则和上下文感知下运行。首先,使用datetime.now().strftime("%Y-%m-%d")
格式化当前日期,并嵌入提示中以供实时参考。
然后,定义助手的角色和能力,列出三个主要工具:generate_image
(使用 DALL-E 生成图像)、data_visualization
(使用 matplotlib 创建图表)和python_repl
(Python 执行环境)。
def get_system_prompt(docs_info=None): system_prompt = f""" Today is {datetime.now().strftime("%Y-%m-%d")} You are a helpful AI Assistant that can use these tools: - generate_image: Generate an image using DALL-E based on a prompt - data_visualization: Create charts with Python and matplotlib - python_repl: Execute Python code When you call image generation or data visualization tool, only answer the fact that you generated, not base64 code or url. Once you generated image by a tool, then do not call it again in one answer. """ if docs_info: docs_context = "\n\nYou have access to these documents:\n" for doc in docs_info: docs_context += f"- {doc['name']}: {doc['type']}\n" system_prompt += docs_context system_prompt += "\nYou should always answer in same language as user's ask." return system_prompt
开发了create_chatbot
函数来处理用户输入并生成 AI 响应。它使用ChatPromptTemplate
技术将系统指令与用户消息结合,其中系统指令来自get_system_prompt(docs_info)
。
通过将提示通过管道(|)传递给 LLM,创建无缝的处理链。该函数通过将字符串转换为HumanMessage
对象来确保消息格式的一致性。聊天机器人使用用户消息调用 LLM 并附加响应,从而维护结构化的对话历史。
助手遵循预定义规则,同时适应查询需求。它保持与工具无关的特性,确保响应的灵活性。这种设置可提供交互式和动态的聊天机器人体验。
def create_chatbot(docs_info=None): prompt = ChatPromptTemplate.from_messages([ SystemMessagePromptTemplate.from_template(get_system_prompt(docs_info)), HumanMessagePromptTemplate.from_template("{input}") ]) # Use the LLM without binding tools chain = prompt | llm def chatbot(state: MessagesState): # Ensure messages are in the right format if isinstance(state["messages"], str): from langchain_core.messages import HumanMessage messages = [HumanMessage(content=state["messages"])] else: messages = state["messages"] response = chain.invoke(messages) return {"messages": messages + [response]} return chatbot
Server.py
我开发了一组工具,用于在多工具系统中生成图像、可视化数据和执行 Python 代码。generate_image
工具使用 DALL-E 基于提示词生成图像:它会先确保提示词有效,然后异步调用 OpenAI API 生成图像。成功时返回生成图像的 URL,出错时提供错误信息。
data_visualization
工具执行包含 matplotlib 的 Python 代码以创建图表,并将结果保存为 base64 编码的 PNG 图像。python_repl
工具使用 REPL 环境运行 Python 代码,支持动态执行用户输入。每个工具均设计为处理错误并向用户返回有意义的响应。
@mcp.tool()async def generate_image(prompt: str) -> str: """ 使用DALL-E基于给定提示词生成图像。 """ if not isinstance(prompt, str) or not prompt.strip(): raise ValueError("无效的提示词") try: # 由于这是异步函数,需正确处理同步的OpenAI调用 loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: client.images.generate( model="dall-e-3", prompt=prompt, size="1024x1024", quality="standard", n=1 ) ) # 返回成功消息和URL return f"成功生成了{prompt}的图像!URL如下:{response.data[0].url}" except Exception as e: return f"图像生成错误:{str(e)}"repl = PythonREPL()@mcp.tool()def data_visualization(code: str): """执行Python代码,使用matplotlib进行可视化。""" try: repl.run(code) buf = io.BytesIO() plt.savefig(buf, format='png') buf.seek(0) img_str = base64.b64encode(buf.getvalue()).decode() return f"data:image/png;base64,{img_str}" except Exception as e: return f"图表创建错误:{str(e)}" @mcp.tool()def python_repl(code: str): """执行Python代码。""" return repl.run(code)
我创建了get_tools
函数以返回可用工具列表,确保仅包含功能正常的工具。该函数从基础工具集开始:generate_image
、python_repl
和data_visualization
。若提供了额外的retriever_tool
,则将其添加到工具列表中,最后返回完整的可用工具列表。
最后,脚本通过调用mcp.run(transport="sse")
使用服务器发送事件(SSE)作为传输方式运行 MCP 服务器,确保工具在应用程序执行时就绪。
def get_tools(retriever_tool=None): # 仅包含可用的工具 base_tools = [generate_image, python_repl, data_visualization] if retriever_tool: base_tools.append(retriever_tool) return base_toolsif __name__ == "__main__": mcp.run(transport="sse")
main.py
我设计了main
函数来运行一个异步代理,动态处理用户输入并与工具交互。该函数首先通过await create_agent()
创建代理和客户端,然后通过命令行提示用户输入,并使用用户请求构建初始消息(HumanMessage)。
函数通过agent.ainvoke()
异步调用代理,处理请求并处理响应:若响应来自用户,则打印用户输入;若来自工具,则打印结果并检查是否为图像生成,提取图像 URL。处理过程中捕获并打印任何异常。
最后,保持客户端存活以确保操作持续。脚本通过asyncio.run(main())
异步执行。
async def main(): # 创建代理 agent, client = await create_agent() # 从命令行获取用户输入 user_input = input("What would you like to ask? ") # 创建合适的初始消息 initial_message = HumanMessage(content=user_input) try: # 异步使用代理 print("Processing your request...") result = await agent.ainvoke({"messages": [initial_message]}) # 打印结果 for message in result["messages"]: if hasattr(message, "type") and message.type == "human": print(f"User: {message.content}") elif hasattr(message, "type") and message.type == "tool": print(f"Tool Result: {message.content}") # 如果是图像生成结果,提取URL if "image" in message.content.lower() and "url" in message.content.lower(): print("Image Generated Successfully!") else: print(f"AI: {message.content}") except Exception as e: print(f"Error: {str(e)}") # 保持客户端存活直到所有操作完成 # 在实际应用中,您需要根据需要保持客户端活跃if __name__ == "__main__": asyncio.run(main())
结论
MCP 不仅是一个简单的升级工具调用协议,更是一次重大的范式升级。
MCP 提供了通用开放标准,使 AI 系统能够以标准化方式连接各种数据源、工具和服务 —— 这减少了为每个数据源构建独立连接器的需求,简化了 AI 集成流程。
当前非智能服务可通过 MCP 将自身功能暴露为 “工具” 供 LLM(大型语言模型)调用,这使 LLM 能够与现有系统交互并执行任务,而无需对现有系统进行重大修改。
学习是一个过程,只要坚持学习,就会面临挑战。天道酬勤,越努力,越优秀。