项目已开源: github.com/TickHaiJun/…
配置好数据库,可直接通过语义化操作你的数据库数据
Hello, 大家好,我是程序员海军, 全栈开发
|AI爱好者
| 独立开发
。
今天就来和大家分享一下,如何从零开始开发一个能够操作MySQL数据库的MCP Server。
相信很多朋友都有过这样的想法:要是AI助手能直接帮我查询数据库就好了!现在,这个想法完全可以实现。
什么是MCP Server?为什么要开发它?
MCP的魅力所在
说到MCP,可能很多朋友还比较陌生。MCP全称Model Context Protocol,是Anthropic推出的一个协议标准,允许AI模型通过标准化接口调用外部工具和服务。
简单打个比方:如果把AI助手比作一个聪明的助理,那么MCP就像是给这个助理配备了各种专业工具。原本助理只能动嘴说话,现在可以动手操作数据库、调用API、处理文件了。
技术选型
在开始动手之前,我们先来聊聊技术选型。这次我选择了:
- Express:轻量级的Node.js框架,上手简单@modelcontextprotocol/sdk:Anthropic官方提供的MCP开发工具包MySQL2:性能优秀的MySQL驱动
核心架构设计:安全第一
在开始编码之前,我们需要先设计好整体架构。一个好的MCP Server不仅要功能完善,更要足够安全。
安全防护策略
数据库操作的安全性不能马虎,我设计了多层防护:
第一层:SQL语句验证
- 只允许SELECT、INSERT、UPDATE、DELETE操作禁止DROP、TRUNCATE等危险操作检测常见的SQL注入模式
第二层:参数化查询
- 所有用户输入都通过参数化查询处理避免直接拼接SQL字符串
第三层:连接池管理
- 使用连接池避免连接泄露设置合理的超时和重连机制
模块化设计
为了代码的可维护性,我把功能拆分成了几个核心模块:
src/├── index.js # 程序入口├── server.js # MCP服务器核心逻辑├── database.js # 数据库操作封装├── validators.js # SQL安全验证└── config.js # 配置管理
这样的设计有几个好处:
- 职责清晰,每个模块专注自己的功能便于单元测试后续扩展更容易
MCP Server 实现
第一步:SQL安全验证器
SQL验证器是整个系统的安全基础,我花了不少心思在这上面:
class SQLValidator { constructor() { // 危险关键词黑名单 this.dangerousKeywords = [ 'drop table', 'drop database', 'truncate', 'alter table', 'create database', 'drop index', // ...更多危险操作 ]; } validateSQL(sql) { // 多重验证逻辑 // 1. 检查危险关键词 // 2. 验证操作类型 // 3. SQL注入风险检测 }}
这个验证器的设计思路是"白名单+黑名单"双重保护。白名单确保只允许安全操作,黑名单拦截已知的危险模式。
第二步:数据库连接管理
数据库连接管理是另一个重点。线上环境中,连接管理不当很容易导致系统崩溃:
class DatabaseManager { async initialize() { this.pool = mysql.createPool({ host: config.database.host, connectionLimit: 10, // 限制连接数 acquireTimeout: 60000, // 获取连接超时 reconnect: true // 自动重连 }); }}
连接池的配置需要根据实际业务调整。我这里设置了10个连接,对于一般的开发和测试环境够用了。
第三步:MCP服务器实现
MCP服务器的核心是工具注册和调用处理:
const { Server } = require('@modelcontextprotocol/sdk/server/index.js');const { StdioServerTransport } = require('@modelcontextprotocol/sdk/server/stdio.js');const { ListToolsRequestSchema, CallToolRequestSchema,} = require('@modelcontextprotocol/sdk/types.js');const DatabaseManager = require('./database');const SQLValidator = require('./validators');const config = require('./config');class MCPMySQLServer { constructor() { this.server = new Server({ name: config.mcp.name, version: config.mcp.version, }, { capabilities: { tools: {}, }, }); this.dbManager = new DatabaseManager(); this.validator = new SQLValidator(); this.setupHandlers(); } /** * 设置MCP服务器处理器 */ setupHandlers() { this.server.setRequestHandler(ListToolsRequestSchema, async () => { return { tools: [ { name: "execute_sql", description: "执行SQL查询语句,支持SELECT、INSERT、UPDATE、DELETE操作", inputSchema: { type: "object", properties: { sql: { type: "string", description: "要执行的SQL语句" }, params: { type: "array", description: "SQL查询参数(可选)", items: { type: ["string", "number", "boolean", "null"] } } }, required: ["sql"] } }, { name: "get_tables_info", description: "获取数据库表结构信息", inputSchema: { type: "object", properties: {} } }, { name: "get_connection_status", description: "获取数据库连接状态", inputSchema: { type: "object", properties: {} } } ] }; }); // 执行工具调用 this.server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; try { switch (name) { case "execute_sql": return await this.handleExecuteSQL(args); case "get_tables_info": return await this.handleGetTablesInfo(); case "get_connection_status": return await this.handleGetConnectionStatus(); default: throw new Error(`未知的工具: ${name}`); } } catch (error) { return { content: [ { type: "text", text: `错误: ${error.message}` } ], isError: true }; } }); } /** * 处理SQL执行请求 * @param {Object} args - 请求参数 * @returns {Object} 执行结果 */ async handleExecuteSQL(args) { const { sql, params = [] } = args; // 验证SQL语句 const sqlValidation = this.validator.validateSQL(sql); if (!sqlValidation.isValid) { return { content: [ { type: "text", text: `SQL验证失败: ${sqlValidation.error}` } ], isError: true }; } // 验证参数 const paramsValidation = this.validator.validateParams(params); if (!paramsValidation.isValid) { return { content: [ { type: "text", text: `参数验证失败: ${paramsValidation.error}` } ], isError: true }; } // 执行SQL const result = await this.dbManager.executeQuery(sql, params); if (result.success) { let responseText = `✓ SQL执行成功\n`; responseText += `操作类型: ${result.operation}\n`; responseText += `执行时间: ${result.executionTime}ms\n`; responseText += `影响行数: ${result.rowCount}\n`; if (result.insertId) { responseText += `插入ID: ${result.insertId}\n`; } if (result.operation === 'SELECT' && result.data.length > 0) { responseText += `\n查询结果:\n`; responseText += JSON.stringify(result.data, null, 2); } return { content: [ { type: "text", text: responseText } ] }; } else { return { content: [ { type: "text", text: `✗ SQL执行失败: ${result.error}` } ], isError: true }; } } /** * 处理获取表信息请求 * @returns {Object} 表信息 */ async handleGetTablesInfo() { const result = await this.dbManager.getTablesInfo(); if (result.success) { let responseText = `数据库表信息:\n\n`; for (const table of result.data) { responseText += `表名: ${table.name}\n`; responseText += `字段信息:\n`; for (const column of table.columns) { responseText += ` - ${column.Field} (${column.Type}) ${column.Null === 'NO' ? 'NOT NULL' : 'NULL'} ${column.Key ? column.Key : ''}\n`; } responseText += `\n`; } return { content: [ { type: "text", text: responseText } ] }; } else { return { content: [ { type: "text", text: `获取表信息失败: ${result.error}` } ], isError: true }; } } /** * 处理获取连接状态请求 * @returns {Object} 连接状态 */ async handleGetConnectionStatus() { const isActive = this.dbManager.isConnectionActive(); return { content: [ { type: "text", text: `数据库连接状态: ${isActive ? '已连接' : '未连接'}` } ] }; } /** * 启动MCP服务器 */ async start() { try { // 初始化数据库连接 await this.dbManager.initialize(); // 启动MCP服务器 const transport = new StdioServerTransport(); await this.server.connect(transport); console.log(`✓ MCP MySQL Server 启动成功`); console.log(`服务器名称: ${config.mcp.name}`); console.log(`服务器版本: ${config.mcp.version}`); } catch (error) { console.error('✗ MCP服务器启动失败:', error.message); process.exit(1); } } /** * 停止服务器 */ async stop() { await this.dbManager.close(); console.log('✓ MCP服务器已停止'); }}module.exports = MCPMySQLServer;
这里的设计遵循了MCP协议规范。
在 Cursor 中 验证 开发的 MCP Server 是否能运行
Cursor 添加 MCP Server
通过 Cursor Settings -> MCP -> Add new global MCP server可以将 MCP 服务添加为全局可用。这意味着你配置的 MCP 服务将在所有项目中生效。
也可以只针对 项目级别 添加
- 在项目的 .cursor目录下,新建一个 mcp.json文件进行配置,这样的设置只会对特定项目生效。
mcp server 的格式是如何呢
配置 server 基本格式规范
{ "mcpServers": { "<server-name>": { "command": "<启动命令>", "args": ["<参数1>", "<参数2>", ...], "env": { "<环境变量名1>": "<值1>", "<环境变量名2>": "<值2>", ... }, "transport": "<传输协议>", "port": <端口号>, "host": "<主机地址>" } }}
查询数据库表和字段信息
查看当前数据库是否连接
查询表里数据
看到这里,是不是觉得很神奇?我们用自然语言描述需求,AI助手自动生成并执行SQL,然后分析结果给出结论。
扩展MCP Server 能力
支持更多数据库
当前实现只支持MySQL,但扩展到其他数据库并不困难:
// 数据库适配器模式class DatabaseAdapter { static create(type, config) { switch(type) { case 'mysql': return new MySQLAdapter(config); case 'postgresql': return new PostgreSQLAdapter(config); case 'mongodb': return new MongoDBAdapter(config); } }}
添加缓存机制
对于频繁查询的数据,可以添加缓存:
const Redis = require('redis');const client = Redis.createClient();async executeQuery(sql, params) { const cacheKey = `query:${crypto.createHash('md5').update(sql + JSON.stringify(params)).digest('hex')}`; // 先查缓存 const cached = await client.get(cacheKey); if (cached) { return JSON.parse(cached); } // 执行查询并缓存结果 const result = await this.pool.execute(sql, params); await client.setex(cacheKey, 300, JSON.stringify(result)); // 缓存5分钟 return result;}
权限控制
生产环境中,权限控制是必不可少的:
class PermissionManager { checkPermission(user, operation, table) { const permissions = this.getUserPermissions(user); return permissions.includes(`${operation}:${table}`); }}
实战经验总结
经过这次开发,我有几点体会想和大家分享:
1. 安全永远是第一位的
数据库操作涉及敏感数据,安全防护必须做到位。宁可功能简单一些,也不能在安全上妥协。我在项目中实现了多层防护,虽然增加了一些复杂度,但换来的是使用时的安心。
2. 用户体验很重要
技术实现只是基础,用户体验才是关键。比如错误信息要友好易懂,执行时间要及时反馈,查询结果要格式化展示。这些细节决定了工具是否真正好用。
3. 文档和示例不能少
一个好的工具需要配套好的文档。我在代码中加了大量注释,也提供了完整的使用示例。这不仅帮助别人理解,也方便自己后续维护。
写在最后
开发这个MCP Server的过程很有趣,也很有成就感。看着AI助手能够理解自然语言需求,自动生成和执行SQL,然后给出分析结果,真的有种科幻电影成为现实的感觉。
最后想说的是,MCP协议为AI应用的扩展性提供了无限可能。数据库操作只是一个开始,我们还可以开发文件处理、API调用、图像生成等各种工具。
项目地址: github.com/TickHaiJun/…
配置好数据库,可直接通过语义化操作你的数据库数据