掘金 人工智能 07月16日 10:17
MCP的SSE重连机制,源码解析
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入解析了MCP的SSE重连机制,主要关注`McpRecoveryAutoConfiguration`、`McpRecoveryProperties`等关键配置类,以及`McpAsyncRecovery`和`McpSyncRecovery`等核心组件。文章详细介绍了重连机制的实现细节,包括如何通过定时任务检测连接状态,以及在连接中断后如何进行异步重连。通过对源码的解读,帮助读者理解如何在Alibaba Cloud AI框架中实现高效、可靠的SSE连接管理。

⚙️ `McpRecoveryAutoConfiguration`:这是MCP恢复的自动配置类,它会根据配置参数决定是否启用重连机制。该类主要负责创建`McpSyncRecovery`或`McpAsyncRecovery`的Bean,并启动定时任务和重连任务。

⏱️ `McpRecoveryProperties`:此配置类定义了MCP重连相关的属性,例如ping间隔、延时队列的间隔时间以及关闭线程池的最大等待时间。这些属性用于控制重连的频率和行为。

🔄 `McpAsyncRecovery`:作为异步Client恢复的核心,`McpAsyncRecovery`负责维护与MCP Server的连接。它使用定时任务定期ping服务器,并通过延时队列处理重连任务。当连接失败时,将服务信息加入延时队列,等待重连。

💡 `McpReconnectTask`:封装了重连任务,包含服务名和延迟时间。该任务被放入延时队列中,当达到延迟时间后,将被取出进行重连操作。

🚧 异常处理:文章特别强调了在Webflux响应式编程中,需要使用`doOnError`来捕获异常,而不是传统的try-catch。这保证了在连接异常时,能够正确触发重连机制。

原文链接可见:MCP的SSE重连机制,源码解析

快速上手可见:MCP的SSE重连机制,低成本接入框架,快速上手

SSE 重连源码解读

配置类

McpRecoveryAutoConfiguration

Mcp 恢复的自动配置类,只有当 spring.ai.alibaba.mcp.recovery.enabled = true 时生效

package com.alibaba.cloud.ai.autoconfigure.mcp.client;import com.alibaba.cloud.ai.mcp.client.McpAsyncRecovery;import com.alibaba.cloud.ai.mcp.client.McpSyncRecovery;import com.alibaba.cloud.ai.mcp.client.config.McpRecoveryProperties;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpAsyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;import org.springframework.ai.mcp.customizer.McpAsyncClientCustomizer;import org.springframework.ai.mcp.customizer.McpSyncClientCustomizer;import org.springframework.beans.factory.ObjectProvider;import org.springframework.boot.autoconfigure.AutoConfiguration;import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;import org.springframework.boot.context.properties.EnableConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.scheduling.annotation.EnableScheduling;@AutoConfiguration@EnableScheduling@EnableConfigurationProperties({ McpSseClientProperties.class, McpClientCommonProperties.class,       McpRecoveryProperties.class })@ConditionalOnProperty(prefix = McpRecoveryProperties.CONFIGPREFIX, name = "enabled", havingValue = "true")public class McpRecoveryAutoConfiguration {    @Bean(name = "mcpSyncRecovery")    @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC",          matchIfMissing = true)    public McpSyncRecovery mcpSyncRecovery(McpRecoveryProperties mcpRecoveryProperties,          McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties,          McpSyncClientConfigurer mcpSyncClientConfigurer) {       McpSyncRecovery mcpSyncRecovery = new McpSyncRecovery(mcpRecoveryProperties, mcpSseClientProperties,             mcpClientCommonProperties, mcpSyncClientConfigurer);       mcpSyncRecovery.init();       mcpSyncRecovery.startScheduledPolling();       mcpSyncRecovery.startReconnectTask();       return mcpSyncRecovery;    }    @Bean(name = "mcpAsyncRecovery")    @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC")    public McpAsyncRecovery mcpAsyncRecovery(McpRecoveryProperties mcpRecoveryProperties,          McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties,          McpAsyncClientConfigurer mcpAsyncClientConfigurer) {       McpAsyncRecovery mcpAsyncRecovery = new McpAsyncRecovery(mcpRecoveryProperties, mcpSseClientProperties,             mcpClientCommonProperties, mcpAsyncClientConfigurer);       mcpAsyncRecovery.init();       mcpAsyncRecovery.startScheduledPolling();       mcpAsyncRecovery.startReconnectTask();       return mcpAsyncRecovery;    }    @Bean    @ConditionalOnMissingBean    @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "SYNC",          matchIfMissing = true)    McpSyncClientConfigurer mcpSyncClientConfigurer(ObjectProvider<McpSyncClientCustomizer> customizerProvider) {       return new McpSyncClientConfigurer(customizerProvider.orderedStream().toList());    }    @Bean    @ConditionalOnMissingBean    @ConditionalOnProperty(prefix = "spring.ai.mcp.client", name = { "type" }, havingValue = "ASYNC")    McpAsyncClientConfigurer mcpAsyncClientConfigurer(ObjectProvider<McpAsyncClientCustomizer> customizerProvider) {       return new McpAsyncClientConfigurer(customizerProvider.orderedStream().toList());    }}

McpRecoveryProperties

MCP 恢复的属性配置类

package com.alibaba.cloud.ai.mcp.client.config;import org.springframework.boot.context.properties.ConfigurationProperties;import java.time.Duration;@ConfigurationProperties(prefix = McpRecoveryProperties.CONFIGPREFIX)public class McpRecoveryProperties {    public static final String CONFIGPREFIX = "spring.ai.alibaba.mcp.recovery";    private Duration ping = Duration.ofSeconds(5L);    private Duration delay = Duration.ofSeconds(5L);    private Duration stop = Duration.ofSeconds(10L);    public Duration getPing() {       return ping;    }    public void setPing(Duration ping) {       this.ping = ping;    }    public Duration getDelay() {       return delay;    }    public void setDelay(Duration delay) {       this.delay = delay;    }    public Duration getStop() {       return stop;    }    public void setStop(Duration stop) {       this.stop = stop;    }}

组件类

McpSyncClientWrapper

McpSyncClient 及对应的 ToolCallback 列表的封装

package com.alibaba.cloud.ai.mcp.client.component;import io.modelcontextprotocol.client.McpSyncClient;import org.springframework.ai.tool.ToolCallback;import java.util.List;public class McpSyncClientWrapper {    private final McpSyncClient client;    private final List<ToolCallback> toolCallbacks;    public McpSyncClientWrapper(McpSyncClient client, List<ToolCallback> toolCallbacks) {       this.client = client;       this.toolCallbacks = toolCallbacks;    }    public McpSyncClient getClient() {       return client;    }    public List<ToolCallback> getToolCallbacks() {       return toolCallbacks;    }}

McpAsyncClientWrapper

McpAsyncClient 及对应的 ToolCallback 列表的封装

package com.alibaba.cloud.ai.mcp.client.component;import io.modelcontextprotocol.client.McpAsyncClient;import org.springframework.ai.tool.ToolCallback;import java.util.List;public class McpAsyncClientWrapper {    private final McpAsyncClient client;    private final List<ToolCallback> toolCallbacks;    public McpAsyncClientWrapper(McpAsyncClient client, List<ToolCallback> toolCallbacks) {       this.client = client;       this.toolCallbacks = toolCallbacks;    }    public McpAsyncClient getClient() {       return client;    }    public List<ToolCallback> getToolCallbacks() {       return toolCallbacks;    }}

McpReconnectTask

封装对应的服务名,延时时间

package com.alibaba.cloud.ai.mcp.client.component;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;public class McpReconnectTask implements Delayed {    private final String serverName;    private final long delay;    public McpReconnectTask(String serverName, long delay, TimeUnit unit) {       this.serverName = serverName;       this.delay = System.nanoTime() + unit.toNanos(delay);    }    public String getServerName() {       return serverName;    }    @Override    public long getDelay(TimeUnit unit) {       return unit.convert(delay - System.nanoTime(), TimeUnit.NANOSECONDS);    }    @Override    public int compareTo(Delayed o) {       return Long.compare(this.delay, ((McpReconnectTask) o).delay);    }}

核心类

McpAsyncRecovery

McpAsyncClient 恢复的核心类

字段信息如下

字段名称
字段类型
描述
mcpRecoveryProperties
McpRecoveryProperties
MCP重连恢复的属性配置类
mcpSseClientProperties
McpSseClientProperties
SSE连接的基础配置,如url等
commonProperties
McpClientCommonProperties
Client连接的基础信息
mcpAsyncClientConfigurer
McpAsyncClientConfigurer
Async时的Client侧配置类
pingScheduler
ScheduledExecutorService
定时任务线程池,定期ping Mcp Server侧,记录失联的Server信息,记录至延时队列中
reconnectExecutor
ExecutorService
异步线程池,从延时队列中获取失联的Server信息,进行重连
isRunning
boolean
异步线程池,循环的判断条件
mcpClientWrapperMap
Map
记录服务名和对应的Client连接
reconnectTaskQueue
DelayQueue
延时队列,封装重连Server信息

方法信息如下

方法名称
描述
init
Client连接初始化
createClient
实际创建Client连接的逻辑,将新建连接维护至mcpClientWrapperMap中
startScheduledPolling
定时任务线程池,开启定期检查
checkMcpClients
实际ping Mcp Server侧的逻辑,将ping失败记录至延时队列reconnectTaskQueue中,同时移除维护的mcpClientWrapperMap
startReconnectTask
异步线程池,不断循环检查循环队列中是否为空
package com.alibaba.cloud.ai.mcp.client;import com.alibaba.cloud.ai.mcp.client.component.McpAsyncClientWrapper;import com.alibaba.cloud.ai.mcp.client.component.McpReconnectTask;import com.alibaba.cloud.ai.mcp.client.config.McpRecoveryProperties;import com.fasterxml.jackson.databind.ObjectMapper;import io.modelcontextprotocol.client.McpAsyncClient;import io.modelcontextprotocol.client.McpClient;import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;import io.modelcontextprotocol.spec.McpSchema;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.ai.mcp.AsyncMcpToolCallbackProvider;import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpAsyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;import org.springframework.ai.tool.ToolCallback;import org.springframework.util.CollectionUtils;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.reactive.function.client.WebClientRequestException;import org.springframework.web.reactive.function.client.WebClientResponseException;import java.util.Arrays;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class McpAsyncRecovery {    private static final Logger logger = LoggerFactory.getLogger(McpSyncRecovery.class);    private final McpRecoveryProperties mcpRecoveryProperties;    private final McpSseClientProperties mcpSseClientProperties;    private final McpClientCommonProperties commonProperties;    private final McpAsyncClientConfigurer mcpAsyncClientConfigurer;    private final ObjectMapper objectMapper = new ObjectMapper();    private final WebClient.Builder webClientBuilderTemplate = WebClient.builder();    private final ScheduledExecutorService pingScheduler = Executors.newSingleThreadScheduledExecutor();    private final ExecutorService reconnectExecutor = Executors.newSingleThreadExecutor();    private volatile boolean isRunning = true;    private final Map<String, McpAsyncClientWrapper> mcpClientWrapperMap = new ConcurrentHashMap<>();    private final DelayQueue<McpReconnectTask> reconnectTaskQueue = new DelayQueue<>();    public McpAsyncRecovery(McpRecoveryProperties mcpRecoveryAutoProperties,          McpSseClientProperties mcpSseClientProperties, McpClientCommonProperties mcpClientCommonProperties,          McpAsyncClientConfigurer mcpAsyncClientConfigurer) {       this.mcpRecoveryProperties = mcpRecoveryAutoProperties;       this.mcpSseClientProperties = mcpSseClientProperties;       this.commonProperties = mcpClientCommonProperties;       this.mcpAsyncClientConfigurer = mcpAsyncClientConfigurer;    }    public void init() {       Map<String, McpSseClientProperties.SseParameters> connections = mcpSseClientProperties.getConnections();       if (CollectionUtils.isEmpty(connections)) {          logger.warn("No MCP connection config found.");          return;       }       connections.forEach(this::createClient);    }    public void startReconnectTask() {       reconnectExecutor.submit(this::processReconnectQueue);    }    private void processReconnectQueue() {       while (isRunning) {          try {             McpReconnectTask task = reconnectTaskQueue.take(); // 从队列中取出任务             String serviceName = task.getServerName();             logger.debug("Processing reconnect task for serviceName: {}", serviceName);             // 尝试创建客户端             boolean clientCreated = createClient(serviceName,                   mcpSseClientProperties.getConnections().get(serviceName));             if (!clientCreated) {                // 如果创建失败,将任务重新放回队列                reconnectTaskQueue.offer(task);                logger.warn("Failed to create client for service: {}, will retry.", serviceName);             }          }          catch (InterruptedException e) {             Thread.currentThread().interrupt(); // 重新设置中断标志             logger.debug("Reconnect thread interrupted", e);          }          catch (Exception e) {             logger.error("Error in reconnect thread", e);          }       }    }    private boolean createClient(String key, McpSseClientProperties.SseParameters params) {       try {          WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(params.url());          String sseEndpoint = params.sseEndpoint() != null ? params.sseEndpoint() : "/sse";          WebFluxSseClientTransport transport = WebFluxSseClientTransport.builder(webClientBuilder)             .sseEndpoint(sseEndpoint)             .objectMapper(objectMapper)             .build();          NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(key, transport);          McpSchema.Implementation clientInfo = new McpSchema.Implementation(                this.connectedClientName(commonProperties.getName(), namedTransport.name()),                commonProperties.getVersion());          McpClient.AsyncSpec asyncSpec = McpClient.async(namedTransport.transport())             .clientInfo(clientInfo)             .requestTimeout(commonProperties.getRequestTimeout());          asyncSpec = mcpAsyncClientConfigurer.configure(namedTransport.name(), asyncSpec);          McpAsyncClient asyncClient = asyncSpec.build();          if (commonProperties.isInitialized()) {             asyncClient.initialize().doOnError(WebClientRequestException.class, ex -> {                logger.error("WebClientRequestException occurred during initialization: {}", ex.getMessage());                isRunning = false;             }).subscribe(result -> {                if (result != null) {                   logger.info("Async client 初始化成功");                }             });          }          if (isRunning) {             logger.info("Initialized server name: {} with server URL: {}", key, params.url());             List<ToolCallback> callbacks = Arrays                .asList(new AsyncMcpToolCallbackProvider(asyncClient).getToolCallbacks());             mcpClientWrapperMap.put(key, new McpAsyncClientWrapper(asyncClient, callbacks));          }          return isRunning;       }       catch (Exception e) {          isRunning = false;          logger.error("Unexpected error occurred during reconnection process", e);          return isRunning;       }    }    public void startScheduledPolling() {       pingScheduler.scheduleAtFixedRate(this::checkMcpClients, mcpRecoveryProperties.getPing().getSeconds(),             mcpRecoveryProperties.getPing().getSeconds(), TimeUnit.SECONDS);    }    private void checkMcpClients() {       logger.debug("Checking MCP clients...");       checkAndRestartTask();       mcpClientWrapperMap.forEach((serviceName, wrapperClient) -> {          wrapperClient.getClient().ping().doOnError(WebClientResponseException.class, ex -> {             logger.error("Ping failed for {}", serviceName);             mcpClientWrapperMap.remove(serviceName);             reconnectTaskQueue.offer(new McpReconnectTask(serviceName,                   mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS));             logger.info("need reconnect: {}", serviceName);          }).subscribe();       });    }    private void checkAndRestartTask() {       if (!isRunning) {          logger.info("Restarting task...");          isRunning = true;          startReconnectTask();       }    }    public List<ToolCallback> getToolCallback() {       return mcpClientWrapperMap.values()          .stream()          .map(McpAsyncClientWrapper::getToolCallbacks)          .flatMap(List::stream)          .toList();    }    public void stop() {       pingScheduler.shutdown();       logger.info("定时ping任务线程池已关闭");       // 关闭异步任务线程池       try {          reconnectExecutor.shutdown();          if (!reconnectExecutor.awaitTermination(mcpRecoveryProperties.getStop().getSeconds(), TimeUnit.SECONDS)) {             reconnectExecutor.shutdownNow();          }          logger.info("异步重连任务线程池已关闭");       }       catch (InterruptedException e) {          logger.error("关闭重连异步任务线程池时发生中断异常", e);          reconnectExecutor.shutdownNow();          Thread.currentThread().interrupt(); // 恢复中断状态       }    }    private String connectedClientName(String clientName, String serverConnectionName) {       return clientName + " - " + serverConnectionName;    }}

这里有个注意点:client 侧和 server 侧的连接的 Webflux 响应式的,try catch 的形式无法捕捉到对应的异常,需要通过 doOnError 的形式去捕获异常

McpSyncRecovery

同上 McpSyncClient 恢复的核心类

package com.alibaba.cloud.ai.mcp.client;import com.alibaba.cloud.ai.mcp.client.component.McpReconnectTask;import com.alibaba.cloud.ai.mcp.client.component.McpSyncClientWrapper;import com.alibaba.cloud.ai.mcp.client.config.McpRecoveryProperties;import com.fasterxml.jackson.databind.ObjectMapper;import io.modelcontextprotocol.client.McpAsyncClient;import io.modelcontextprotocol.client.McpClient;import io.modelcontextprotocol.client.McpSyncClient;import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;import io.modelcontextprotocol.spec.McpSchema;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.ai.mcp.SyncMcpToolCallbackProvider;import org.springframework.ai.mcp.client.autoconfigure.NamedClientMcpTransport;import org.springframework.ai.mcp.client.autoconfigure.configurer.McpSyncClientConfigurer;import org.springframework.ai.mcp.client.autoconfigure.properties.McpClientCommonProperties;import org.springframework.ai.mcp.client.autoconfigure.properties.McpSseClientProperties;import org.springframework.ai.tool.ToolCallback;import org.springframework.util.CollectionUtils;import org.springframework.web.reactive.function.client.WebClient;import org.springframework.web.reactive.function.client.WebClientRequestException;import org.springframework.web.reactive.function.client.WebClientResponseException;import java.lang.reflect.Field;import java.util.Arrays;import java.util.List;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class McpSyncRecovery {    private static final Logger logger = LoggerFactory.getLogger(McpSyncRecovery.class);    private final McpRecoveryProperties mcpRecoveryProperties;    private final McpSseClientProperties mcpSseClientProperties;    private final McpClientCommonProperties commonProperties;    private final McpSyncClientConfigurer mcpSyncClientConfigurer;    private final ObjectMapper objectMapper = new ObjectMapper();    private final WebClient.Builder webClientBuilderTemplate = WebClient.builder();    private final ScheduledExecutorService pingScheduler = Executors.newSingleThreadScheduledExecutor();    private final ExecutorService reconnectExecutor = Executors.newSingleThreadExecutor();    private volatile boolean isRunning = true;    private final Map<String, McpSyncClientWrapper> mcpClientWrapperMap = new ConcurrentHashMap<>();    private final DelayQueue<McpReconnectTask> reconnectTaskQueue = new DelayQueue<>();    public McpSyncRecovery(McpRecoveryProperties mcpRecoveryProperties, McpSseClientProperties mcpSseClientProperties,          McpClientCommonProperties mcpClientCommonProperties, McpSyncClientConfigurer mcpSyncClientConfigurer) {       this.mcpRecoveryProperties = mcpRecoveryProperties;       this.mcpSseClientProperties = mcpSseClientProperties;       this.commonProperties = mcpClientCommonProperties;       this.mcpSyncClientConfigurer = mcpSyncClientConfigurer;    }    public void init() {       Map<String, McpSseClientProperties.SseParameters> connections = mcpSseClientProperties.getConnections();       if (CollectionUtils.isEmpty(connections)) {          logger.warn("No MCP connection config found.");          return;       }       connections.forEach((serviceName, params) -> {          boolean clientCreated = createClient(serviceName, params);          if (!clientCreated) {             // 如果创建失败,将任务重新放回队列             reconnectTaskQueue.offer(new McpReconnectTask(serviceName,                   mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS));             logger.warn("Failed to create client for serviceName: {}, will retry.", serviceName);          }       });    }    public void startReconnectTask() {       reconnectExecutor.submit(this::processReconnectQueue);    }    private void processReconnectQueue() {       while (isRunning) {          try {             McpReconnectTask task = reconnectTaskQueue.take(); // 从队列中取出任务             String serviceName = task.getServerName();             logger.debug("Processing reconnect task for serviceName: {}", serviceName);             // 尝试创建客户端             boolean clientCreated = createClient(serviceName,                   mcpSseClientProperties.getConnections().get(serviceName));             if (!clientCreated) {                // 如果创建失败,将任务重新放回队列                reconnectTaskQueue.offer(task);                logger.warn("Failed to create client for service: {}, will retry.", serviceName);             }          }          catch (InterruptedException e) {             logger.debug("Reconnect thread interrupted", e);             Thread.currentThread().interrupt();          }          catch (Exception e) {             logger.error("Error in reconnect thread", e);          }       }    }    private boolean createClient(String key, McpSseClientProperties.SseParameters params) {       try {          WebClient.Builder webClientBuilder = webClientBuilderTemplate.clone().baseUrl(params.url());          String sseEndpoint = params.sseEndpoint() != null ? params.sseEndpoint() : "/sse";          WebFluxSseClientTransport transport = WebFluxSseClientTransport.builder(webClientBuilder)             .sseEndpoint(sseEndpoint)             .objectMapper(objectMapper)             .build();          NamedClientMcpTransport namedTransport = new NamedClientMcpTransport(key, transport);          McpSchema.Implementation clientInfo = new McpSchema.Implementation(                this.connectedClientName(commonProperties.getName(), namedTransport.name()),                commonProperties.getVersion());          McpClient.SyncSpec syncSpec = McpClient.sync(namedTransport.transport())             .clientInfo(clientInfo)             .requestTimeout(commonProperties.getRequestTimeout());          syncSpec = mcpSyncClientConfigurer.configure(namedTransport.name(), syncSpec);          McpSyncClient syncClient = syncSpec.build();          if (commonProperties.isInitialized()) {             // 得到syncClient的delegate字段             Field delegateField = McpSyncClient.class.getDeclaredField("delegate");             delegateField.setAccessible(true);             McpAsyncClient mcpAsyncClient = (McpAsyncClient) delegateField.get(syncClient);             mcpAsyncClient.initialize().doOnError(WebClientRequestException.class, ex -> {                logger.error("WebClientRequestException occurred during initialization: {}", ex.getMessage());                isRunning = false;             }).subscribe(result -> {                if (result != null) {                   logger.info("Sync client 初始化成功");                }             });          }          if (isRunning) {             logger.info("Initialized server name: {} with server URL: {}", key, params.url());             List<ToolCallback> callbacks = Arrays                .asList(new SyncMcpToolCallbackProvider(syncClient).getToolCallbacks());             mcpClientWrapperMap.put(key, new McpSyncClientWrapper(syncClient, callbacks));          }          return isRunning;       }       catch (Exception e) {          isRunning = false;          logger.error("Unexpected error occurred during reconnection process", e);          return isRunning;       }    }    public void startScheduledPolling() {       pingScheduler.scheduleAtFixedRate(this::checkMcpClients, mcpRecoveryProperties.getPing().getSeconds(),             mcpRecoveryProperties.getPing().getSeconds(), TimeUnit.SECONDS);    }    private void checkMcpClients() {       logger.debug("Checking MCP clients...");       checkAndRestartTask();       mcpClientWrapperMap.forEach((serviceName, wrapperClient) -> {          McpSyncClient syncClient = wrapperClient.getClient();          Field delegateField = null;          try {             delegateField = McpSyncClient.class.getDeclaredField("delegate");             delegateField.setAccessible(true);             McpAsyncClient asyncClient = (McpAsyncClient) delegateField.get(syncClient);             asyncClient.ping().doOnError(WebClientResponseException.class, ex -> {                logger.error("Ping failed for {}", serviceName);                mcpClientWrapperMap.remove(serviceName);                reconnectTaskQueue.offer(new McpReconnectTask(serviceName,                      mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS));                logger.info("need reconnect: {}", serviceName);             }).subscribe();          }          catch (Exception e) {             logger.error("Ping failed for {}", serviceName, e);          }       });    }    private void checkAndRestartTask() {       if (!isRunning) {          logger.info("Restarting task...");          isRunning = true;          startReconnectTask();       }    }    public List<ToolCallback> getToolCallback() {       return mcpClientWrapperMap.values()          .stream()          .map(McpSyncClientWrapper::getToolCallbacks)          .flatMap(List::stream)          .toList();    }    public void stop() {       pingScheduler.shutdown();       logger.info("定时ping任务线程池已关闭");       // 关闭异步任务线程池       try {          reconnectExecutor.shutdown();          if (!reconnectExecutor.awaitTermination(mcpRecoveryProperties.getStop().getSeconds(), TimeUnit.SECONDS)) {             reconnectExecutor.shutdownNow();          }          logger.info("异步重连任务线程池已关闭");       }       catch (InterruptedException e) {          logger.error("关闭重连异步任务线程池时发生中断异常", e);          reconnectExecutor.shutdownNow();          Thread.currentThread().interrupt(); // 恢复中断状态       }    }    private String connectedClientName(String clientName, String serverConnectionName) {       return clientName + " - " + serverConnectionName;    }}

注意点:这里 Sync 是同步的,再向 Mcp Server 发起 ping 或 初始化连接的请求时,有可能会报 WebClientException 的错误。我们需要得到内部 McpAsyncClient,进行响应式的处理捕获异常

初始化:

// 得到syncClient的delegate字段Field delegateField = McpSyncClient.class.getDeclaredField("delegate");delegateField.setAccessible(true);McpAsyncClient mcpAsyncClient = (McpAsyncClient) delegateField.get(syncClient);mcpAsyncClient.initialize().doOnError(WebClientRequestException.class, ex -> {    logger.error("WebClientRequestException occurred during initialization: {}", ex.getMessage());    isRunning = false;}).subscribe(result -> {    if (result != null) {       logger.info("Sync client 初始化成功");    }});

Ping:

McpSyncClient syncClient = wrapperClient.getClient();Field delegateField = null;try {    delegateField = McpSyncClient.class.getDeclaredField("delegate");    delegateField.setAccessible(true);    McpAsyncClient asyncClient = (McpAsyncClient) delegateField.get(syncClient);    asyncClient.ping().doOnError(WebClientResponseException.class, ex -> {       logger.error("Ping failed for {}", serviceName);       mcpClientWrapperMap.remove(serviceName);       reconnectTaskQueue.offer(new McpReconnectTask(serviceName,             mcpRecoveryProperties.getDelay().getSeconds(), TimeUnit.SECONDS));       logger.info("need reconnect: {}", serviceName);    }).subscribe();}

往期文章

第一章内容

SpringAI(GA)的chat:快速上手+自动注入源码解读

SpringAI(GA):ChatClient调用链路解读

第二章内容

SpringAI的Advisor:快速上手+源码解读

SpringAI(GA):Sqlite、Mysql、Redis消息存储快速上手

第三章内容

SpringAI(GA):Tool工具整合—快速上手

SpringAI(GA):Tool源码+工具触发链路解读

第四章内容

SpringAI(GA):结构化输出的快速上手+源码解读

第五章内容

SpringAI(GA):内存、Redis、ES的向量数据库存储—快速上手

SpringAI(GA):向量数据库理论源码解读+Redis、Es接入源码

第六章内容

SpringAI(GA):RAG快速上手+模块化解读

SpringAI(GA):RAG下的ETL快速上手

SpringAI(GA):RAG下的ETL源码解读

第七章内容

SpringAI(GA):Nacos2下的分布式MCP

SpringAI(GA):Nacos3下的分布式MCP

SpringAI(GA):MCP源码解读

SpringAI(GA): SpringAI下的MCP源码解读

进阶:MCP服务鉴权案例

Spring AI Alibaba MCP Gateway GateWay无缝斜街存量应用转换 MCP 工具

MCP的SSE重连机制,低成本接入框架,快速上手

第八章内容

SpringAI(GA): 多模型评估篇

第九章内容

SpringAI(GA):观测篇快速上手+源码解读

第十章内容

Spring AI Alibaba Graph:快速入门

Spring AI Alibaba Graph:多节点并行—快速上手

Spring AI Alibaba Graph:节点流式透传案例

Spring AI Alibaba Graph:分配MCP到指定节点

Spring AI Alibaba Graph:中断!人类反馈介入,流程丝滑走完~

可付费获取飞书云文档获得更好的观赏体验~ 感谢观众老爷们的支持

学习交流圈

你好,我是影子,曾先后在🐻、新能源、老铁就职,兼任Spring AI Alibaba开源社区的Committer。目前新建了一个交流群,一个人走得快,一群人走得远,另外,本人长期维护一套飞书云文档笔记,涵盖后端、大数据系统化的面试资料,可私信免费获取

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

MCP SSE 重连机制 Alibaba Cloud AI 源码解析
相关文章