原文链接可见:MCP的SSE重连机制,源码解析
快速上手可见:MCP的SSE重连机制,低成本接入框架,快速上手
SSE 重连源码解读
配置类
McpRecoveryAutoConfiguration
Mcp 恢复的自动配置类,只有当 spring.ai.alibaba.mcp.recovery.enabled = true 时生效
- SYNC:提供 McpSyncRecovery 的 Bean(未配置 spring.ai.mcp.client.type 时默认走 SYNC)
- ASYNC:提供 McpAsyncRecovery 的 Bean
package com.alibaba.cloud.ai.autoconfigure.mcp.client;import com.alibaba.cloud.ai.mcp.client.McpAsyncRecovery;import com.alibaba.cloud.ai.mcp.client.McpSyncRecovery;import com.alibaba.cloud.ai.mcp.client.config.McpRecoveryProperties;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpAsyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;import org.springframework.ai.mcp.customizer.McpAsyncClientCustomizer;import org.springframework.ai.mcp.customizer.McpSyncClientCustomizer;import org.springframework.beans.factory.ObjectProvider;import org.springframework.boot.autoconfigure.AutoConfiguration;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.scheduling.annotation.EnableScheduling;@AutoConfiguration@EnableScheduling@EnableConfigurationProperties({ McpSseClientProperties.class, McpClientCommonProperties.class, McpRecoveryProperties.class })@ConditionalOnProperty(prefix = McpRecoveryProperties.CONFIGPREFIX, name = "enabled", havingValue = "true")public class McpRecoveryAutoConfiguration { @Bean(name = "mcpSyncRecovery") @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC", matchIfMissing = true) public McpSyncRecovery mcpSyncRecovery(McpRecoveryProperties mcpRecoveryProperties, McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties, McpSyncClientConfigurer mcpSyncClientConfigurer) { McpSyncRecovery mcpSyncRecovery = new McpSyncRecovery(mcpRecoveryProperties, 
mcpSseClientProperties, mcpClientCommonProperties, mcpSyncClientConfigurer); mcpSyncRecovery.init(); mcpSyncRecovery.startScheduledPolling(); mcpSyncRecovery.startReconnectTask(); return mcpSyncRecovery; } @Bean(name = "mcpAsyncRecovery") @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC") public McpAsyncRecovery mcpAsyncRecovery(McpRecoveryProperties mcpRecoveryProperties, McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties, McpAsyncClientConfigurer mcpAsyncClientConfigurer) { McpAsyncRecovery mcpAsyncRecovery = new McpAsyncRecovery(mcpRecoveryProperties, mcpSseClientProperties, mcpClientCommonProperties, mcpAsyncClientConfigurer); mcpAsyncRecovery.init(); mcpAsyncRecovery.startScheduledPolling(); mcpAsyncRecovery.startReconnectTask(); return mcpAsyncRecovery; } @Bean @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC", matchIfMissing = true) McpSyncClientConfigurer mcpSyncClientConfigurer(ObjectProvider<McpSyncClientCustomizer> customizerProvider) { return new McpSyncClientConfigurer(customizerProvider.orderedStream().toList()); } @Bean @ConditionalOnMissingBean @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC") McpAsyncClientConfigurer mcpAsyncClientConfigurer(ObjectProvider<McpAsyncClientCustomizer> customizerProvider) { return new McpAsyncClientConfigurer(customizerProvider.orderedStream().toList()); }}
McpRecoveryProperties
MCP 恢复的属性配置类
- Duration ping:间隔多久 Ping 一次 MCP Server 端(默认 5 秒)
- Duration delay:延时队列的间隔时间(默认 5 秒)
- Duration stop:关闭线程池的最大等待时间(默认 10 秒)
package com.alibaba.cloud.ai.mcp.client.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

import java.time.Duration;

/**
 * Settings for MCP client recovery, bound from the
 * {@code spring.ai.alibaba.mcp.recovery} prefix.
 */
@ConfigurationProperties(prefix = McpRecoveryProperties.CONFIGPREFIX)
public class McpRecoveryProperties {

    /** Property prefix under which all recovery settings live. */
    public static final String CONFIGPREFIX = "spring.ai.alibaba.mcp.recovery";

    /** Interval between two ping rounds; 5 seconds unless overridden. */
    private Duration ping = Duration.ofSeconds(5L);

    /** Delay applied to a reconnect task in the delay queue; 5 seconds unless overridden. */
    private Duration delay = Duration.ofSeconds(5L);

    /** Maximum time to wait for the reconnect executor on shutdown; 10 seconds unless overridden. */
    private Duration stop = Duration.ofSeconds(10L);

    /** @return the configured ping interval */
    public Duration getPing() {
        return this.ping;
    }

    /** @param ping the new ping interval */
    public void setPing(Duration ping) {
        this.ping = ping;
    }

    /** @return the configured reconnect delay */
    public Duration getDelay() {
        return this.delay;
    }

    /** @param delay the new reconnect delay */
    public void setDelay(Duration delay) {
        this.delay = delay;
    }

    /** @return the configured shutdown wait time */
    public Duration getStop() {
        return this.stop;
    }

    /** @param stop the new shutdown wait time */
    public void setStop(Duration stop) {
        this.stop = stop;
    }

}
组件类
McpSyncClientWrapper
McpSyncClient 及对应的 ToolCallback 列表的封装
package com.alibaba.cloud.ai.mcp.client.component;

import io.modelcontextprotocol.client.McpSyncClient;
import org.springframework.ai.tool.ToolCallback;

import java.util.List;

/**
 * Pairs a {@link McpSyncClient} with the tool callbacks that were resolved for it.
 * The list is stored and returned as given (no copy is made).
 */
public class McpSyncClientWrapper {

    /** The wrapped synchronous MCP client. */
    private final McpSyncClient client;

    /** Tool callbacks associated with {@link #client}. */
    private final List<ToolCallback> toolCallbacks;

    /**
     * @param client the synchronous MCP client
     * @param toolCallbacks the tool callbacks belonging to that client
     */
    public McpSyncClientWrapper(McpSyncClient client, List<ToolCallback> toolCallbacks) {
        this.toolCallbacks = toolCallbacks;
        this.client = client;
    }

    /** @return the wrapped client */
    public McpSyncClient getClient() {
        return this.client;
    }

    /** @return the tool callbacks passed at construction time */
    public List<ToolCallback> getToolCallbacks() {
        return this.toolCallbacks;
    }

}
McpAsyncClientWrapper
McpAsyncClient 及对应的 ToolCallback 列表的封装
package com.alibaba.cloud.ai.mcp.client.component;

import io.modelcontextprotocol.client.McpAsyncClient;
import org.springframework.ai.tool.ToolCallback;

import java.util.List;

/**
 * Pairs a {@link McpAsyncClient} with the tool callbacks that were resolved for it.
 * The list is stored and returned as given (no copy is made).
 */
public class McpAsyncClientWrapper {

    /** The wrapped asynchronous MCP client. */
    private final McpAsyncClient client;

    /** Tool callbacks associated with {@link #client}. */
    private final List<ToolCallback> toolCallbacks;

    /**
     * @param client the asynchronous MCP client
     * @param toolCallbacks the tool callbacks belonging to that client
     */
    public McpAsyncClientWrapper(McpAsyncClient client, List<ToolCallback> toolCallbacks) {
        this.toolCallbacks = toolCallbacks;
        this.client = client;
    }

    /** @return the wrapped client */
    public McpAsyncClient getClient() {
        return this.client;
    }

    /** @return the tool callbacks passed at construction time */
    public List<ToolCallback> getToolCallbacks() {
        return this.toolCallbacks;
    }

}
McpReconnectTask
封装对应的服务名,延时时间
package com.alibaba.cloud.ai.mcp.client.component;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class McpReconnectTask implements Delayed { private final String serverName; private final long delay; public McpReconnectTask(String serverName, long delay, TimeUnit unit) { this.serverName = serverName; this.delay = System.nanoTime() + unit.toNanos(delay); } public String getServerName() { return serverName; } @Override public long getDelay(TimeUnit unit) { return unit.convert(delay - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.delay, ((McpReconnectTask) o).delay); }}
核心类
McpAsyncRecovery
McpAsyncClient 恢复的核心类
字段信息如下
字段名称 | 字段类型 | 描述 |
mcpRecoveryProperties | McpRecoveryProperties | MCP重连恢复的属性配置类 |
mcpSseClientProperties | McpSseClientProperties | SSE连接的基础配置,如url等 |
commonProperties | McpClientCommonProperties | Client连接的基础信息 |
mcpAsyncClientConfigurer | McpAsyncClientConfigurer | Async时的Client侧配置类 |
pingScheduler | ScheduledExecutorService | 定时任务线程池,定期ping Mcp Server侧,记录失联的Server信息,记录至延时队列中 |
reconnectExecutor | ExecutorService | 异步线程池,从延时队列中获取失联的Server信息,进行重连 |
isRunning | boolean | 重连线程(reconnectExecutor 中的循环任务)是否继续运行的标志,即 while 循环的判断条件 |
mcpClientWrapperMap | Map | 记录服务名和对应的Client连接 |
reconnectTaskQueue | DelayQueue | 延时队列,封装重连Server信息 |
方法信息如下
方法名称 | 描述 |
init | Client连接初始化 |
createClient | 实际创建Client连接的逻辑,将新建连接维护至mcpClientWrapperMap中 |
startScheduledPolling | 定时任务线程池,开启定期检查 |
checkMcpClients | 实际ping Mcp Server侧的逻辑,将ping失败记录至延时队列reconnectTaskQueue中,同时移除维护的mcpClientWrapperMap |
startReconnectTask | 异步线程池,不断循环地从延时队列 reconnectTaskQueue 中取出到期的重连任务并尝试重连 |
package com.alibaba.cloud.ai.mcp.client;import com.alibaba.cloud.ai.mcp.client.component.McpAsyncClientWrapper;import com.alibaba.cloud.ai.mcp.client.component.McpReconnectTask;import com.alibaba.cloud.ai.mcp.client.config.McpRecoveryProperties;import com.fasterxml.jackson.databind.ObjectMapper;import io.modelcontextprotocol.client.McpAsyncClient;import io.modelcontextprotocol.client.McpClient;import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;import io.modelcontextprotocol.spec.McpSchema;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.ai.mcp.AsyncMcpToolCallbackProvider;import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpAsyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;import org.springframework.ai.tool.ToolCallback;import org.springframework.util.CollectionUtils;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.reactive.function.client.WebClientRequestException;import org.springframework.web.reactive.function.client.WebClientResponseException;import java.util.Arrays;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class McpAsyncRecovery { private static final Logger logger = LoggerFactory.getLogger(McpSyncRecovery.class); private final McpRecoveryProperties mcpRecoveryProperties; private final McpSseClientProperties mcpSseClientProperties; private final McpClientCommonProperties commonProperties; private final McpAsyncClientConfigurer mcpAsyncClientConfigurer; 
private final ObjectMapper objectMapper = new ObjectMapper(); private final WebClient.Builder webClientBuilderTemplate = WebClient.builder(); private final ScheduledExecutorService pingScheduler = Executors.newSingleThreadScheduledExecutor(); private final ExecutorService reconnectExecutor = Executors.newSingleThreadExecutor(); private volatile boolean isRunning = true; private final Map<String, McpAsyncClientWrapper> mcpClientWrapperMap = new ConcurrentHashMap<>(); private final DelayQueue<McpReconnectTask> reconnectTaskQueue = new DelayQueue<>(); public McpAsyncRecovery(McpRecoveryProperties mcpRecoveryAutoProperties, McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties, McpAsyncClientConfigurer mcpAsyncClientConfigurer) { this.mcpRecoveryProperties = mcpRecoveryAutoProperties; this.mcpSseClientProperties = mcpSseClientProperties; this.commonProperties = mcpClientCommonProperties; this.mcpAsyncClientConfigurer = mcpAsyncClientConfigurer; } public void init() { Map<String, McpSseClientProperties.SseParameters> connections = mcpSseClientProperties.getConnections(); if (CollectionUtils.isEmpty(connections)) { logger.warn("No MCP connection config found."); return; } connections.forEach(this::createClient); } public void startReconnectTask() { reconnectExecutor.submit(this::processReconnectQueue); } private void processReconnectQueue() { while (isRunning) { try { McpReconnectTask task = reconnectTaskQueue.take(); // 从队列中取出任务 String serviceName = task.getServerName(); logger.debug("Processing reconnect task for serviceName: {}", serviceName); // 尝试创建客户端 boolean clientCreated = createClient(serviceName, mcpSseClientProperties.getConnections().get(serviceName)); if (!clientCreated) { // 如果创建失败,将任务重新放回队列 reconnectTaskQueue.offer(task); logger.warn("Failed to create client for service: {}, will retry.", serviceName); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 重新设置中断标志 logger.debug("Reconnect 
thread interrupted", e); } catch (Exception e) { logger.error("Error in reconnect thread", e); } } } private boolean createClient(String key, McpSseClientProperties.SseParameters params) { try { WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(params.url()); String sseEndpoint = params.sseEndpoint() != null ? params.sseEndpoint() : "/sse"; WebFluxSseClientTransport transport = WebFluxSseClientTransport.builder(webClientBuilder) .sseEndpoint(sseEndpoint) .objectMapper(objectMapper) .build(); NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(key, transport); McpSchema.Implementation clientInfo = new McpSchema.Implementation( this.connectedClientName(commonProperties.getName(), namedTransport.name()), commonProperties.getVersion()); McpClient.AsyncSpec asyncSpec = McpClient.async(namedTransport.transport()) .clientInfo(clientInfo) .requestTimeout(commonProperties.getRequestTimeout()); asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec); McpAsyncClient asyncClient = asyncSpec.build(); if (commonProperties.isInitialized()) { asyncClient.initialize().doOnError(WebClientRequestException.class, ex -> { logger.error("WebClientRequestException occurred during initialization: {}", ex.getMessage()); isRunning = false; }).subscribe(result -> { if (result != null) { logger.info("Async client 初始化成功"); } }); } if (isRunning) { logger.info("Initialized server name: {} with server URL: {}", key, params.url()); List<ToolCallback> callbacks = Arrays .asList(new AsyncMcpToolCallbackProvider(asyncClient).getToolCallbacks()); mcpClientWrapperMap.put(key, new McpAsyncClientWrapper(asyncClient, callbacks)); } return isRunning; } catch (Exception e) { isRunning = false; logger.error("Unexpected error occurred during reconnection process", e); return isRunning; } } public void startScheduledPolling() { pingScheduler.scheduleAtFixedRate(this::checkMcpClients, mcpRecoveryProperties.getPing().getSeconds(), 
mcpRecoveryProperties.getPing().getSeconds(), TimeUnit.SECONDS); } private void checkMcpClients() { logger.debug("Checking MCP clients..."); checkAndRestartTask(); mcpClientWrapperMap.forEach((serviceName, wrapperClient) -> { wrapperClient.getClient().ping().doOnError(WebClientResponseException.class, ex -> { logger.error("Ping failed for {}", serviceName); mcpClientWrapperMap.remove(serviceName); reconnectTaskQueue.offer(new McpReconnectTask(serviceName, mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS)); logger.info("need reconnect: {}", serviceName); }).subscribe(); }); } private void checkAndRestartTask() { if (!isRunning) { logger.info("Restarting task..."); isRunning = true; startReconnectTask(); } } public List<ToolCallback> getToolCallback() { return mcpClientWrapperMap.values() .stream() .map(McpAsyncClientWrapper::getToolCallbacks) .flatMap(List::stream) .toList(); } public void stop() { pingScheduler.shutdown(); logger.info("定时ping任务线程池已关闭"); // 关闭异步任务线程池 try { reconnectExecutor.shutdown(); if (!reconnectExecutor.awaitTermination(mcpRecoveryProperties.getStop().getSeconds(), TimeUnit.SECONDS)) { reconnectExecutor.shutdownNow(); } logger.info("异步重连任务线程池已关闭"); } catch (InterruptedException e) { logger.error("关闭重连异步任务线程池时发生中断异常", e); reconnectExecutor.shutdownNow(); Thread.currentThread().interrupt(); // 恢复中断状态 } } private String connectedClientName(String clientName, String serverConnectionName) { return clientName + " - " + serverConnectionName; }}
这里有个注意点:client 侧和 server 侧的连接是基于 WebFlux 的响应式连接,异常是在响应式流中异步产生的,try catch 的形式无法捕捉到对应的异常,需要通过 doOnError 的形式去捕获异常
- asyncClient.initialize().doOnError(...).subscribe(...)
- wrapperClient.getClient().ping().doOnError(...).subscribe()
McpSyncRecovery
同上,McpSyncClient 恢复的核心类
package com.alibaba.cloud.ai.mcp.client;import com.alibaba.cloud.ai.mcp.client.component.McpReconnectTask;import com.alibaba.cloud.ai.mcp.client.component.McpSyncClientWrapper;import com.alibaba.cloud.ai.mcp.client.config.McpRecoveryProperties;import com.fasterxml.jackson.databind.ObjectMapper;import io.modelcontextprotocol.client.McpAsyncClient;import io.modelcontextprotocol.client.McpClient;import io.modelcontextprotocol.client.McpSyncClient;import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;import io.modelcontextprotocol.spec.McpSchema;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;import org.springframework.ai.tool.ToolCallback;import org.springframework.util.CollectionUtils;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.reactive.function.client.WebClientRequestException;import org.springframework.web.reactive.function.client.WebClientResponseException;import java.lang.reflect.Field;import java.util.Arrays;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class McpSyncRecovery { private static final Logger logger = LoggerFactory.getLogger(McpSyncRecovery.class); private final McpRecoveryProperties mcpRecoveryProperties; private final McpSseClientProperties mcpSseClientProperties; private final McpClientCommonProperties 
commonProperties; private final McpSyncClientConfigurer mcpSyncClientConfigurer; private final ObjectMapper objectMapper = new ObjectMapper(); private final WebClient.Builder webClientBuilderTemplate = WebClient.builder(); private final ScheduledExecutorService pingScheduler = Executors.newSingleThreadScheduledExecutor(); private final ExecutorService reconnectExecutor = Executors.newSingleThreadExecutor(); private volatile boolean isRunning = true; private final Map<String, McpSyncClientWrapper> mcpClientWrapperMap = new ConcurrentHashMap<>(); private final DelayQueue<McpReconnectTask> reconnectTaskQueue = new DelayQueue<>(); public McpSyncRecovery(McpRecoveryProperties mcpRecoveryProperties, McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties, McpSyncClientConfigurer mcpSyncClientConfigurer) { this.mcpRecoveryProperties = mcpRecoveryProperties; this.mcpSseClientProperties = mcpSseClientProperties; this.commonProperties = mcpClientCommonProperties; this.mcpSyncClientConfigurer = mcpSyncClientConfigurer; } public void init() { Map<String, McpSseClientProperties.SseParameters> connections = mcpSseClientProperties.getConnections(); if (CollectionUtils.isEmpty(connections)) { logger.warn("No MCP connection config found."); return; } connections.forEach((serviceName, params) -> { boolean clientCreated = createClient(serviceName, params); if (!clientCreated) { // 如果创建失败,将任务重新放回队列 reconnectTaskQueue.offer(new McpReconnectTask(serviceName, mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS)); logger.warn("Failed to create client for serviceName: {}, will retry.", serviceName); } }); } public void startReconnectTask() { reconnectExecutor.submit(this::processReconnectQueue); } private void processReconnectQueue() { while (isRunning) { try { McpReconnectTask task = reconnectTaskQueue.take(); // 从队列中取出任务 String serviceName = task.getServerName(); logger.debug("Processing reconnect task for serviceName: {}", 
serviceName); // 尝试创建客户端 boolean clientCreated = createClient(serviceName, mcpSseClientProperties.getConnections().get(serviceName)); if (!clientCreated) { // 如果创建失败,将任务重新放回队列 reconnectTaskQueue.offer(task); logger.warn("Failed to create client for service: {}, will retry.", serviceName); } } catch (InterruptedException e) { logger.debug("Reconnect thread interrupted", e); Thread.currentThread().interrupt(); } catch (Exception e) { logger.error("Error in reconnect thread", e); } } } private boolean createClient(String key, McpSseClientProperties.SseParameters params) { try { WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(params.url()); String sseEndpoint = params.sseEndpoint() != null ? params.sseEndpoint() : "/sse"; WebFluxSseClientTransport transport = WebFluxSseClientTransport.builder(webClientBuilder) .sseEndpoint(sseEndpoint) .objectMapper(objectMapper) .build(); NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(key, transport); McpSchema.Implementation clientInfo = new McpSchema.Implementation( this.connectedClientName(commonProperties.getName(), namedTransport.name()), commonProperties.getVersion()); McpClient.SyncSpec syncSpec = McpClient.sync(namedTransport.transport()) .clientInfo(clientInfo) .requestTimeout(commonProperties.getRequestTimeout()); syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec); McpSyncClient syncClient = syncSpec.build(); if (commonProperties.isInitialized()) { // 得到syncClient的delegate字段 Field delegateField = McpSyncClient.class.getDeclaredField("delegate"); delegateField.setAccessible(true); McpAsyncClient mcpAsyncClient = (McpAsyncClient) delegateField.get(syncClient); mcpAsyncClient.initialize().doOnError(WebClientRequestException.class, ex -> { logger.error("WebClientRequestException occurred during initialization: {}", ex.getMessage()); isRunning = false; }).subscribe(result -> { if (result != null) { logger.info("Sync client 初始化成功"); } }); } if 
(isRunning) { logger.info("Initialized server name: {} with server URL: {}", key, params.url()); List<ToolCallback> callbacks = Arrays .asList(new SyncMcpToolCallbackProvider(syncClient).getToolCallbacks()); mcpClientWrapperMap.put(key, new McpSyncClientWrapper(syncClient, callbacks)); } return isRunning; } catch (Exception e) { isRunning = false; logger.error("Unexpected error occurred during reconnection process", e); return isRunning; } } public void startScheduledPolling() { pingScheduler.scheduleAtFixedRate(this::checkMcpClients, mcpRecoveryProperties.getPing().getSeconds(), mcpRecoveryProperties.getPing().getSeconds(), TimeUnit.SECONDS); } private void checkMcpClients() { logger.debug("Checking MCP clients..."); checkAndRestartTask(); mcpClientWrapperMap.forEach((serviceName, wrapperClient) -> { McpSyncClient syncClient = wrapperClient.getClient(); Field delegateField = null; try { delegateField = McpSyncClient.class.getDeclaredField("delegate"); delegateField.setAccessible(true); McpAsyncClient asyncClient = (McpAsyncClient) delegateField.get(syncClient); asyncClient.ping().doOnError(WebClientResponseException.class, ex -> { logger.error("Ping failed for {}", serviceName); mcpClientWrapperMap.remove(serviceName); reconnectTaskQueue.offer(new McpReconnectTask(serviceName, mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS)); logger.info("need reconnect: {}", serviceName); }).subscribe(); } catch (Exception e) { logger.error("Ping failed for {}", serviceName, e); } }); } private void checkAndRestartTask() { if (!isRunning) { logger.info("Restarting task..."); isRunning = true; startReconnectTask(); } } public List<ToolCallback> getToolCallback() { return mcpClientWrapperMap.values() .stream() .map(McpSyncClientWrapper::getToolCallbacks) .flatMap(List::stream) .toList(); } public void stop() { pingScheduler.shutdown(); logger.info("定时ping任务线程池已关闭"); // 关闭异步任务线程池 try { reconnectExecutor.shutdown(); if 
(!reconnectExecutor.awaitTermination(mcpRecoveryProperties.getStop().getSeconds(), TimeUnit.SECONDS)) { reconnectExecutor.shutdownNow(); } logger.info("异步重连任务线程池已关闭"); } catch (InterruptedException e) { logger.error("关闭重连异步任务线程池时发生中断异常", e); reconnectExecutor.shutdownNow(); Thread.currentThread().interrupt(); // 恢复中断状态 } } private String connectedClientName(String clientName, String serverConnectionName) { return clientName + " - " + serverConnectionName; }}
注意点:这里 Sync 是同步的,在向 Mcp Server 发起 ping 或初始化连接的请求时,有可能会报 WebClientException 的错误。我们需要通过反射得到内部的 McpAsyncClient,以响应式的方式处理并捕获异常
初始化:
// 得到syncClient的delegate字段Field delegateField = McpSyncClient.class.getDeclaredField("delegate");delegateField.setAccessible(true);McpAsyncClient mcpAsyncClient = (McpAsyncClient) delegateField.get(syncClient);mcpAsyncClient.initialize().doOnError(WebClientRequestException.class, ex -> { logger.error("WebClientRequestException occurred during initialization: {}", ex.getMessage()); isRunning = false;}).subscribe(result -> { if (result != null) { logger.info("Sync client 初始化成功"); }});
Ping:
McpSyncClient syncClient = wrapperClient.getClient();Field delegateField = null;try { delegateField = McpSyncClient.class.getDeclaredField("delegate"); delegateField.setAccessible(true); McpAsyncClient asyncClient = (McpAsyncClient) delegateField.get(syncClient); asyncClient.ping().doOnError(WebClientResponseException.class, ex -> { logger.error("Ping failed for {}", serviceName); mcpClientWrapperMap.remove(serviceName); reconnectTaskQueue.offer(new McpReconnectTask(serviceName, mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS)); logger.info("need reconnect: {}", serviceName); }).subscribe();}
往期文章
第一章内容
SpringAI(GA)的chat:快速上手+自动注入源码解读
第二章内容
SpringAI(GA):Sqlite、Mysql、Redis消息存储快速上手
第三章内容
第四章内容
第五章内容
SpringAI(GA):内存、Redis、ES的向量数据库存储—快速上手
SpringAI(GA):向量数据库理论源码解读+Redis、Es接入源码
第六章内容
第七章内容
SpringAI(GA): SpringAI下的MCP源码解读
Spring AI Alibaba MCP Gateway:Gateway 无缝衔接存量应用转换 MCP 工具
第八章内容
第九章内容
第十章内容
Spring AI Alibaba Graph:多节点并行—快速上手
Spring AI Alibaba Graph:节点流式透传案例
Spring AI Alibaba Graph:分配MCP到指定节点
Spring AI Alibaba Graph:中断!人类反馈介入,流程丝滑走完~
可付费获取飞书云文档获得更好的观赏体验~ 感谢观众老爷们的支持
学习交流圈
你好,我是影子,曾先后在🐻、新能源、老铁就职,兼任Spring AI Alibaba开源社区的Committer。目前新建了一个交流群,一个人走得快,一群人走得远,另外,本人长期维护一套飞书云文档笔记,涵盖后端、大数据系统化的面试资料,可私信免费获取