前言
为了能够更加熟悉MCP Server SSE模式,我们需要深入代码内部看看具体的实现,,以便于我们遇到问题时能够更快更好地处理。废话不多说,接下来我们就开始一步一步拆解SSE的代码逻辑。
代码拆解
MCP官方给出的java SSE demo有好几种,我们直接拿出最经典Webmvc实现方式来进行拆解。MCP Server大概实现我们只需要了解三个类:McpAsyncServer,McpServerTransportProvider、McpServerSession。
McpAsyncServer
McpAsyncServer属于资源管理类,专门管理Tools、Resources、Prompts,负责Tools、Resources、Prompts的新增、删除以及变更通知,我们可以大概看一下相关的方法:
这样看起来就一目了然了,McpAsyncServer就是用来管理Tools、Resources、Prompts这三大资源的,根据相关方法,我们可以实现动态地新增和删除Tools、Resources、Prompts,并且能够把这种变化告知Mcp Client;
我们来看一下代码相关代码,以addTool为例:
@Override public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) { if (toolSpecification == null) { return Mono.error(new McpError("Tool specification must not be null")); } if (toolSpecification.tool() == null) { return Mono.error(new McpError("Tool must not be null")); } if (toolSpecification.call() == null) { return Mono.error(new McpError("Tool call handler must not be null")); } if (this.serverCapabilities.tools() == null) { return Mono.error(new McpError("Server must be configured with tool capabilities")); } return Mono.defer(() -> { // Check for duplicate tool names if (this.tools.stream().anyMatch(th -> th.tool().name().equals(toolSpecification.tool().name()))) { return Mono .error(new McpError("Tool with name '" + toolSpecification.tool().name() + "' already exists")); } // 核心代码this.tools.add(toolSpecification); logger.debug("Added tool handler: {}", toolSpecification.tool().name()); if (this.serverCapabilities.tools().listChanged()) { return notifyToolsListChanged(); } return Mono.empty(); }); }
咱们把里面最核心的两段代码找出来:
// 新增Tool this.tools.add(toolSpecification); // 如果需要通知MCP Client if (this.serverCapabilities.tools().listChanged()) { // 通知MCP Client,Tools有变更 return notifyToolsListChanged(); }
除了addTool,removeTool的实现也是类似:
// 从列表中删除指定的Toolboolean removed = this.tools .removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName)); // 如果删除成功,并且需要通知MCP Client,那么执行通知逻辑if (removed) {if (this.serverCapabilities.tools().listChanged()) { return notifyToolsListChanged(); }}
接下来我们看一下通知的实现:
@Override public Mono<Void> notifyToolsListChanged() { return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, null); }
具体的通知操作交给了TransportProvider来处理;看了上面的代码拆解,我们对McpAsyncServer已经很了解了,它就是负责管理资源的类。在项目中的应用也非常直接,如果需要对Tools、Resources、Prompts进行任何的变更,都可以使用McpAsyncServer。可能会有小伙伴疑惑,咱们在demo或者项目使用的是McpSyncServer怎么办?不着急,咱们看一下McpSyncServerd实现就知道了:
public McpSyncServer(McpAsyncServer asyncServer) { this.asyncServer = asyncServer; }public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) { this.asyncServer.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler)).block(); }
也就是说,McpSyncServer的实现是McpAsyncServer代理的。所以我们也可以通过McpSyncServer操作Tools、Resources、Prompts。
McpServerTransportProvider
我们先来看一下McpServerTransportProvider的相关实现:
根据相关实现类的名称,我们就能知道,TransportProvider就是用来实现MCP Client与MCP Server之间的通信方式。针对不同的技术栈,我们会选择相应的通信方式:1、HttpServletSseServerTransportProvider:基于Servlet实现的SSE通信方式,不依赖任何web框架。2、StdioServerTransportProvider:基于操作系统标准输入输出进行数据通信,要求MCP Server与MCP Client运行在同一台计算机上。3、WebFluxSseServerTransportProvider:基于Spring WebFlux实现的SSE通信方式,适用于使用Spring WebFlux的项目;4、WebMvcSseServerTransportProvider:基于Spring WebMvc实现的SSE通信方式,适用于使用Spring WebMvc的项目;
咱们接下来使用WebMvcSseServerTransportProvider作为本次代码解析的对象。首先咱们回顾一下前面的demo中,创建出TransportProvider用来处理/sse,/mcp/message请求,一般会搭配一个RouterFunction作为请求路由,看一下源码:
无论是HttpServletSseServerTransportProvider、WebMvcSseServerTransportProvider还是WebFluxSseServerTransportProvider,主要职责就是处理这两个请求:1、大概来看一下处理/sse请求的相关的代码:
private ServerResponse handleSseConnection(ServerRequest request) { // 如果链接已关闭,那么直接返回服务不可用if (this.isClosing) { return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down"); } // 生成sessionIdString sessionId = UUID.randomUUID().toString(); logger.debug("Creating new SSE connection for session: {}", sessionId); // Send initial endpoint event try { return ServerResponse.sse(sseBuilder -> { // 设置链接完成后需要做的操作sseBuilder.onComplete(() -> { logger.debug("SSE connection completed for session: {}", sessionId); // 移除sessionIdsessions.remove(sessionId); }); // 设置链接超时处理逻辑sseBuilder.onTimeout(() -> { logger.debug("SSE connection timed out for session: {}", sessionId); // 移除sessionIdsessions.remove(sessionId); }); // 创建WebMvcMcpSessionTransport,用来构建McpServerSessionWebMvcMcpSessionTransport sessionTransport = new WebMvcMcpSessionTransport(sessionId, sseBuilder); // 构建 McpServerSessionMcpServerSession session = sessionFactory.create(sessionTransport); // 保存当前链接状态 this.sessions.put(sessionId, session); try { // 返回链接sessionIdsseBuilder.id(sessionId) .event(ENDPOINT_EVENT_TYPE) .data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId); } catch (Exception e) { logger.error("Failed to send initial endpoint event: {}", e.getMessage()); sseBuilder.error(e); } }, Duration.ZERO); } catch (Exception e) { logger.error("Failed to send initial endpoint event to session {}: {}", sessionId, e.getMessage());// 出现异常移除sessionId sessions.remove(sessionId);// 返回500 return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } }
其实核心代码就只有最关键的几段,主要目标就是创建Session,大概分为以下几步:
- 生成sessionId;通过sessionId构建WebMvcMcpSessionTransport(目的是构建McpServerSession);构建McpServerSession(关键对象);将session状态(McpServerSession)保存到map中(目标是给/mcp/message请求使用);将sessionId返回给MCP Client;
2、接下来看一下如何处理/mcp/message请求的
private ServerResponse handleMessage(ServerRequest request) {// 如果链接关闭,返回服务不可用if (this.isClosing) { return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down"); } // 如果请求参数没有sessionId,直接返回,非法请求if (!request.param("sessionId").isPresent()) { return ServerResponse.badRequest().body(new McpError("Session ID missing in message endpoint")); } // 从请求参数中获取sessionId String sessionId = request.param("sessionId").get(); // 根据sessionId取出对应的McpServerSessionMcpServerSession session = sessions.get(sessionId); // 如果没取到session,直接返回 if (session == null) { return ServerResponse.status(HttpStatus.NOT_FOUND).body(new McpError("Session not found: " + sessionId)); } try { // 解析请求体String body = request.body(String.class);// 把请求体反序列化为JSONRPCMessage对象 McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body); // 调用McpServerSession来处理请求数据session.handle(message).block();// 返回处理成功return ServerResponse.ok().build(); } catch (IllegalArgumentException | IOException e) { logger.error("Failed to deserialize message: {}", e.getMessage()); // 出现异常直接返回return ServerResponse.badRequest().body(new McpError("Invalid message format")); } catch (Exception e) { logger.error("Error handling message: {}", e.getMessage()); return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e.getMessage())); } }
总结一下上面的源码:
- 获取请求中的sessionId;根据sessionId取出McpServerSession;反序列化请求体,转成JSONRPCMessage对象;把JSONRPCMessage对象交给McpServerSession处理;
所以真正地处理细节是在McpServerSession,TransportProvider只做了两件事情:
- 处理/sse请求,与MCP Client建立链接,生成sessionId;处理/mcp/message请求,解析请求数据,交给McpServerSession处理;
McpServerSession
McpServerSession主要处理三类数据,请求数据、响应数据和通知数据:
关于McpServerSession如何处理这些数据,我们可以看AsyncServerImpl类,AsyncServerImpl对象在初始化的时候已经设定好了如何创建McpServerSession:
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper, McpServerFeatures.Async features) { this.mcpTransportProvider = mcpTransportProvider; this.objectMapper = objectMapper; this.serverInfo = features.serverInfo(); this.serverCapabilities = features.serverCapabilities(); this.instructions = features.instructions(); this.tools.addAll(features.tools()); this.resources.putAll(features.resources()); this.resourceTemplates.addAll(features.resourceTemplates()); this.prompts.putAll(features.prompts()); Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>(); // 新增对应的请求处理器requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of())); // 新增tools接口处理器 if (this.serverCapabilities.tools() != null) { requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler()); requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler()); } // 新增resources接口处理器 if (this.serverCapabilities.resources() != null) { requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler()); } // 新增prompts接口处理器 if (this.serverCapabilities.prompts() != null) { requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler()); requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler()); } // 新增日志接口处理器if (this.serverCapabilities.logging() != null) { requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler()); } Map<String, McpServerSession.NotificationHandler> notificationHandlers = new HashMap<>(); // 新增通知处理器 notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty()); List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features .rootsChangeConsumers(); if (Utils.isEmpty(rootsChangeConsumers)) { rootsChangeConsumers = List.of((exchange, roots) -> Mono.fromRunnable(() -> logger.warn( "Roots list changed notification, but no consumers provided. Roots list changed: {}", roots))); } // 新增通知处理器notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED, asyncRootsListChangedNotificationHandler(rootsChangeConsumers)); // 设置SessionFactory,决定了如何创建McpServerSession对象 mcpTransportProvider .setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), transport, this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers)); }
在McpServerSession中,处理请求数据和通知数据,是根据JSONRPCMessage中的method去匹配上面的handler,通过对应的handler来处理目标数据,示例代码如下:
// 根据method获取对应的handlervar handler = this.requestHandlers.get(request.method());// 执行对应的处理逻辑handler.handle(request.params()));
到这一步,我们大概知道McpServerSession是用来处理各类请求、响应和通知数据的,具体的细节将另起一小节来分析,这里就不细讲。
小结
大概来做一个总结,从功能上来分,McpAsyncServer、McpSyncServer是开放给开发人员开使用的,用来管理Tools、Resources、Prompts等资源的;TransportProvider是用来实现通信协议的,这个是可插拔的,想要使用对应的通信协议,创建对应的TransportProvider对象就行;McpServerSession是真正干活的,和通信协议不相干,它根据针对不同的JSONRPCMessage对象调用对应的handler来处理数据,真正地核心业务处理逻辑就在这个对象中。