掘金 人工智能 05月08日 10:23
MCP实战派【2】从0-1实现一个满足 MCP 协议的 Server
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文详细介绍了如何在不使用MCP官方Python SDK的情况下,从零开始构建一个符合MCP协议的服务端。文章深入探讨了JSON-RPC协议、SSE/message端点的实现,以及MCP Server的核心逻辑。通过结合message和SSE两种机制,系统提升了交互的响应性和灵活性,优化了用户体验,尤其是在处理长时间任务时表现突出。文章还提供了完整的代码示例和流程图,帮助读者理解和实践。

💡MCP服务端的消息体基于JSON-RPC协议,需要定义请求和响应实体,包括id、jsonrpc、method、params、result和error等字段,以满足协议规范。

📡SSE(Server-Sent Events)端点用于建立服务端与客户端之间的长连接,实现服务端主动推送事件消息。message端点用于处理客户端发起的POST请求,例如initialize、tools/list等方法的调用。

🔄服务端通过message接口接收请求,执行相应操作,如果操作耗时较长,则通过SSE通道异步推送任务进度给客户端,提升响应性和用户体验。

🔑MCP Server的核心逻辑从handle_message方法开始,根据请求中的method名称匹配对应的处理器(Handler),执行业务逻辑,并将结果写入event_queue。

🚦系统维护一个mcp_server_list存储已连接的MCP客户端实例,每个客户端通过唯一的client_id进行标识,实现不同客户端之间的事件队列隔离。

在继续我们关于 MCP 的 Dify 插件分享的过程中,本文将引导你了解如何在不使用 MCP 官方 Python SDK 的情况下,从零开始构建一个符合 MCP 协议的服务端(Server)。我们将详细介绍每一步骤,帮助你在完全手动的情况下实现一个满足 MCP 标准的完整解决方案。

JSON-RPC

首先,我们需要了解的是,MCP 的消息体遵循 JSON-RPC 协议规范。关于该协议的详细说明,可以参考其官方文档:JSON-RPC 2.0 Specification

第一步定义RPC的请求和响应实体,

from pydantic import BaseModel# {#   jsonrpc: "2.0",#   id: number | string,#   method: string,#   params?: object# }class MCPRequest(BaseModel):    id: Optional[int | str] = None    jsonrpc: str = "2.0"    method: str    params: Optional[dict] = None# 定义一个响应模型# {#   jsonrpc: "2.0",#   id: number | string,#   result?: object,#   error?: {#     code: number,#     message: string,#     data?: unknown#   }# }class MCPResponse(BaseModel):    id: Optional[int | str] = None    jsonrpc: str = "2.0"    result: Optional[dict] = None    error: Optional[Dict[str, Any]] = None

 SSE/message 端点的实现

在上一篇文章中我们已经分析过,SSE(Server-Sent Events)端点的作用在于建立服务端与客户端之间的长连接。通过服务端主动推送事件消息的方式,客户端能够以流式的形式实时更新内容。相较之下,message 端点则主要用于处理客户端发起的各类 POST 请求,例如 initializetools/list 等方法的调用。

整个流程是这样的:客户端首先通过 message 接口发送请求,服务端接收并执行相应的操作;如果该操作耗时较长(如使用 MCP 协议下载一个大文件),服务端便可通过 SSE 通道异步地将任务进度持续推送给客户端。这种方式使得 LLM 能够及时感知到 MCP 的执行状态,并最终将相关信息反馈给用户。

通过结合 message 和 SSE 两种机制,系统不仅提升了交互的响应性和灵活性,也显著优化了用户体验,特别是在处理长时间任务时表现尤为突出。

@app.get("/sse")async def sse():    """SSE连接端点"""    mcp_server = MCPServer(tools=[get_current_weather])    await mcp_server.event_queue.put(        {"event": "endpoint", "data": f"/message?client_id={mcp_server.client_id}"}    )    return EventSourceResponse(        mcp_server.event_message_reader(), ping=60  # 增加ping参数保持连接活跃    )@app.post("/message")async def message(request: Request, mcpRequest: MCPRequest):    """MCP消息处理端点"""    print(f"Received request: {request} mcpRequest: {mcpRequest.model_dump_json()}")    client_id = request.query_params.get("client_id")    if client_id is None or client_id not in mcp_server_list:        return MCPResponse(            id=mcpRequest.id,            jsonrpc="2.0",            error={"code": -32600, "message": "Client ID not found or invalid"},        ).model_dump_json(exclude_none=True)    try:        await mcp_server_list[client_id].handle_message(mcpRequest)        return Response(status_code=202)    except Exception as e:        print(f"Error handling message: {e}")        return MCPResponse(            id=mcpRequest.id,            jsonrpc="2.0",            error={"code": -32600, "message": f"Internal error: {str(e)}"},        ).model_dump_json(exclude_none=True)

SSE(Server-Sent Events)是一个基于 GET 方法的接口。在客户端首次初始化时,服务端会通过 SSE 连接返回一个包含客户端标识信息的消息,其中包括对应的 message 端点地址。这样,MCP 客户端就能在后续发起 POST 请求时,准确地知道应向哪个端点发送 RPC 方法调用。

message 接口则是一个 POST 方法,主要用于处理客户端的 RPC 请求。在这个接口中,我额外加入了一个客户端身份验证机制:如果请求中携带的客户端 ID 并不在服务端维护的已注册客户端列表中,服务端将返回一个异常响应,以确保只有经过认证的客户端才能继续执行后续操作。

MCP Server 的实现

先看代码:

# 存储活跃的MCP服务器实例mcp_server_list: Dict[str, "MCPServer"] = {}class MCPServer:    def __init__(self, tools, server_name="xunberg", server_version="0.0.1"):        self.event_queue = asyncio.Queue()  # 用于存储事件消息        self.client_id = str(uuid.uuid4())        self.server_info = {            "protocolVersion": "2024-11-05",            "capabilities": {                "experimental": {},                "prompts": {"listChanged": False},                "resources": {"subscribe": False, "listChanged": False},                "tools": {"listChanged": False},            },            "serverInfo": {"name": server_name, "version": server_version},        }        self.tools = tools        mcp_server_list[self.client_id] = self    async def event_message_reader(self):        """事件消息读取器,用于SSE连接"""        try:            while True:                event_message = await self.event_queue.get()                print(f"event_message: {event_message}")                yield event_message                self.event_queue.task_done()        except asyncio.CancelledError:            print(f"Client {self.client_id} disconnected")            # 清理资源            if self.client_id in mcp_server_list:                del mcp_server_list[self.client_id]    async def handle_message(self, request: MCPRequest):        """处理接收到的MCP请求"""        method = request.method        handlers = {            "initialize": self._handle_initialize,            "notifications/initialized": lambda req: MCPResponse(                id=req.id, result={}            ),  # 处理初始化完成通知            "tools/list": self._handle_tools_list,            "tools/call": self._handle_tools_call,            "resources/list": self._handle_resources_list,            "prompts/list": self._handle_prompts_list,        }        handler = handlers.get(method)        if handler:            mcpResponse = handler(request)            response = mcpResponse.model_dump_json(exclude_none=True)            await self.event_queue.put({"event": "message", "data": response})        else:            # 处理未知方法            error_response = MCPResponse(                id=request.id,                error={"code": -32601, "message": f"Method '{method}' not found"},            ).model_dump_json(exclude_none=True)            await self.event_queue.put({"event": "message", "data": error_response})    def _handle_initialize(self, request: MCPRequest) -> MCPResponse:        """处理初始化请求"""        return MCPResponse(id=request.id, result=self.server_info)    def _handle_tools_list(self, request: MCPRequest) -> MCPResponse:        """处理工具列表请求"""        mcp_tools = []        for tool in self.tools:            mcp_tools.append(                {                    "name": tool.__name__,                    "description": tool.__doc__,                    "inputSchema": {                        "type": "object",                        "properties": {                            param.name: {                                "type": "string",                                "description": param.annotation.__doc__,                            }                            for param in inspect.signature(tool).parameters.values()                        },                    },                }            )        return MCPResponse(id=request.id, result={"tools": mcp_tools})    def _handle_tools_call(self, request: MCPRequest) -> MCPResponse:        """处理工具调用请求"""        tool_name = request.params.get("name")        arguments = request.params.get("arguments", {})        final_result = {            "type": "text",        }        try:            for tool in self.tools:                if tool.__name__ == tool_name:                    tool_result = tool(**arguments)                    final_result["text"] = str(tool_result)                    return MCPResponse(                        id=request.id, result={"content": [final_result]}                    )        except Exception as e:            final_result["text"] = str(f"Error executing tool {tool_name}: {str(e)}")            return MCPResponse(                id=request.id, result={"content": [final_result], "isError": True}            )    def _handle_resources_list(self, request: MCPRequest) -> MCPResponse:        """处理资源列表请求"""        return MCPResponse(id=request.id, result={"resources": []})    def _handle_prompts_list(self, request: MCPRequest) -> MCPResponse:        """处理提示列表请求"""        return MCPResponse(id=request.id, result={"prompts": []})

核心逻辑可以从 handle_message 方法开始理解。当客户端携带某个 RPC method 请求到达服务端时,系统会进入 handle_message 方法进行处理。该方法根据请求中的 method 名称匹配对应的处理器(Handler),执行相应的业务逻辑,并将结果封装后写入 event_queue

event_queue 本质上是一个消息队列(MQ)的模拟实现,负责在 handle_message 中“生产”事件数据,并由 event_message_reader 方法进行“消费”。这个 event_message_reader 是在 SSE 的 GET 接口(即事件流接口)中被调用的,通过 EventSourceResponse 持续地将事件流推送给客户端。

为了保持事件的持续推送,event_message_reader 内部采用了一个 while True 循环,不断从 event_queue 中读取新事件,确保客户端能够实时接收到最新的异步处理结果。

通过这种设计,我们实现了 MCP 协议中“命令-响应”的异步化流程:

此外,我们还维护了一个 mcp_server_list,用于存储已连接的 MCP 客户端实例。每个客户端通过唯一的 client_id 进行标识,从而实现不同客户端之间的事件队列隔离,确保每个客户端只接收到属于自己的事件流。

图解

最后画个图来让我们更清楚的去理解整个过程

graph LR    A[LLM Application] --> B[MCP Client]    B -->|GET /sse 长连接| C[MCP Server]    C -->| <br>Server-Sent Events 推送| B    B <-->|POST /message<br>JSON RPC 2.0 调用 <br> 返回 202 | C    C -->|生产MQ| D[handle_message]    C <-->| loop <br>消费 MQ | E[event_message_reader]

完整的代码地址:xunberg.github.io

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

MCP JSON-RPC SSE Dify插件 服务端构建
相关文章