掘金 人工智能 05月31日 21:18
拆解Java MCP Server SSE代码
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入剖析了MCP Server SSE模式的代码实现,旨在帮助开发者更好地理解和处理相关问题。文章重点拆解了McpAsyncServer、McpServerTransportProvider和McpServerSession这三个核心类。McpAsyncServer负责管理Tools、Resources和Prompts等资源,并提供动态增删改查的功能。McpServerTransportProvider负责MCP Client与Server之间的通信,针对不同技术栈选择不同的通信方式。McpServerSession则负责处理请求、响应和通知数据,并详细介绍了如何处理/sse和/mcp/message请求。

🧰**McpAsyncServer:资源管理核心**。负责管理Tools、Resources、Prompts三大资源,提供新增、删除和变更通知等功能,方便动态管理和告知MCP Client资源变化。

📡**McpServerTransportProvider:通信方式选择**。实现MCP Client与MCP Server之间的通信,根据技术栈选择合适的通信方式,如HttpServletSseServerTransportProvider、WebFluxSseServerTransportProvider、WebMvcSseServerTransportProvider等。

🔑**McpServerSession:数据处理中心**。主要处理请求数据、响应数据和通知数据,通过AsyncServerImpl类初始化时设定的请求处理器和通知处理器,实现对各类数据的具体处理逻辑。

🤝**/sse与/mcp/message请求处理**。McpServerTransportProvider处理/sse请求,与MCP Client建立链接并生成sessionId;处理/mcp/message请求,解析请求数据并交给McpServerSession处理,实现数据交互。

前言

为了能够更加熟悉MCP Server SSE模式,我们需要深入代码内部看看具体的实现,,以便于我们遇到问题时能够更快更好地处理。废话不多说,接下来我们就开始一步一步拆解SSE的代码逻辑。

代码拆解

MCP官方给出的java SSE demo有好几种,我们直接拿出最经典Webmvc实现方式来进行拆解。MCP Server大概实现我们只需要了解三个类:McpAsyncServer,McpServerTransportProvider、McpServerSession。

@Override  public Mono<Void> addTool(McpServerFeatures.AsyncToolSpecification toolSpecification) {  if (toolSpecification == null) {  return Mono.error(new McpError("Tool specification must not be null"));  }  if (toolSpecification.tool() == null) {  return Mono.error(new McpError("Tool must not be null"));  }  if (toolSpecification.call() == null) {  return Mono.error(new McpError("Tool call handler must not be null"));  }  if (this.serverCapabilities.tools() == null) {  return Mono.error(new McpError("Server must be configured with tool capabilities"));  }    return Mono.defer(() -> {  // Check for duplicate tool names  if (this.tools.stream().anyMatch(th -> th.tool().name().equals(toolSpecification.tool().name()))) {  return Mono  .error(new McpError("Tool with name '" + toolSpecification.tool().name() + "' already exists"));  }  // 核心代码this.tools.add(toolSpecification);  logger.debug("Added tool handler: {}", toolSpecification.tool().name());    if (this.serverCapabilities.tools().listChanged()) {  return notifyToolsListChanged();  }  return Mono.empty();  });  }

咱们把里面最核心的两段代码找出来:

    // 新增Tool    this.tools.add(toolSpecification);    // 如果需要通知MCP Client    if (this.serverCapabilities.tools().listChanged()) {     // 通知MCP Client,Tools有变更 return notifyToolsListChanged();  }

除了addTool,removeTool的实现也是类似:

// 从列表中删除指定的Toolboolean removed = this.tools  .removeIf(toolSpecification -> toolSpecification.tool().name().equals(toolName)); // 如果删除成功,并且需要通知MCP Client,那么执行通知逻辑if (removed) {if (this.serverCapabilities.tools().listChanged()) {  return notifyToolsListChanged();  }}

接下来我们看一下通知的实现:

  @Override  public Mono<Void> notifyToolsListChanged() {  return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_TOOLS_LIST_CHANGED, null);  }

具体的通知操作交给了TransportProvider来处理;看了上面的代码拆解,我们对McpAsyncServer已经很了解了,它就是负责管理资源的类。在项目中的应用也非常直接,如果需要对Tools、Resources、Prompts进行任何的变更,都可以使用McpAsyncServer。可能会有小伙伴疑惑,咱们在demo或者项目使用的是McpSyncServer怎么办?不着急,咱们看一下McpSyncServerd实现就知道了:

    public McpSyncServer(McpAsyncServer asyncServer) {  this.asyncServer = asyncServer;  }public void addTool(McpServerFeatures.SyncToolSpecification toolHandler) {  this.asyncServer.addTool(McpServerFeatures.AsyncToolSpecification.fromSync(toolHandler)).block();  }

也就是说,McpSyncServer的实现是McpAsyncServer代理的。所以我们也可以通过McpSyncServer操作Tools、Resources、Prompts。

private ServerResponse handleSseConnection(ServerRequest request) {      // 如果链接已关闭,那么直接返回服务不可用if (this.isClosing) {  return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down");  }  // 生成sessionIdString sessionId = UUID.randomUUID().toString();  logger.debug("Creating new SSE connection for session: {}", sessionId);    // Send initial endpoint event  try {  return ServerResponse.sse(sseBuilder -> {    // 设置链接完成后需要做的操作sseBuilder.onComplete(() -> {  logger.debug("SSE connection completed for session: {}", sessionId);  // 移除sessionIdsessions.remove(sessionId);  });  // 设置链接超时处理逻辑sseBuilder.onTimeout(() -> {  logger.debug("SSE connection timed out for session: {}", sessionId);  // 移除sessionIdsessions.remove(sessionId);  });  // 创建WebMvcMcpSessionTransport,用来构建McpServerSessionWebMvcMcpSessionTransport sessionTransport = new WebMvcMcpSessionTransport(sessionId, sseBuilder); // 构建 McpServerSessionMcpServerSession session = sessionFactory.create(sessionTransport); // 保存当前链接状态 this.sessions.put(sessionId, session);    try {  // 返回链接sessionIdsseBuilder.id(sessionId)  .event(ENDPOINT_EVENT_TYPE)  .data(this.baseUrl + this.messageEndpoint + "?sessionId=" + sessionId);  } catch (Exception e) {  logger.error("Failed to send initial endpoint event: {}", e.getMessage());  sseBuilder.error(e);  }  }, Duration.ZERO);  }   catch (Exception e) {  logger.error("Failed to send initial endpoint event to session {}: {}", sessionId, e.getMessage());// 出现异常移除sessionId  sessions.remove(sessionId);// 返回500  return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); }  }

其实核心代码就只有最关键的几段,主要目标就是创建Session,大概分为以下几步:

2、接下来看一下如何处理/mcp/message请求的

private ServerResponse handleMessage(ServerRequest request) {// 如果链接关闭,返回服务不可用if (this.isClosing) {  return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down");  }  // 如果请求参数没有sessionId,直接返回,非法请求if (!request.param("sessionId").isPresent()) {  return ServerResponse.badRequest().body(new McpError("Session ID missing in message endpoint"));  }  // 从请求参数中获取sessionId  String sessionId = request.param("sessionId").get(); // 根据sessionId取出对应的McpServerSessionMcpServerSession session = sessions.get(sessionId);  // 如果没取到session,直接返回  if (session == null) {  return ServerResponse.status(HttpStatus.NOT_FOUND).body(new McpError("Session not found: " + sessionId));  }    try {  // 解析请求体String body = request.body(String.class);// 把请求体反序列化为JSONRPCMessage对象  McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);    // 调用McpServerSession来处理请求数据session.handle(message).block();// 返回处理成功return ServerResponse.ok().build();  } catch (IllegalArgumentException | IOException e) {  logger.error("Failed to deserialize message: {}", e.getMessage());  // 出现异常直接返回return ServerResponse.badRequest().body(new McpError("Invalid message format"));  } catch (Exception e) {  logger.error("Error handling message: {}", e.getMessage());  return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e.getMessage()));  }  }

总结一下上面的源码:

所以真正地处理细节是在McpServerSession,TransportProvider只做了两件事情:

    处理/sse请求,与MCP Client建立链接,生成sessionId;处理/mcp/message请求,解析请求数据,交给McpServerSession处理;
AsyncServerImpl(McpServerTransportProvider mcpTransportProvider, ObjectMapper objectMapper,  McpServerFeatures.Async features) {  this.mcpTransportProvider = mcpTransportProvider;  this.objectMapper = objectMapper;  this.serverInfo = features.serverInfo();  this.serverCapabilities = features.serverCapabilities();  this.instructions = features.instructions();  this.tools.addAll(features.tools());  this.resources.putAll(features.resources());  this.resourceTemplates.addAll(features.resourceTemplates());  this.prompts.putAll(features.prompts());    Map<String, McpServerSession.RequestHandler<?>> requestHandlers = new HashMap<>();    // 新增对应的请求处理器requestHandlers.put(McpSchema.METHOD_PING, (exchange, params) -> Mono.just(Map.of()));    // 新增tools接口处理器  if (this.serverCapabilities.tools() != null) {  requestHandlers.put(McpSchema.METHOD_TOOLS_LIST, toolsListRequestHandler());  requestHandlers.put(McpSchema.METHOD_TOOLS_CALL, toolsCallRequestHandler());  }    // 新增resources接口处理器  if (this.serverCapabilities.resources() != null) {  requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler());  requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler());  requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler());  }    // 新增prompts接口处理器  if (this.serverCapabilities.prompts() != null) {  requestHandlers.put(McpSchema.METHOD_PROMPT_LIST, promptsListRequestHandler());  requestHandlers.put(McpSchema.METHOD_PROMPT_GET, promptsGetRequestHandler());  }    // 新增日志接口处理器if (this.serverCapabilities.logging() != null) {  requestHandlers.put(McpSchema.METHOD_LOGGING_SET_LEVEL, setLoggerRequestHandler());  }    Map<String, McpServerSession.NotificationHandler> notificationHandlers = new HashMap<>();  // 新增通知处理器  notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_INITIALIZED, (exchange, params) -> Mono.empty());    List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers = features  .rootsChangeConsumers();    if (Utils.isEmpty(rootsChangeConsumers)) {  rootsChangeConsumers = List.of((exchange,  roots) -> Mono.fromRunnable(() -> logger.warn(  "Roots list changed notification, but no consumers provided. Roots list changed: {}",  roots)));  }  // 新增通知处理器notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED,  asyncRootsListChangedNotificationHandler(rootsChangeConsumers));  // 设置SessionFactory,决定了如何创建McpServerSession对象  mcpTransportProvider  .setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), transport,  this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));  }

在McpServerSession中,处理请求数据和通知数据,是根据JSONRPCMessage中的method去匹配上面的handler,通过对应的handler来处理目标数据,示例代码如下:

// 根据method获取对应的handlervar handler = this.requestHandlers.get(request.method());// 执行对应的处理逻辑handler.handle(request.params()));

到这一步,我们大概知道McpServerSession是用来处理各类请求、响应和通知数据的,具体的细节将另起一小节来分析,这里就不细讲。

小结

大概来做一个总结,从功能上来分,McpAsyncServer、McpSyncServer是开放给开发人员开使用的,用来管理Tools、Resources、Prompts等资源的;TransportProvider是用来实现通信协议的,这个是可插拔的,想要使用对应的通信协议,创建对应的TransportProvider对象就行;McpServerSession是真正干活的,和通信协议不相干,它根据针对不同的JSONRPCMessage对象调用对应的handler来处理数据,真正地核心业务处理逻辑就在这个对象中。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

MCP Server SSE模式 代码拆解 McpAsyncServer McpServerTransportProvider
相关文章