掘金 人工智能 9小时前
MCP第3章:开发案例合集(typescript-sdk + python-sdk)
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了如何使用通义灵码和TypeScript SDK开发MCP服务器,包括创建牛排信息服务和加法运算服务。文章详细展示了服务器端的代码实现、工具定义以及客户端的配置和验证过程。此外,还探讨了如何使用Python SDK构建MCP服务器和客户端,并实现了新闻搜索、情感分析和邮件发送等功能,为开发者提供了完整的MCP服务开发流程指南。

🐄 **牛排信息服务器开发:** 文章展示了如何使用TypeScript SDK和通义灵码创建名为“Steak Information Server”的MCP服务器。该服务器提供了`get_steak_info`工具,可以根据用户指定的牛排类型(如肋眼、西冷、菲力、T骨)和信息类型(基本信息、烹饪方法、温度指南)返回相应的牛排数据。同时,还提供了`list_steak_types`工具,列出所有可用的牛排类型。服务器通过stdin/stdout与客户端通信。

➕ **加法运算服务器开发:** 为了解决大模型在数学运算中可能出现的精度问题,文章演示了如何开发一个简单的加法运算MCP服务器。该服务器使用`add`工具,接收两个数字作为输入,并返回它们的和。还包含了一个`greeting`资源,可以根据传入的`name`生成个性化的问候语。此部分代码也使用了TypeScript SDK和StdioServerTransport进行通信。

🐍 **Python SDK开发MCP服务:** 文章详细介绍了如何使用Python SDK构建MCP服务器和客户端。通过`uv init mcp_news`初始化项目,并创建了`server.py`和`client.py`文件。`server.py`实现了`search_google_news`工具(利用Serper API搜索新闻并保存为JSON文件)、`analyze_sentiment`工具(对文本进行情感分析并保存为Markdown文件)和`send_email_with_attachment`工具(发送带附件的邮件)。

🚀 **MCP客户端与集成:** Python部分的`client.py`展示了如何连接到MCP服务器,处理用户查询,并执行工具链。它能够根据用户请求规划工具调用顺序,收集工具输出,并将最终结果通过OpenAI模型生成回复。客户端还负责生成报告文件名,并将对话内容保存到本地文件,实现了端到端的服务集成。

⚙️ **配置与验证:** 文章强调了MCP服务器的配置过程,包括在`mcp server`配置文件中指定启动命令和脚本路径,以及在Cherry Studio等平台进行验证。通过实际的提问示例,如“有什么牛排可以点”和“如何烹饪菲力牛排?”,展示了MCP服务器的交互和功能验证。

1. mcp server开发(通义灵码+typescript-sdk)

1.1. 通义灵码插件生成 mcp server 程序

使用nodejs开发一个MCP服邻器,当用户问大模型的提示词中有提到牛排相关的问题时,大模型会调用MCP服务,并返回牛排数据

// steak-mcp-server.jsimport { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";import { z } from "zod";// 创建一个MCP服务器const server = new McpServer({  name: "Steak Information Server",  version: "1.0.0"});// 牛排数据const steakData = {  ribeye: {    name: "肋眼牛排 (Ribeye)",    description: "来自牛肋脊部位的牛排,脂肪含量高,口感丰富多汁",    cookingMethods: ["烧烤", "煎制", "烤箱烘烤"],    internalTemp: {      rare: "120-125°F (一分熟)",      mediumRare: "130-135°F (三分熟)",      medium: "135-145°F (五分熟)",      mediumWell: "145-155°F (七分熟)",      wellDone: "155°F+ (全熟)"    },    weight: "通常为8-12盎司"  },  sirloin: {    name: "西冷牛排 (Sirloin)",    description: "来自牛后腰部位,瘦肉较多,口感有嚼劲",    cookingMethods: ["烧烤", "煎制", "烤箱烘烤"],    internalTemp: {      rare: "120-125°F (一分熟)",      mediumRare: "130-135°F (三分熟)",      medium: "135-145°F (五分熟)",      mediumWell: "145-155°F (七分熟)",      wellDone: "155°F+ (全熟)"    },    weight: "通常为6-10盎司"  },  filetMignon: {    name: "菲力牛排 (Filet Mignon)",    description: "来自牛里脊部位,是最嫩的牛排,脂肪含量低",    cookingMethods: ["煎制", "烧烤", "烤箱烘烤"],    internalTemp: {      rare: "120-125°F (一分熟)",      mediumRare: "130-135°F (三分熟)",      medium: "135-145°F (五分熟)",      mediumWell: "145-155°F (七分熟)",      wellDone: "155°F+ (全熟)"    },    weight: "通常为6-8盎司"  },  tBone: {    name: "T骨牛排 (T-Bone)",    description: "带有T字形骨头,一边是菲力,一边是纽约客牛排",    cookingMethods: ["烧烤", "煎制"],    internalTemp: {      rare: "120-125°F (一分熟)",      mediumRare: "130-135°F (三分熟)",      medium: "135-145°F (五分熟)",      mediumWell: "145-155°F (七分熟)",      wellDone: "155°F+ (全熟)"    },    weight: "通常为10-16盎司"  }};// 定义获取牛排信息的工具server.tool("get_steak_info",  {     steakType: z.enum(["ribeye", "sirloin", "filetMignon", "tBone"]).describe("牛排类型"),    infoType: z.enum(["basic", "cooking", "temperature"]).optional().describe("信息类型:basic(基本信息), cooking(烹饪方法), temperature(温度指南)")  },  async ({steakType, infoType = "basic"}) => {    try {      const steak = steakData[steakType];            if (!steak) {        return {          content: [{ type: "text", text: `未找到牛排类型: ${steakType}` }]        };      }            let responseText = "";            switch (infoType) {        case "cooking":          responseText = `【${steak.name}】推荐烹饪方法:\n${steak.cookingMethods.join("、")}`;          break;                  case "temperature":          responseText = `【${steak.name}】烹饪温度指南:\n` +            Object.entries(steak.internalTemp)              .map(([level, temp]) => `${level}: ${temp}`)              .join("\n");          break;                  case "basic":        default:          responseText = `【${steak.name}】\n` +            `描述: ${steak.description}\n` +            `重量: ${steak.weight}\n` +            `推荐烹饪方法: ${steak.cookingMethods.join("、")}`;          break;      }            return {        content: [{ type: "text", text: responseText }]      };    } catch (error) {      return {        content: [{ type: "text", text: `获取牛排信息时出错: ${error.message}` }]      };    }  });// 定义获取所有牛排类型列表的工具server.tool("list_steak_types",  {},  async () => {    try {      const steakList = Object.entries(steakData)        .map(([key, steak]) => `${key}: ${steak.name}`)        .join("\n");              return {        content: [{           type: "text",           text: `可用的牛排类型:\n${steakList}\n\n使用 get_steak_info 工具获取特定牛排的详细信息。`         }]      };    } catch (error) {      return {        content: [{ type: "text", text: `获取牛排列表时出错: ${error.message}` }]      };    }  });// 启动服务器,通过stdin/stdout进行通信const transport = new StdioServerTransport();await server.connect(transport);

1.2. 编写 mcp server 配置

{  "mcpServers": {    "my-steak-server": {      "command": "node",      "args": [        "D:\project\AI\MCP\mcpserver\my-server3\steak.js"              ],      "disabled": true,      "autoApprove": []    }  }}

1.3. 配置到 cherry studio 后进行验证

有什么牛排可以点

如何烹饪菲力牛排?

2. mcp server开发(typescript-sdk)

2.1. 新建项目

项目名称:my-server3

2.2. 安装 typescript sdk

npm install @modelcontextprotocol/sdk

参考:github.com/modelcontex…

2.3. 开发加法运算 mcpserver 程序

大模型对数学运算存在精度问题,这里通过示例自定义一个 mcp server 来解决这个问题。

示例代码放到将 demo.js 文件中。

import { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";import { z } from "zod";// Create an MCP serverconst server = new McpServer({  name: "Demo",  version: "1.0.0"});// Add an addition toolserver.tool("add",  { a: z.number(), b: z.number() },  async ({ a, b }) => ({    content: [{ type: "text", text: String(a + b) }]  }));// Add a dynamic greeting resourceserver.resource(  "greeting",  new ResourceTemplate("greeting://{name}", { list: undefined }),  async (uri, { name }) => ({    contents: [{      uri: uri.href,      text: `Hello, ${name}!`    }]  }));// Start receiving messages on stdin and sending messages on stdoutconst transport = new StdioServerTransport();await server.connect(transport);

2.4. cline 配置 mcp 服务

配置如下:

{  "mcpServers": {    "my-add-server": {      "command": "node",      "args": [        "D:\project\AI\MCP\mcpserver\my-server3\demo.js"              ],      "disabled": true,      "autoApprove": []    }  }}

demo.js 保存后,出现在了左边列表上, 启用该 MCP server,如果是红灯,即点【Restart Server】重启下,变绿后, 然后点【Done】

2.5. 测试 mcp 服务

如果是比较小的数值运算,大模型能正常处理,没误差的话,可能就不会调用工具。这种情况下,可以在提示词指定工具。

提问如下:

2342342423423432 + 2342342342343等于多少

示例如下:

3. mcp server和mcp client开发(python-sdk)

3.1. 项目构建

3.1.1. 初始化项目

uv init mcp_news

3.1.2. 创建 .env, server.py 和 client.py

3.1.3. .evn

BASE_URL="https://dashscope.aliyuncs.com/compatible-mode/v1"MODEL=qwen2.5-vl-32b-instructDASHSCOPE_API_KEY="sk-bffdxxxxxxe927d67273"SERPER_API_KEY="618b9909116xxxxx7312428bbba76"SMTP_SERVER=smtp.163.comSMTP_PORT=465EMAIL_USER=codxxxxding@163.comEMAIL_PASS=AZeDNxxxxxt3Vr

3.1.4. client.py

📎client.py

import asyncioimport osimport jsonfrom typing import Optional, Listfrom contextlib import AsyncExitStackfrom datetime import datetimeimport refrom openai import OpenAIfrom dotenv import load_dotenvfrom mcp import ClientSession, StdioServerParametersfrom mcp.client.stdio import stdio_clientload_dotenv()class MCPClient:    def __init__(self):        self.exit_stack = AsyncExitStack()        self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")        self.base_url = os.getenv("BASE_URL")        self.model = os.getenv("MODEL")        if not self.openai_api_key:            raise ValueError("❌ 未找到 OpenAI API Key,请在 .env 文件中设置 DASHSCOPE_API_KEY")        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)        self.session: Optional[ClientSession] = None    async def connect_to_server(self, server_script_path: str):        # 对服务器脚本进行判断,只允许是 .py 或 .js        is_python = server_script_path.endswith('.py')        is_js = server_script_path.endswith('.js')        if not (is_python or is_js):            raise ValueError("服务器脚本必须是 .py 或 .js 文件")        # 确定启动命令,.py 用 python,.js 用 node        command = "python" if is_python else "node"        # 构造 MCP 所需的服务器参数,包含启动命令、脚本路径参数、环境变量(为 None 表示默认)        server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)        # 启动 MCP 工具服务进程(并建立 stdio 通信)        stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))        # 拆包通信通道,读取服务端返回的数据,并向服务端发送请求        self.stdio, self.write = stdio_transport        # 创建 MCP 客户端会话对象        self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))        # 初始化会话        await self.session.initialize()        # 获取工具列表并打印        response = await self.session.list_tools()        tools = response.tools        print("\n已连接到服务器,支持以下工具:", [tool.name for tool in tools])    async def process_query(self, query: str) -> str:        # 准备初始消息和获取工具列表        messages = [{"role": "user", "content": query}]        response = await self.session.list_tools()        available_tools = [            {                "type": "function",                "function": {                    "name": tool.name,                    "description": tool.description,                    "input_schema": tool.inputSchema                }            } for tool in response.tools        ]        # 提取问题的关键词,对文件名进行生成。        # 在接收到用户提问后就应该生成出最后输出的 md 文档的文件名,        # 因为导出时若再生成文件名会导致部分组件无法识别该名称。        keyword_match = re.search(r'(关于|分析|查询|搜索|查看)([^的\s,。、?\n]+)', query)        keyword = keyword_match.group(2) if keyword_match else "分析对象"        safe_keyword = re.sub(r'[\/:*?"<>|]', '', keyword)[:20]        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')        md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"        md_path = os.path.join("./sentiment_reports", md_filename)        # 更新查询,将文件名添加到原始查询中,使大模型在调用工具链时可以识别到该信息        # 然后调用 plan_tool_usage 获取工具调用计划        query = query.strip() + f" [md_filename={md_filename}] [md_path={md_path}]"        messages = [{"role": "user", "content": query}]        tool_plan = await self.plan_tool_usage(query, available_tools)        tool_outputs = {}        messages = [{"role": "user", "content": query}]        # 依次执行工具调用,并收集结果        for step in tool_plan:            tool_name = step["name"]            tool_args = step["arguments"]            for key, val in tool_args.items():                if isinstance(val, str) and val.startswith("{{") and val.endswith("}}"):                    ref_key = val.strip("{} ")                    resolved_val = tool_outputs.get(ref_key, val)                    tool_args[key] = resolved_val            # 注入统一的文件名或路径(用于分析和邮件)            if tool_name == "analyze_sentiment" and "filename" not in tool_args:                tool_args["filename"] = md_filename            if tool_name == "send_email_with_attachment" and "attachment_path" not in tool_args:                tool_args["attachment_path"] = md_path            result = await self.session.call_tool(tool_name, tool_args)            tool_outputs[tool_name] = result.content[0].text            messages.append({                "role": "tool",                "tool_call_id": tool_name,                "content": result.content[0].text            })        # 调用大模型生成回复信息,并输出保存结果        final_response = self.client.chat.completions.create(            model=self.model,            messages=messages        )        final_output = final_response.choices[0].message.content        # 对辅助函数进行定义,目的是把文本清理成合法的文件名        def clean_filename(text: str) -> str:            text = text.strip()            text = re.sub(r'[\/:*?"<>|]', '', text)            return text[:50]        # 使用清理函数处理用户查询,生成用于文件命名的前缀,并添加时间戳、设置输出目录        # 最后构建出完整的文件路径用于保存记录        safe_filename = clean_filename(query)        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')        filename = f"{safe_filename}_{timestamp}.txt"        output_dir = "./llm_outputs"        os.makedirs(output_dir, exist_ok=True)        file_path = os.path.join(output_dir, filename)        # 将对话内容写入 md 文档,其中包含用户的原始提问以及模型的最终回复结果        with open(file_path, "w", encoding="utf-8") as f:            f.write(f"🗣 用户提问:{query}\n\n")            f.write(f"🤖 模型回复:\n{final_output}\n")        print(f"📄 对话记录已保存为:{file_path}")        return final_output    async def chat_loop(self):        # 初始化提示信息        print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")        # 进入主循环中等待用户输入        while True:            try:                query = input("\n你: ").strip()                if query.lower() == 'quit':                    break                # 处理用户的提问,并返回结果                response = await self.process_query(query)                print(f"\n🤖 AI: {response}")            except Exception as e:                print(f"\n⚠️ 发生错误: {str(e)}")    async def plan_tool_usage(self, query: str, tools: List[dict]) -> List[dict]:        # 构造系统提示词 system_prompt。        # 将所有可用工具组织为文本列表插入提示中,并明确指出工具名,        # 限定返回格式是 JSON,防止其输出错误格式的数据。        print("\n📤 提交给大模型的工具定义:")        print(json.dumps(tools, ensure_ascii=False, indent=2))        tool_list_text = "\n".join([            f"- {tool['function']['name']}: {tool['function']['description']}"            for tool in tools        ])        system_prompt = {            "role": "system",            "content": (                "你是一个智能任务规划助手,用户会给出一句自然语言请求。\n"                "你只能从以下工具中选择(严格使用工具名称):\n"                f"{tool_list_text}\n"                "如果多个工具需要串联,后续步骤中可以使用 {{上一步工具名}} 占位。\n"                "返回格式:JSON 数组,每个对象包含 name 和 arguments 字段。\n"                "不要返回自然语言,不要使用未列出的工具名。"            )        }        # 构造对话上下文并调用模型。        # 将系统提示和用户的自然语言一起作为消息输入,并选用当前的模型。        planning_messages = [            system_prompt,            {"role": "user", "content": query}        ]        response = self.client.chat.completions.create(            model=self.model,            messages=planning_messages,            tools=tools,            tool_choice="none"        )        # 提取出模型返回的 JSON 内容        content = response.choices[0].message.content.strip()        match = re.search(r"```(?:json)?\s*([\s\S]+?)\s*```", content)        if match:            json_text = match.group(1)        else:            json_text = content        # 在解析 JSON 之后返回调用计划        try:            plan = json.loads(json_text)            return plan if isinstance(plan, list) else []        except Exception as e:            print(f"❌ 工具调用链规划失败: {e}\n原始返回: {content}")            return []    async def cleanup(self):        await self.exit_stack.aclose()async def main():    server_script_path = r"D:\project\AI\MCP\mcp_news\server.py"    client = MCPClient()    try:        await client.connect_to_server(server_script_path)        await client.chat_loop()    finally:        await client.cleanup()if __name__ == "__main__":    asyncio.run(main())

3.1.5. server.py

📎server.py

import osimport jsonimport smtplibfrom datetime import datetimefrom email.message import EmailMessageimport httpxfrom mcp.server.fastmcp import FastMCPfrom dotenv import load_dotenvfrom openai import OpenAI# 加载环境变量load_dotenv()# 初始化 MCP 服务器mcp = FastMCP("NewsServer")# @mcp.tool() 是 MCP 框架的装饰器,表明这是一个 MCP 工具。之后是对这个工具功能的描述@mcp.tool()async def search_google_news(keyword: str) -> str:    """    使用 Serper API(Google Search 封装)根据关键词搜索新闻内容,返回前5条标题、描述和链接。    参数:        keyword (str): 关键词,如 "小米汽车"    返回:        str: JSON 字符串,包含新闻标题、描述、链接    """    # 从环境中获取 API 密钥并进行检查    api_key = os.getenv("SERPER_API_KEY")    if not api_key:        return "❌ 未配置 SERPER_API_KEY,请在 .env 文件中设置"    # 设置请求参数并发送请求    url = "https://google.serper.dev/news"    headers = {        "X-API-KEY": api_key,        "Content-Type": "application/json"    }    payload = {"q": keyword}    async with httpx.AsyncClient() as client:        response = await client.post(url, headers=headers, json=payload)        data = response.json()    # 检查数据,并按照格式提取新闻,返回前五条新闻    if "news" not in data:        return "❌ 未获取到搜索结果"    articles = [        {            "title": item.get("title"),            "desc": item.get("snippet"),            "url": item.get("link")        } for item in data["news"][:5]    ]    # 将新闻结果以带有时间戳命名后的 JSON 格式文件的形式保存在本地指定的路径    output_dir = "./google_news"    os.makedirs(output_dir, exist_ok=True)    filename = f"google_news_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"    file_path = os.path.join(output_dir, filename)    with open(file_path, "w", encoding="utf-8") as f:        json.dump(articles, f, ensure_ascii=False, indent=2)    return (        f"✅ 已获取与 [{keyword}] 相关的前5条 Google 新闻:\n"        f"{json.dumps(articles, ensure_ascii=False, indent=2)}\n"        f"📄 已保存到:{file_path}"    )# @mcp.tool() 是 MCP 框架的装饰器,标记该函数为一个可调用的工具@mcp.tool()async def analyze_sentiment(text: str, filename: str) -> str:    """    对传入的一段文本内容进行情感分析,并保存为指定名称的 Markdown 文件。    参数:        text (str): 新闻描述或文本内容        filename (str): 保存的 Markdown 文件名(不含路径)    返回:        str: 完整文件路径(用于邮件发送)    """    # 这里的情感分析功能需要去调用 LLM,所以从环境中获取 LLM 的一些相应配置    openai_key = os.getenv("DASHSCOPE_API_KEY")    model = os.getenv("MODEL")    client = OpenAI(api_key=openai_key, base_url=os.getenv("BASE_URL"))    # 构造情感分析的提示词    prompt = f"请对以下新闻内容进行情绪倾向分析,并说明原因:\n\n{text}"    # 向模型发送请求,并处理返回的结果    response = client.chat.completions.create(        model=model,        messages=[{"role": "user", "content": prompt}]    )    result = response.choices[0].message.content.strip()    # 生成 Markdown 格式的舆情分析报告,并存放进设置好的输出目录    markdown = f"""# 舆情分析报告**分析时间:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}---## 📥 原始文本{text}---## 📊 分析结果{result}"""    output_dir = "./sentiment_reports"    os.makedirs(output_dir, exist_ok=True)    if not filename:        filename = f"sentiment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"    file_path = os.path.join(output_dir, filename)    with open(file_path, "w", encoding="utf-8") as f:        f.write(markdown)    return file_path@mcp.tool()async def send_email_with_attachment(to: str, subject: str, body: str, filename: str) -> str:    """    发送带附件的邮件。    参数:        to: 收件人邮箱地址        subject: 邮件标题        body: 邮件正文        filename (str): 保存的 Markdown 文件名(不含路径)    返回:        邮件发送状态说明    """    # 获取并配置 SMTP 相关信息    smtp_server = os.getenv("SMTP_SERVER")  # 例如 smtp.qq.com    smtp_port = int(os.getenv("SMTP_PORT", 465))    sender_email = os.getenv("EMAIL_USER")    sender_pass = os.getenv("EMAIL_PASS")    # 获取附件文件的路径,并进行检查是否存在    full_path = os.path.abspath(os.path.join("./sentiment_reports", filename))    if not os.path.exists(full_path):        return f"❌ 附件路径无效,未找到文件: {full_path}"    # 创建邮件并设置内容    msg = EmailMessage()    msg["Subject"] = subject    msg["From"] = sender_email    msg["To"] = to    msg.set_content(body)    # 添加附件并发送邮件    try:        with open(full_path, "rb") as f:            file_data = f.read()            file_name = os.path.basename(full_path)            msg.add_attachment(file_data, maintype="application", subtype="octet-stream", filename=file_name)    except Exception as e:        return f"❌ 附件读取失败: {str(e)}"    try:        with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:            server.login(sender_email, sender_pass)            server.send_message(msg)        return f"✅ 邮件已成功发送给 {to},附件路径: {full_path}"    except Exception as e:        return f"❌ 邮件发送失败: {str(e)}"if __name__ == "__main__":    mcp.run(transport='stdio')

3.2. 运行测试

步骤:

    直接运行client.py,输入:

2.“请分析一下马斯克近期的热点新闻,并且发送到593xx3437@qq.com

    执行一段时间后,去邮箱查看

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

MCP 通义灵码 TypeScript SDK Python SDK AI开发
相关文章