掘金 人工智能 06月18日 10:22
LangChain + MCP + vLLM + Qwen3-32B 构建本地私有化智能体应用
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了一种基于LangChain、vLLM、Qwen3-32B和MCP技术栈构建的数据库问答智能体。通过私有化部署Qwen3-32B模型,并结合MCP Server,实现了对数据库的智能查询和交互。文章详细阐述了vLLM的部署、MCP Server的搭建以及LangChain智能体的构建过程,并提供了关键参数说明和测试结果,旨在帮助读者理解和实践该技术方案。

💡文章首先介绍了使用LangChain、vLLM、Qwen3-32B和MCP技术栈构建数据库问答应用的技术方案,并展示了整体流程图和实现效果。

💾 随后,文章详细阐述了如何使用modelscope下载Qwen3-32B模型,并通过vLLM启动API服务,包括关键参数的设置,如CUDA_VISIBLE_DEVICES、dtype、tensor-parallel-size等,以及显存占用情况的说明。

🛠️ 接着,文章构建了DB MCP Server,定义了三个MCP Tool:获取表名、获取Schema和执行SQL,并提供了相应的代码实现。MCP Server使用SSE模式进行交互。

🤖 最后,文章介绍了如何使用Langchain构建MCP Client Agent智能体,实现了与MCP Server的交互,并通过测试展示了智能体在数据库查询中的应用,包括提问和结果展示。

 

一、私有化智能体应用

在本专栏的前面文章基于Spring AI MCP实现了本地 ChatBI 问答应用,本文还是依据该场景,采用 LangChain + vLLM + Qwen3-32B + MCP 技术栈构建该流程,整体过程如下图所示:

实现效果如下所示:

关于 MySQL 表结构的创建,可以参考下面这篇文章:

Spring AI MCP Server + Cline 快速搭建一个数据库 ChatBi 助手

小毕超,公众号:狂热JAVA小毕超Spring AI MCP Server + Cline 快速搭建一个数据库 ChatBi 助手

实验所使用依赖的版本如下:

torch==2.6.0transformers==4.51.3modelscope==1.23.1vllm==0.8.4mcp==1.9.2openai==1.75.0langchain==0.3.25langchain-openai==0.3.18langgraph==0.4.7pymysql==1.0.3

二、vLLM 部署 Qwen3-32B

使用 modelscope 下载 Qwen3-32B 模型到本地:

modelscope download --model="Qwen/Qwen3-32B" --local_dir Qwen3-32B

vLLM 读取模型启动API服务。

export CUDA_VISIBLE_DEVICES=0,1vllm serve "Qwen3-32B"--host 0.0.0.0 \  --port 8060 \  --dtype bfloat16 \  --tensor-parallel-size 2 \  --cpu-offload-gb 0 \  --gpu-memory-utilization 0.8 \  --max-model-len 8126 \  --api-key token-abc123 \  --enable-prefix-caching \  --enable-reasoning \  --reasoning-parser deepseek_r1\  --enable-auto-tool-choice \  --tool-call-parser hermes \  --trust-remote-code

关键参数说明:

显存占用情况:

如果启动显存不足,可适当调整 gpu-memory-utilization 和 max-model-len 参数,或通过 cpu-offload-gb 将部分模型权重卸载到内存中。

启动成功后,可通过 /v1/models 接口可查看模型列表:

curl http://127.0.0.1:8060/v1/models -H "Authorization: Bearer token-abc123"

测试API交互,思考模式:

curl http://127.0.0.1:8060/v1/chat/completions \    -H "Content-Type: application/json" \    -H "Authorization: Bearer token-abc123" \    -d '{        "model": "Qwen3-32B",        "messages": [            {"role": "system", "content": "You are a helpful assistant."},            {"role": "user", "content": "你是谁"}        ]    }'

非思考模式测试:

curl http://127.0.0.1:8060/v1/chat/completions \    -H "Content-Type: application/json" \    -H "Authorization: Bearer token-abc123" \    -d '{        "model": "Qwen3-32B",        "messages": [            {"role": "system", "content": "You are a helpful assistant."},            {"role": "user", "content": "你是谁/no_think"}        ]    }'

三、构建DB MCP Server

在 MCP Server 端,依据上面图片的规划,包括三个 MCP Tool ,分别是 获取所有可用的表名:get_all_tables根据表名获取:Schema get_table_schema执行SQL:run_sql ,交互协议选择 SSE 模式。

首先实现数据库操作,这里仅仅做了数据库的交互,实际使用你应考虑很多性能细节的优化:

utils_db.py

import pymysqldefget_conn():    return pymysql.connect(        host="127.0.0.1",        port=3306,        database="langchain",        user="root",        password="root",        autocommit=True    )defquery(sql):    conn = get_conn()    cursor = conn.cursor()    cursor.execute(sql)    columns = [column[0] for column in cursor.description]    res = list()    for row in cursor.fetchall():        res.append(dict(zip(columns, row)))    cursor.close()    conn.close()    return res

db_mcp_server.py

import jsonfrom mcp.server.fastmcp import FastMCPimport utils_dbmcp = FastMCP("DB Mcp Server")all_tables_sql = "SELECT TABLE_NAME, TABLE_COMMENT FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'langchain'"schema_sql = """SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'langchain' AND TABLE_NAME = '{table}'"""@mcp.tool()defget_all_tables() -> str:    """获取所有可用的表名"""    return json.dumps(utils_db.query(all_tables_sql), ensure_ascii=False)@mcp.tool()defget_table_schema(table_names: list[str]) -> str:    """根据表名获取Schema"""    table_schema = []    for table in table_names:        schemas = utils_db.query(schema_sql.format(table=table))        schemas = ", \n".join([f"{s['COLUMN_NAME']} {s['DATA_TYPE']} COMMENT {s['COLUMN_COMMENT']}"for s in schemas])        table_schema.append(f"{table} ({schemas})")    return"\n\n".join(table_schema)@mcp.tool()defrun_sql(sql: str) -> str:    """执行SQL查询数据,一次仅能执行一句SQL!"""    try:        return json.dumps(utils_db.query(sql), ensure_ascii=False)    except Exception as e:        returnf"执行SQL错误:{str(e)} ,请修正后重新发起。"if __name__ == "__main__":    mcp.settings.port = 6030    mcp.run("sse")

启动 MCP Server 服务:

四、Langchain 构建 MCP Client Agent 智能体

官方关于 MCP 的集成介绍文档:

langchain-ai.github.io/langgraph/a…

实现过程:

import os, configos.environ["OPENAI_BASE_URL"] = "http://127.0.0.1:8060/v1"os.environ["OPENAI_API_KEY"] = "token-abc123"from langchain_mcp_adapters.client import MultiServerMCPClientfrom langgraph.prebuilt import create_react_agentfrom langgraph.checkpoint.memory import InMemorySaverimport asynciofrom colorama import Fore, Style, initasyncdefmain():    client = MultiServerMCPClient(        {            "db": {                "url""http://127.0.0.1:6030/sse",                "transport""sse",            }        }    )    tools = await client.get_tools()    checkpointer = InMemorySaver()    agent = create_react_agent(        "openai:Qwen3-32B",        tools,        checkpointer=checkpointer    )    config = {        "configurable": {            "thread_id""1"        }    }    whileTrue:        question = input("请输入问题:")        ifnot question:            continue        if question == "q":            break        asyncfor chunk in agent.astream(                {                    "messages": [                        {                            "role""user",                            "content": question                        }                    ]                },                config=config,                stream_mode="updates"        ):            if"agent"in chunk:                content = chunk["agent"]["messages"][0].content                tool_calls = chunk["agent"]["messages"][0].tool_calls                if tool_calls:                    for tool in tool_calls:                        print(Fore.YELLOW, Style.BRIGHT, f">>> Call MCP Server: {tool['name']} , args: {tool['args']}")                else:                    print(Fore.BLACK, Style.BRIGHT, f"LLM: {content}")            elif"tools"in chunk:                content = chunk["tools"]["messages"][0].content                name = chunk["tools"]["messages"][0].name                print(Fore.GREEN, Style.BRIGHT, f"<<< {name} : {content}")if __name__ == '__main__':    asyncio.run(main())

运行智能体,开始测试。

四、智能体问答测试

提问:当前的用户数,以及工作组清单

提问:工作组 A 下的人,都有哪些角色

可以看到执行过程,当发现错误后,能够及时的纠正,进而得到正确的结果:

提问:role1 下的有哪些人

 

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

LangChain vLLM Qwen3-32B MCP 数据库智能体
相关文章