掘金 人工智能 05月28日 10:58
MCP 系列四:编程实战,基于 SSE 传输方式的客户端编码实现
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文是MCP系列第四篇,着重介绍了MCP客户端的编程实现,尤其是在SSE传输方式下的应用。文章详细阐述了MCP客户端与服务器的生命周期,包括初始化、操作和关闭阶段。通过分析mcp python sdk的ClientSession类,展示了如何实现与服务端的连接握手和操作交互,如调用工具、获取提示模板和读取资源。此外,还提供了基于SSE传输方式的客户端编码示例,以及通过定义MCPClient类和相关操作方法,实现了与服务端的有效通信,并通过功能测试验证了客户端的各项功能。

🤝 **MCP客户端-服务器生命周期**:MCP定义了严格的客户端-服务器连接生命周期,包括初始化(能力协商和协议版本协商)、操作(协议通信)和关闭(连接优雅终止)三个阶段。

🚀 **客户端初始化实现**:客户端通过发送initialize request(包含协议版本、能力集、客户端信息)与服务端进行握手,服务端返回版本及能力信息,客户端发送initialize notification确认,完成初始化过程。

📡 **基于SSE传输的客户端编程**:通过MCPClient类,使用sse_client创建与指定URL的SSE连接,初始化会话,与服务端建立协商,实现跨网络、跨机器环境的通信。

🛠️ **客户端操作方法**:MCPClient类定义了与服务端交互的多种方法,包括列出全部工具、调用工具、列出全部提示模板、读取模板、列出全部资源、获取资源等,并增加了异常处理逻辑,确保通信的稳定性。

本文是 MCP (模型上下文协议)系列的第四篇文章。本文介绍 MCP 客户端的编程实现(网上有不少介绍 MCP 协议的文章,但关于 MCP 客户端的编程实践的文章很少,特别是基于 sse 传输方式的实现)。本文主要内容如下:

    MCP 的 Client-Server 生命周期MCP 客户端的编码实现,包括与服务端连接的握手、与服务端的操作交互(调用工具、获取提示模板、读取资源)客户端的功能测试验证

Client-Server 生命周期

MCP 定义了一套严格的客户端-服务器连接生命周期,确保规范化的能力协商和状态管理:

    初始化阶段:完成能力协商与协议版本协商一致操作阶段:进行正常的协议通信关闭阶段:实现连接的优雅终止(Graceful termination)

初始化(Initialization)

初始化阶段必须是客户端与服务器的首次交互。初始化阶段步骤如下(类似于传统API的握手协议):

    客户端向服务端发送 initialize request(包含协议版本、能力集、客户端信息)服务端返回版本及能力信息(initialize response)客户端发送 initialize notification,通知确认。

阅读 mcp python sdk 的 ClientSession 类,我们可以看到,调用其 initialize 方法,即可实现上述初始化的三个步骤:

async def initialize(self) -> types.InitializeResult:    sampling = types.SamplingCapability()    roots = types.RootsCapability(        TODO: Should this be based on whether we        # _will_ send notifications, or only whether        # they're supported?        listChanged=True,    )    result = await self.send_request(        types.ClientRequest(            types.InitializeRequest(                method="initialize",                params=types.InitializeRequestParams(                    protocolVersion=types.LATEST_PROTOCOL_VERSION,                    capabilities=types.ClientCapabilities(                        sampling=sampling,                        experimental=None,                        roots=roots,                    ),                    clientInfo=types.Implementation(name="mcp", version="0.1.0"),                ),            )        ),        types.InitializeResult,    )    if result.protocolVersion notin SUPPORTED_PROTOCOL_VERSIONS:        raise RuntimeError(            "Unsupported protocol version from the server: "            f"{result.protocolVersion}"        )        await self.send_notification(            types.ClientNotification(                types.InitializedNotification(method="notifications/initialized")            )        )    return result

操作阶段(Normal protocol operations)

在操作阶段,客户端与服务器依据已协商的能力交换消息。双方应遵循以下原则:

    严格遵守协商一致的协议版本仅使用已成功协商的能力

关闭阶段(Shutdown)

在关闭阶段,一方(通常为客户端)将干净地终止协议连接。此阶段无需定义特定的关闭消息,而是应通过底层传输机制(如 TCP 的连接关闭流程)通知连接终止。

客户端编程实现

网上有不少介绍 MCP 协议的文章,但关于 MCP 客户端的编程实践的文章很少。MCP 的官方 python sdk,有基于 stdio 传输方式的客户端代码实现示例(github.com/modelcontex… stdio 传输方式,主要是针对本地的调试,以快速地验证核心逻辑。若要应用于生产环境,sse 传输方式更合适。sse 支持跨网络、跨机器环境进行通信,客户端与服务端可实现真正的解耦。以下是基于 sse 传输方式的客户端编码实现。

定义 MCPClient 类

1. init

初始化必要的属性,包括会话上下文、流上下文和退出栈。

2. connect_to_sse_server 方法实现 Initialization

connect_to_sse_server方法通过SSE(Server-Sent Events)传输方式连接到MCP服务器:

**
**3. cleanup

在不再需要连接时优雅地关闭所有资源:

import asynciofrom typing import Optionalfrom contextlib import AsyncExitStackfrom mcp import ClientSessionfrom mcp.client.sse import sse_clientfrom typing import Anyimport loggingimport mcp.types as typesfrom pydantic import AnyUrllogging.basicConfig(    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")class MCPClient:    def __init__(self):        self._session_context = None        self._streams_context = None        self.session: Optional[ClientSession] = None        self.exit_stack = AsyncExitStack()    asyncdef connect_to_sse_server(self, server_url: str):        """通过 sse 传输方式连接到 MCP 服务端"""        self._streams_context = sse_client(url=server_url)        streams = await self._streams_context.__aenter__()        self._session_context = ClientSession(*streams)        self.session: ClientSession = await self._session_context.__aenter__()        # 初始化        await self.session.initialize()    asyncdef cleanup(self):        """关闭会话和连接流"""        if self._session_context:            await self._session_context.__aexit__(NoneNoneNone)        if self._streams_context:            await self._streams_context.__aexit__(NoneNoneNone)

定义操作方法

给 MCPClient 类定义与服务端交互的方法:

    列出全部工具 tools、调用工具。列出全部提示模板 prompts、读取模板。列出全部资源 resources、获取资源。

上述的方法实现,是基于 ClientSession 类的方法实现的,并增加异常处理逻辑:

    async def list_tools(self):        """列出全部工具"""        try:            response = await self.session.list_tools()            tools = response.tools        except Exception as e:            error_msg = f"Error executing tool: {str(e)}"            logging.error(error_msg)            return error_msg        return tools    asyncdef execute_tool(            self,            tool_name: str,            arguments: dict[str, Any]    ) -> Any:        """调用工具"""        try:            result = await self.session.call_tool(tool_name, arguments)        except Exception as e:            error_msg = f"Error executing tool: {str(e)}"            logging.error(error_msg)            return error_msg        return result    asyncdef list_prompts(self):        """列出全部提示模板"""        try:            prompt_list = await self.session.list_prompts()        except Exception as e:            error_msg = f"Error executing tool: {str(e)}"            logging.error(error_msg)            return error_msg        return prompt_list    asyncdef get_prompt(self, name: str, arguments: dict[str, str] | None = None):        """读取提示模板内容"""        try:            prompt = await self.session.get_prompt(name=name, arguments=arguments)        except Exception as e:            error_msg = f"Error executing tool: {str(e)}"            logging.error(error_msg)            return error_msg        return prompt    asyncdef list_resources(self) -> types.ListResourcesResult:        """列出全部资源"""        try:            list_resources = await self.session.list_resources()        except Exception as e:            error_msg = f"Error list resources: {str(e)}"            logging.error(error_msg)            return error_msg        return list_resources    asyncdef list_resource_templates(self) -> types.ListResourceTemplatesResult:        """列出全部带参数的资源"""        try:            list_resource_templates = await self.session.list_resource_templates()        except Exception as e:            error_msg = f"Error list resource templates: {str(e)}"            logging.error(error_msg)            return error_msg        return list_resource_templates    asyncdef read_resource(self, uri: AnyUrl) -> types.ReadResourceResult:        """读取资源"""        try:            resource_datas = await self.session.read_resource(uri=uri)        except Exception as e:            error_msg = f"Error list resource templates: {str(e)}"            logging.error(error_msg)            return error_msg        return resource_datas

定义 main 函数

定义 main 函数,验证客户端功能(与服务端连接、调用工具、获取提示模板内容、读取资源):

async def main():    if len(sys.argv) < 2:        print("Usage: python sse_client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")        sys.exit(1)    client = MCPClient()    try:        await client.connect_to_sse_server(server_url=sys.argv[1])        # 列出全部 tools        tools = await client.list_tools()        print('------------列出全部 tools')        for tool in tools:            print(f'---- 工具名称:{tool.name}, 描述:{tool.description}')            print(f"输入参数: {tool.inputSchema}")        # 调用工具        result = await client.execute_tool('add', {'a': 2'b'3})        print(f'工具执行结果:{result}')        # 调用全部 prompts        prompts_list = await client.list_prompts()        print('------------列出全部 prompts')        for prompt in prompts_list.prompts:            print(f'---- prompt 名称: {prompt.name}, 描述:{prompt.description}, 参数:{prompt.arguments}')        # 获取 "介绍中国省份" prompt 内容        province_name = '广东省'        prompt_result = await client.get_prompt(name='introduce_china_province', arguments={'province':province_name})        prompt_content = prompt_result.messages[0].content.text        print(f'-------介绍{province_name}的 prompt:{prompt_content}')        # 列出全部的 resources        resources_list = await client.list_resources()        print('---- 列出全部 resources')        print(resources_list.resources)        # 列出全部的 resource templates        resource_templates_list = await client.list_resource_templates()        print('---- 列出全部 resource templates')        print(resource_templates_list.resourceTemplates)        # 获取全部数据表的表名        uri = AnyUrl('db://tables')        table_names = await client.read_resource(uri)        print('---- 全部数据表:')        print(table_names.contents[0].text)        # 读取某个数据表的数据        uri = AnyUrl("db://tables/chinese_movie_ratings/data/20")        resource_datas = await client.read_resource(uri)        print('chinese_movie_ratings 表数据:')        print(resource_datas.contents[0].text)    finally:        await client.cleanup()if __name__ == "__main__":    import sys    asyncio.run(main())

客户端功能测试验证

1 运用服务端

运行服务端服务(服务端的代码实现,见:MCP:编程实战,手把手教你服务端的开发与功能验证)

python server_see_test.py

2 运行客户端

python sse_client.py http://localhost:8080/sse

运行输出如下(调用工具、获取提示模板内容、读取数据资源,均运行正常):

>python sse_client.py http://localhost:8080/sse2025-04-29 19:56:36,252 - INFO - Connecting to SSE endpoint: http://localhost:8080/sse2025-04-29 19:56:36,810 - INFO - HTTP Request: GET http://localhost:8080/sse "HTTP/1.1 200 OK"2025-04-29 19:56:36,810 - INFO - Received endpoint URL: http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f2025-04-29 19:56:36,810 - INFO - Starting post writer with endpoint URL: http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f2025-04-29 19:56:37,078 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"2025-04-29 19:56:37,082 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"2025-04-29 19:56:37,085 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"------------列出全部 tools---- 工具名称:add, 描述:加法运算    参数:    a: 第一个数字    b: 第二个数字    返回:    两数之和输入参数: {'properties': {'a': {'title': 'A', 'type': 'number'}, 'b': {'title': 'B', 'type': 'number'}}, 'required': ['a', 'b'], 'title': 'addArguments', 'type': 'object'}---- 工具名称:subtract, 描述:减法运算    参数:    a: 第一个数字    b: 第二个数字    返回:    两数之差 (a - b)输入参数: {'properties': {'a': {'title': 'A', 'type': 'number'}, 'b': {'title': 'B', 'type': 'number'}}, 'required': ['a', 'b'], 'title': 'subtractArguments', 'type': 'object'}---- 工具名称:multiply, 描述:乘法运算    参数:    a: 第一个数字    b: 第二个数字    返回:    两数之积输入参数: {'properties': {'a': {'title': 'A', 'type': 'number'}, 'b': {'title': 'B', 'type': 'number'}}, 'required': ['a', 'b'], 'title': 'multiplyArguments', 'type': 'object'}---- 工具名称:divide, 描述:除法运算    参数:    a: 被除数    b: 除数    返回:    两数之商 (a / b)    异常:    ValueError: 当除数为零时输入参数: {'properties': {'a': {'title': 'A', 'type': 'number'}, 'b': {'title': 'B', 'type': 'number'}}, 'required': ['a', 'b'], 'title': 'divideArguments', 'type': 'object'}2025-04-29 19:56:37,094 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"工具执行结果:meta=None content=[TextContent(type='text', text='5.0', annotations=None)] isError=False2025-04-29 19:56:37,099 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"------------列出全部 prompts---- prompt 名称: introduce_china_province, 描述:介绍中国省份    参数:    province: 省份名称    , 参数:[PromptArgument(name='province', description=None, required=True)]---- prompt 名称: debug_code, 描述:调试代码的对话式提示模板    参数:    code: 需要调试的代码    error_message: 错误信息    , 参数:[PromptArgument(name='code', description=None, required=True), PromptArgument(name='error_message', description=None, required=True)]2025-04-29 19:56:37,104 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"-------介绍广东省的 prompt:    请介绍这个省份:广东省    要求介绍以下内容:    1. 历史沿革    2. 人文地理、风俗习惯    3. 经济发展状况    4. 旅游建议2025-04-29 19:56:37,111 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"---- 列出全部 resources[Resource(uri=AnyUrl('test://hello'), name='test://hello', description=None, mimeType='text/plain', size=None, annotations=None), Resource(uri=AnyUrl('db://test'), name='db://test', description=None, mimeType='text/plain', size=None, annotations=None), Resource(uri=AnyUrl('db://tables'), name='db://tables', description=None, mimeType='text/plain', size=None, annotations=None)]2025-04-29 19:56:37,116 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"---- 列出全部 resource templates[ResourceTemplate(uriTemplate='db://tables/{table_name}/data/{limit}', name='get_table_data', description='获取指定表的数据\n\n    参数:\n    table_name: 表名\n    ', mimeType=None, annotations=None), ResourceTemplate(uriTemplate='db://tables/{table_name}/schema', name='get_table_schema', description='获取表结构信息\n\n    参数:\n    table_name: 表名\n    ', mimeType=None, annotations=None)]2025-04-29 19:56:37,121 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"---- 全部数据表:["chinese_provinces", "chinese_movie_ratings"]2025-04-29 19:56:37,168 - INFO - HTTP Request: POST http://localhost:8080/messages/?session_id=a290c049f5b54bc6b44fbe2eeb46684f "HTTP/1.1 202 Accepted"chinese_movie_ratings 表数据:[{"movie_type": "剧情", "main_actors": "徐峥|王传君|周一围|谭卓|章宇", "region": "中国大陆", "director": "文牧野", "features": "经典", "rating": "8.9", "movie_name": "我不是药神"}, {"movie_type": "剧情", "main_actors": "冯小刚|许晴|张涵予|刘桦|李易峰", "region": "中国大陆", "director": "管虎", "features": "经典", "rating": "7.8", "movie_name": "老炮儿"}, {"movie_type": "剧情", "main_actors": "王宝强|刘昊然|肖央|刘承羽|尚语贤", "region": "中国大陆", "director": "陈思诚", "features": "经典", "rating": "6.7", "movie_name": "唐人街探案2"}, {"movie_type": "剧情", "main_actors": "任素汐|大力|刘帅良|裴魁山|阿如那", "region": "中国大陆", "director": "周申|刘露", "features": "经典", "rating": "8.3", "movie_name": "驴得水"}, {"movie_type": "剧情", "main_actors": "徐峥|王宝强|李曼|李小璐|左小青", "region": "中国大陆", "director": "叶伟民", "features": "经典", "rating": "7.5", "movie_name": "人在囧途"}, {"movie_type": "剧情", "main_actors": "徐峥|黄渤|余男|多布杰|王双宝", "region": "中国大陆", "director": "宁浩", "features": "经典", "rating": "8.1", "movie_name": "无人区"}, {"movie_type": "剧情", "main_actors": "姜文|香川照之|袁丁|姜宏波|丛志军", "region": "中国大陆", "director": "姜文", "features": "经典", "rating": "9.2", "movie_name": "鬼子来了"}, {"movie_type": "剧情", "main_actors": "章子怡|黄晓明|张震|王力宏|陈楚生", "region": "中国大陆", "director": "李芳芳", "features": "经典", "rating": "7.6", "movie_name": "无问西东"}, {"movie_type": "剧情", "main_actors": "彭于晏|廖凡|姜文|周韵|许晴", "region": "中国大陆", "director": "姜文", "features": "经典", "rating": "7.1", "movie_name": "邪不压正"}, {"movie_type": "剧情", "main_actors": "肖央|王太利|韩秋池|于蓓蓓", "region": "中国大陆", "director": "肖央", "features": "经典", "rating": "8.5", "movie_name": "11度青春之"}, {"movie_type": "剧情", "main_actors": "阿尔文|加内特|赫德兰|克里斯汀|斯图尔特|迪塞尔|李淳", "region": "中国大陆", "director": "李安", "features": "经典", "rating": "8.4", "movie_name": "比利"}, {"movie_type": "剧情", "main_actors": "王千源|秦海璐|张申英|刘谦|罗二羊", "region": "中国大陆", "director": "张猛", "features": "经典", "rating": "8.4", "movie_name": "钢的琴"}, {"movie_type": "剧情", "main_actors": "霍卫民|王笑天|罗芸|杨瑜珍|孙黎", "region": "中国大陆", "director": "忻钰坤", "features": "经典", "rating": "8.6", "movie_name": "心迷宫"}, {"movie_type": "剧情", "main_actors": "陈道明|巩俐|张慧雯|郭涛|刘佩琦", "region": "中国大陆", "director": "张艺谋", "features": "经典", "rating": "7.8", "movie_name": "归来"}, {"movie_type": "剧情", "main_actors": "周迅|金城武|张学友|池珍熙|曾志伟", "region": "中国大陆", "director": "陈可辛|赵良骏", "features": "经典", "rating": "7.6", "movie_name": "如果"}, {"movie_type": "剧情", "main_actors": "汤姆|克鲁斯|杰瑞米|雷纳|西蒙|佩吉|丽贝卡|弗格森|瑞姆斯", "region": "中国大陆", "director": "克里斯托弗|麦奎里", "features": "经典", "rating": "7.7", "movie_name": "碟中谍5"}, {"movie_type": "剧情", "main_actors": "夏雨|李冰冰|龚蓓苾|高旗|吴超", "region": "中国大陆", "director": "伍仕贤", "features": "经典", "rating": "8.1", "movie_name": "独自等待"}, {"movie_type": "剧情", "main_actors": "葛优|刘蓓|何冰|冯小刚|英达", "region": "中国大陆", "director": "冯小刚", "features": "经典", "rating": "8.3", "movie_name": "甲方乙方"}, {"movie_type": "剧情", "main_actors": "梁朝伟|刘德华|黎明|陈道明|陈慧琳", "region": "中国大陆", "director": "刘伟强|麦兆辉", "features": "经典", "rating": "7.8", "movie_name": "无间道3"}, {"movie_type": "剧情", "main_actors": "甄子丹|洪金宝|熊黛林|黄晓明|樊少皇", "region": "中国大陆", "director": "叶伟信", "features": "经典", "rating": "7.2", "movie_name": "叶问2"}]

以上是基于 sse 传输方式的 mcp 客户端编码实现。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

MCP 客户端编程 SSE传输 Python SDK
相关文章