掘金 人工智能 05月16日 11:03
【RAG 实战】用 StarRocks + DeepSeek 构建智能问答与企业知识库
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了RAG(检索增强生成)技术,阐述了其在提升大语言模型准确性和相关性的作用。文章重点探讨了DeepSeek和StarRocks的结合,构建智能问答系统、企业私有知识库助手以及实时数据分析与报告生成等应用场景,并提供了操作演示,展示了RAG流程的实现细节,以及如何通过RAG机制优化回答质量。

💡 RAG技术的核心在于结合信息检索与生成式大语言模型,通过从外部知识库检索相关信息,并将检索结果作为上下文输入LLM,从而提升生成内容的准确性和相关性,解决大模型的知识局限性问题。

✨ 结合DeepSeek和StarRocks,可以构建多种RAG应用场景,例如针对数据库技术文档的智能问答系统、支持企业内部文档的实时问答的企业私有知识库助手,以及结合StarRocks查询结果与生成模型的多模态报告生成。

⚙️ 实现RAG流程的关键步骤包括:数据预处理、向量化与索引构建、查询检索与优化、上下文整合和大模型生成。通过Ollama部署DeepSeek,并利用StarRocks的高效向量检索能力,可以构建基于真实信息的回答,降低“幻觉”风险。

🛠️ 操作演示部分详细介绍了环境准备、数据向量化、知识存储、知识提取和RAG增强等流程,并提供了代码示例,帮助用户理解RAG在实际应用中的具体实现方式。

一、RAG 和向量索引简介

RAG(Retrieval-Augmented Generation)是一种结合信息检索与生成式大语言模型(LLM)的技术框架,通过从外部知识库检索相关信息,并将检索结果作为上下文输入LLM,从而提升生成内容的准确性和相关性。其核心目标是解决大模型的知识局限性(如幻觉、时效性差、私有知识缺失等问题。

1、 RAG 的核心流程

数据准备与处理:将原始文本分割为语义连贯的块(如512字符的段落),并采用滑动窗口或递归分割等技术优化信息冗余和粒度17。例如,文本可能被切分为子块(子文档)和父块(父文档),以支持多级检索。

向量化与索引构建:使用嵌入模型(如BGE、E5)将文本块转换为向量,存储至向量数据库(如Milvus、Chroma)。索引方式包括平面索引、分层索引,或结合稀疏向量(关键词检索)与稠密向量(语义检索)的混合搜索。

查询检索与优化:用户查询被转换为向量后,通过余弦相似度或混合搜索策略从数据库中召回相关文本。检索优化技术包括查询重写(如HyDE生成假设文档)、多路召回(结合语义与关键词检索)、重排序(使用交叉编码器精排)456。

上下文整合: 将检索结果与用户查询合并为增强提示(Prompt),例如模板化拼接或动态压缩去噪,确保输入在LLM的上下文窗口限制内。

大模型生成: LLM(如GPT、DeepSeek)基于增强后的上下文生成最终答案。生成过程需平衡检索内容与模型推理能力,避免过度依赖外部知识或复述噪声

上图展示了 RAG 的标准流程。首先,图片、文档、视频和音频等数据经过预处理,转换为 Embedding 并存入向量数据库。Embedding 通常是高维 float 数组,借助向量索引(如 HNSW、IVF)进行相似性搜索,加速高效检索。

向量索引通过近似最近邻(ANN)算法优化查询效率,减少高维计算负担。语义搜索匹配用户问题与知识库中的相关内容,使回答基于真实信息,从而降低大模型的“幻觉”风险,提升回答的自然性和可靠性。

二、 StarRocks + DeepSeek 的典型 RAG 应用场景

DeepSeek 负责生成高质量 Embedding 和回答,StarRocks 提供实时高效的向量检索,二者结合可构建更智能、更精准的 AI 解决方案。

1. 数据库文档问答系统

结合StarRocks支持的向量索引(如ANN、HNSW)与DeepSeek的生成能力,可构建针对数据库技术文档的智能问答系统。例如,用户提问“StarRocks 3.4.0版本支持哪些向量索引?”,系统通过检索官方文档片段并生成结构化答案38。

2. 企业私有知识库助手

在数据安全要求高的场景下,DeepSeek本地化部署(如Ollama框架)与StarRocks的高效向量检索能力结合,支持企业内部文档的实时问答,避免云端数据泄露风险389。

3. 实时数据分析与报告生成

StarRocks作为分析型数据库,可存储结构化业务数据,通过RAG流程将查询结果(如销售趋势)与生成模型结合,自动生成包含图表解读的多模态报告

三、操作演示

1、 系统组成

2、实现流程

步骤负责组件具体实现
1. 环境准备OllamaStarRocks用 Ollama 在本地机器上便捷地部署和运行大型语言模型
2. 数据向量化DeepSeek-Embedding文本 → 3584 维向量
3. 存储向量StarRocks创建表,存入向量
4. 近似最近邻搜索StarRocks 向量索引IVFPQ / HNSW 检索
5. 检索增强模拟 RAG 逻辑结合检索数据
6. 生成答案DeepSeek LLM生成基于真实数据的回答

3、 环境准备

3.1 DeepSeek 本地部署

Tips: 以下内容使用的是 macbook 进行 demo 演示

3.1.1 使用 ollama 安装本地模型

在本地部署 DeepSeek 时,Ollama 主要起到模型管理和提供推理接口的作用,支持运行多个不同的 LLM,并允许用户在本地切换和管理不同的模型。

# 该命令会自动下载并加载模型ollama run deepseek-r1:7b

Tips: 如果想使用云端 LLM(如 DeepSeek 的官方 API),需要获取并填写 API Key

访问 DeepSeek 官网(platform.deepseek.com)后注册账号并登录;在仪表盘中创建 API Key(通常在 “API Keys” 或 “Developer” 部分),复制生成的密钥(如 sk-xxxxxxxxxxxxxxxx)。

3.1.2 Deepseek 初步使用

启动 deepseek

执行 ollama run deepseek-r1:7b 直接进入交互模式
3.1.3 Deepseek 性能优化

直接在命令行设置参数:(参数单次生效)

OLLAMA_GPU_LAYERS=35 \OLLAMA_CPU_THREADS=6 \OLLAMA_BATCH_SIZE=128 \OLLAMA_CONTEXT_SIZE=4096 \ollama run deepseek-r1:7b
3.1.4 DeepSeek 使用

显而易见:直接使用 deepseek 进行问答,返回的答案是不符合预期的,需要对知识进行修正

3.2 StarRocks 准备工作
3.2.1 集群部署

版本需求:3.4 及以上

3.2.2 配置设置

打开 vector index

ADMIN SET FRONTEND CONFIG ("enable_experimental_vector" = "true");
3.2.3 建库建表

建库:

create database knowledge_base;

建表:存储知识库向量

CREATE TABLE enterprise_knowledge (    id  BIGINT AUTO_INCREMENT,    content TEXT NOT NULL,    embedding ARRAY<FLOAT> NOT NULL,    INDEX vec_idx (embedding) USING VECTOR (        "index_type" = "hnsw",        "dim" = "3584",        "metric_type" = "l2_distance",        "M" = "16",        "efconstruction" = "40"    )) ENGINE=OLAPPRIMARY KEY(id)DISTRIBUTED BY HASH(id) BUCKETS 1PROPERTIES (    "replication_num" = "1"    );

Tips: DeepSeek 的 deepseek-r1:7b 模型(7B 参数版本)默认生成高维嵌入向量,通常是 3584 维

4、 将文本转成向量

测试通过 deepseek 将文本转为 3584 维向量

curl -X POST http://localhost:11434/api/embeddings -d '{"model": "deepseek-r1:7b", "prompt": "产品保修期是一年。"}'

下面将转化的向量数据保存在 StarRocks 中

5、 知识存储(存储向量到 StarRocks)

import pymysqlimport requestsdef get_embedding(text):    url = "http://localhost:11434/api/embeddings"    payload = {"model": "deepseek-r1:7b", "prompt": text}    response = requests.post(url, json=payload)    response.raise_for_status()    return response.json()["embedding"]try:    content = "StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。"    embedding = get_embedding(content)    # 将 Python 列表转换为 StarRocks 的数组格式    embedding_str = "[" + ",".join(map(str, embedding)) + "]"# 例如:[0.1,0.2,0.3]    conn = pymysql.connect(        host='X.X.X.X',        port=9030,        user='root',        password='sr123456',        database='knowledge_base'    )    cursor = conn.cursor()    # 使用格式化的数组字符串    sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)"    cursor.execute(sql, (content, embedding_str))    conn.commit()    print(f"Inserted: {content} with embedding {embedding[:5]}...")except requests.RequestException as e:    print(f"Embedding API error: {e}")except pymysql.Error as db_err:    print(f"Database error: {db_err}")finally:    if'cursor'in locals():        cursor.close()    if'conn'in locals():        conn.close()

操作演示

6、知识提取

import pymysqlimport requests# 获取嵌入向量的函数def get_embedding(text):    url = "http://localhost:11434/api/embeddings"    payload = {"model": "deepseek-r1:7b", "prompt": text}    response = requests.post(url, json=payload)    response.raise_for_status()    return response.json()["embedding"]# 从 StarRocks 查询相似内容的函数def search_knowledge_base(query_embedding):    try:        conn = pymysql.connect(            host='39.98.110.249',            port=9030,            user='root',            password='sr123456',            database='knowledge_base'        )        cursor = conn.cursor()        # 将查询向量转换为 StarRocks 的数组格式        embedding_str = "[" + ",".join(map(str, query_embedding)) + "]"        # 使用 L2 距离搜索最相似的记录        sql = """        SELECT content, l2_distance(embedding, %s) AS distance        FROM enterprise_knowledge        ORDER BY distance ASC        LIMIT 1        """        cursor.execute(sql, (embedding_str,))        result = cursor.fetchone()        if result:            return result[0]  # 返回最匹配的 content        else:            return"未找到相关信息。"    except pymysql.Error as db_err:        print(f"Database error: {db_err}")        return"查询失败。"    finally:        if'cursor'in locals():            cursor.close()        if'conn'in locals():            conn.close()# 主流程try:    query = "StarRocks 的愿景是什么?"    query_embedding = get_embedding(query)  # 将查询转化为向量    answer = search_knowledge_base(query_embedding)  # 从知识库检索答案    print(f"问题: {query}")    print(f"回答: {answer}")except requests.RequestException as e:    print(f"Embedding API error: {e}")except Exception as e:    print(f"Error: {e}")

执行效果

补充说明:到目前为止的流程仅依赖 StarRocks 进行向量检索,未利用 DeepSeek LLM 进行生成,导致回答生硬且缺乏上下文整合,影响自然性和准确性。为提升效果,应引入 RAG 机制,使检索结果与生成模型深度融合,从而优化回答质量并减少幻觉问题。

7、 加入 RAG 增强

7.1 将查询知识库的结果,返回给 DeepSeek LLM ,优化回答质量
# 构造 RAG Promptdef build_rag_prompt(query, retrieved_content):    prompt = f"""    [系统指令] 你是企业智能客服,基于以下知识回答用户问题:    [知识上下文] {retrieved_content}    [用户问题] {query}    """    return prompt# 调用 DeepSeek 生成回答def generate_answer(prompt):    url = "http://localhost:11434/api/generate"    payload = {"model": "deepseek-r1:7b", "prompt": prompt}    try:        response = requests.post(url, json=payload)        response.raise_for_status()        full_response = ""        for line in response.text.splitlines():            if line.strip():  # 过滤空行                try:                    json_obj = json.loads(line)                    if"response"in json_obj:                        full_response += json_obj["response"]  # 只提取答案                    if json_obj.get("done", False):                        break                except json.JSONDecodeError as e:                    print(f"JSON 解析错误: {e}, line: {line}")        return clean_response(full_response.strip())  # 处理并去掉 <think>XXX</think>    except requests.exceptions.RequestException as e:        print(f"请求失败: {e}")        return"生成失败。"
7.2 创建 RAG 过程表:

用于记录用户问题、检索结果和生成回答,保存上下文,方便进行长对话,至于长对话,用户可自行探索。

customer_service_log 表建表语句如下:

CREATE TABLE customer_service_log (    id BIGINT AUTO_INCREMENT,    user_id VARCHAR(50),    question TEXT NOT NULL,    question_embedding ARRAY<FLOAT> NOT NULL,    retrieved_content TEXT,    generated_answer TEXT,    timestamp DATETIME NOT NULL,    feedback TINYINT DEFAULT NULL) ENGINE=OLAPPRIMARY KEY(id)DISTRIBUTED BY HASH(id) BUCKETS 1PROPERTIES (    "replication_num" = "1");

8、优化后的版本

8.1 知识提取代码
8.1.1 知识提取
import pymysqlimport requestsimport jsonfrom datetime import datetimeimport loggingimport re# 获取嵌入向量def get_embedding(text):    url = "http://localhost:11434/api/embeddings"    payload = {"model": "deepseek-r1:7b", "prompt": text,"stream": "true"}    response = requests.post(url, json=payload)    response.raise_for_status()    return response.json()["embedding"]# 从 StarRocks 检索知识def search_knowledge_base(query_embedding):    try:        conn = pymysql.connect(            host='X.X.X.X',            port=9030,            user='root',            password='sr123456',            database='knowledge_base'        )        cursor = conn.cursor()        embedding_str = "[" + ",".join(map(str, query_embedding)) + "]"        sql = """        SELECT content, l2_distance(embedding, %s) AS distance        FROM enterprise_knowledge        ORDER BY distance ASC        LIMIT 3        """        cursor.execute(sql, (embedding_str,))        results=cursor.fetchall()        content=""        for result in results:            content+=result[0]        return  content    except pymysql.Error as db_err:        print(f"Database error: {db_err}")        return"查询失败。"    finally:        cursor.close()        conn.close()def build_rag_prompt(query, retrieved_content):    prompt = f"""    [系统指令] 你是企业智能客服,基于以下知识回答用户问题:    [知识上下文] {retrieved_content}    [用户问题] {query}    """    return prompt# 调用 DeepSeek 生成回答def generate_answer(prompt):    url = "http://localhost:11434/api/generate"    payload = {"model": "deepseek-r1:7b", "prompt": prompt}    try:        response = requests.post(url, json=payload)        response.raise_for_status()        full_response = ""        for line in response.text.splitlines():            if line.strip():  # 过滤空行                try:                    json_obj = json.loads(line)                    if"response"in json_obj:                        full_response += json_obj["response"]  # 只提取答案                    if json_obj.get("done", False):                        break                except json.JSONDecodeError as e:                    print(f"JSON 解析错误: {e}, line: {line}")        return clean_response(full_response.strip())  # 处理并去掉 <think>XXX</think>    except requests.exceptions.RequestException as e:        print(f"请求失败: {e}")        return"生成失败。"# 记录对话日志def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer):    try:        conn = pymysql.connect(            host='X.X.X.X',            port=9030,            user='root',            password='sr123456',            database='knowledge_base'        )        cursor = conn.cursor()        embedding_str = "[" + ",".join(map(str, question_embedding)) + "]"        sql = """        INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp)        VALUES (%s, %s, %s, %s, %s, NOW())        """        cursor.execute(sql, (user_id, question, embedding_str, retrieved_content, generated_answer))        conn.commit()    except pymysql.Error as db_err:        print(f"Database error: {db_err}")    finally:        cursor.close()        conn.close()def clean_response(text):    # 去掉所有 <think>xxx</think> 结构    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()# 主流程def rag_pipeline(user_id, query):    try:        logging.info(f"开始处理查询: {query}")        query_embedding = get_embedding(query)        logging.info("获取嵌入向量成功")        retrieved_content = search_knowledge_base(query_embedding)        logging.info(f"检索到内容: {retrieved_content[:50]}...")  # 只展示前50字符        prompt = build_rag_prompt(query, retrieved_content)        generated_answer = generate_answer(prompt)        logging.info(f"生成回答: {generated_answer[:50]}...")        log_conversation(user_id, query, query_embedding, retrieved_content, generated_answer)        logging.info("日志记录完成")        return generated_answer    except Exception as e:        logging.error(f"发生错误: {e}", exc_info=True)        return"处理失败。"# 测试if __name__ == '__main__':    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")    user_id = "user123"    query = "StarRocks 的愿景是什么?"    answer = rag_pipeline(user_id, query)    print(f"问题: {query}")    print(f"回答: {answer}")
8.1.2 操作演示

总结一下 RAG 增强后的执行流程:

8.2 加上 web 可视化界面
<!DOCTYPE html><html lang="zh"><head>    <meta charset="UTF-8">    <meta name="viewport" content="width=device-width, initial-scale=1.0">    <title>智能问答客服系统</title>    <script>        async functionaskQuestion() {            let question = document.getElementById("question").value;            let response = await fetch("/ask", {                method: "POST",                headers: {                    "Content-Type": "application/json"                },                body: JSON.stringify({ question: question })            });            let data = await response.json();            document.getElementById("answer").innerText = data.answer;        }    </script></head><body>    <h1>智能问答客服系统</h1>    <input type="text" id="question" placeholder="请输入您的问题">    <button onclick="askQuestion()">提问</button>    <p id="answer"></p></body></html>
8.3 完整问答后台服务代码
8.3.1 代码结构如下

8.3.2 知识存储代码
import pymysqlimport requestsdef get_embedding(text):    url = "http://localhost:11434/api/embeddings"    payload = {"model": "deepseek-r1:7b", "prompt": text}    response = requests.post(url, json=payload)    response.raise_for_status()    return response.json()["embedding"]try:    content = "StarRocks 的愿景是能够让用户的数据分析变得更加简单和敏捷。"    embedding = get_embedding(content)    # 将 Python 列表转换为 StarRocks 的数组格式    embedding_str = "[" + ",".join(map(str, embedding)) + "]"# 例如:[0.1,0.2,0.3]    conn = pymysql.connect(        host='X.X.X.X',        port=9030,        user='root',        password='sr123456',        database='knowledge_base'    )    cursor = conn.cursor()    # 使用格式化的数组字符串    sql = "INSERT INTO enterprise_knowledge (content, embedding) VALUES (%s, %s)"    cursor.execute(sql, (content, embedding_str))    conn.commit()    print(f"Inserted: {content} with embedding {embedding[:5]}...")except requests.RequestException as e:    print(f"Embedding API error: {e}")except pymysql.Error as db_err:    print(f"Database error: {db_err}")finally:    if'cursor'in locals():        cursor.close()    if'conn'in locals():        conn.close()
8.3.3 知识提取
import pymysqlimport requestsimport jsonimport loggingimport refrom flask import Flask, request, jsonify, render_templateapp = Flask(__name__)# 配置日志logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")# 获取嵌入向量def get_embedding(text):    url = "http://localhost:11434/api/embeddings"    payload = {"model": "deepseek-r1:7b", "prompt": text, "stream": "true"}    response = requests.post(url, json=payload)    response.raise_for_status()    return response.json()["embedding"]# 从 StarRocks 检索知识def search_knowledge_base(query_embedding):    try:        conn = pymysql.connect(            host='X.X.X.X',            port=9030,            user='root',            password='sr123456',            database='knowledge_base'        )        cursor = conn.cursor()        embedding_str = "[" + ",".join(map(str, query_embedding)) + "]"        sql = """        SELECT content, l2_distance(embedding, %s) AS distance        FROM enterprise_knowledge        ORDER BY distance ASC        LIMIT 3        """        cursor.execute(sql, (embedding_str,))        results=cursor.fetchall()        content=""        for result in results:            content+=result[0]        # result = cursor.fetchone()        return  content    except pymysql.Error as db_err:        print(f"Database error: {db_err}")        return"查询失败。"    finally:        cursor.close()        conn.close()# 构造 RAG Promptdef build_rag_prompt(query, retrieved_content):    return f"""    [系统指令] 你是企业智能客服,基于以下知识回答用户问题:    [知识上下文] {retrieved_content}    [用户问题] {query}    """# 调用 DeepSeek 生成回答def generate_answer(prompt):    url = "http://localhost:11434/api/generate"    payload = {"model": "deepseek-r1:7b", "prompt": prompt}    try:        response = requests.post(url, json=payload)        response.raise_for_status()        full_response = ""        for line in response.text.splitlines():            if line.strip():                try:                    json_obj = json.loads(line)                    if"response"in json_obj:                        full_response += json_obj["response"]                    if json_obj.get("done", False):                        break                except json.JSONDecodeError as e:                    logging.warning(f"JSON 解析错误: {e}, line: {line}")        return clean_response(full_response.strip())  # 处理并去掉 <think>XXX</think>    except requests.exceptions.RequestException as e:        logging.error(f"请求失败: {e}")        return"生成失败。"# 记录对话日志def log_conversation(user_id, question, question_embedding, retrieved_content, generated_answer):    try:        conn = pymysql.connect(            host='X.X.X.X',            port=9030,            user='root',            password='sr123456',            database='knowledge_base'        )        cursor = conn.cursor()        embedding_str = "[" + ",".join(map(str, question_embedding)) + "]"        sql = """        INSERT INTO customer_service_log (user_id, question, question_embedding, retrieved_content, generated_answer, timestamp)        VALUES (%s, %s, %s, %s, %s, NOW())        """        cursor.execute(sql, (user_id, question, embedding_str, retrieved_content, generated_answer))        conn.commit()    except pymysql.Error as db_err:        logging.error(f"数据库错误: {db_err}")    finally:        cursor.close()        conn.close()# 清理回答内容,去掉 <think>XXX</think>def clean_response(text):    return re.sub(r"<think>.*?</think>", "", text, flags=re.DOTALL).strip()# RAG 处理流程def rag_pipeline(user_id,query):    try:        logging.info(f"开始处理查询: {query}")        query_embedding = get_embedding(query)        logging.info("获取嵌入向量成功")        retrieved_content = search_knowledge_base(query_embedding)        logging.info(f"检索到内容: {retrieved_content[:50]}...")  # 只展示前50字符                prompt = build_rag_prompt(query, retrieved_content)        generated_answer = generate_answer(prompt)        logging.info(f"生成回答: {generated_answer[:50]}...")        log_conversation(user_id, query, query_embedding, retrieved_content, generated_answer)        logging.info("日志记录完成")        return generated_answer    except Exception as e:        logging.error(f"发生错误: {e}", exc_info=True)        return"处理失败。"# Flask API@app.route("/")def index():    return render_template("index.html")  # 渲染前端页面@app.route("/ask", methods=["POST"])def ask():    user_id="sr_01"    data = request.json    question = data.get("question", "")    result=rag_pipeline(user_id,question)    answer = f"问题:{question}。\n 回答:{result}"    return jsonify({"answer": answer})if __name__ == "__main__":    user_id = "sr"    app.run(host="0.0.0.0", port=9033, debug=True)
8.3.4效果演示

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

RAG DeepSeek StarRocks 向量检索 大语言模型
相关文章