基于 NestJS 和 Elasticsearch 构建高性能 RAG 知识库系统
引言
随着人工智能的飞速发展,特别是大型语言模型(LLM)的普及,检索增强生成(Retrieval-Augmented Generation,RAG)作为一种结合外部知识的 AI 应用构建方式,正在成为企业构建智能知识库的首选技术。RAG 通过在生成回答前先检索相关文档,有效解决了 LLM 的知识时效性、专业领域知识缺乏和幻觉等问题。
本文将分享一个基于 NestJS 和 Elasticsearch 构建的完整 RAG 知识库系统,包括后端架构设计、核心组件实现、前端流式交互等内容。这个系统可以帮助企业快速构建自己的智能知识库,支持多种文档格式的处理和检索。
技术栈
- 后端框架:NestJS + TypeScript向量数据库:Elasticsearch嵌入模型:阿里的 text-Embeddings-v2大语言模型:DeepSeek-V3前端框架:Vue 3 + TypeScript流式响应:Fetch Event Source
系统架构
这个 RAG 系统主要由以下几个核心模块组成:
- 文档处理器:负责处理上传的各种格式文档(PDF、TXT、CSV、Excel 等)嵌入服务:生成文档片段的向量表示索引服务:将文档及其向量存入 Elasticsearch检索服务:基于语义相似度检索相关文档片段生成服务:利用检索到的信息通过 LLM 生成回答流式响应:支持实时输出 AI 回答
接下来,我们将详细介绍每个模块的实现。
核心模块实现
1. 文档处理模块
文档处理是 RAG 的第一步,也是建立高质量知识库的基础。我们的系统支持处理多种格式的文档:
async processDocument( file: Express.Multer.File, metadata: Record<string, any>,): Promise<Document[]> { try { const fileExtension = path.extname(file.originalname).toLowerCase(); switch (fileExtension) { case '.txt': case '.md': return this.processTextFile(file, metadata); case '.pdf': return this.processPdfFile(file, metadata); case '.csv': return this.processCSVFile(file, metadata); case '.xlsx': case '.xls': return this.processExcelFile(file, metadata); default: this.logger.warn( `不支持的文件类型: ${fileExtension},尝试作为纯文本处理`, ); return this.processTextFile(file, metadata); } } catch (error) { // 错误处理... }}
对于文本文件,我们采用智能分段策略,避免内容过长或过短:
private async processTextFile( file: Express.Multer.File, metadata: Record<string, any>,): Promise<Document[]> { const content = await fs.readFile(file.path, 'utf-8'); // 改进的块分割策略 const paragraphs = content .split(/\n\s*\n|\r\n\s*\r\n/) .filter((p) => p.trim().length > 0) .map((p) => p.trim()); return paragraphs.map((paragraph, index) => ({ id: `${file.filename}-${index}`, content: paragraph, metadata: { source: file.originalname, filename: file.filename, type: 'text', createdAt: new Date(), description: metadata.description || '', }, }));}
2. 嵌入服务
为了实现语义搜索,我们需要将文本转换为向量。这里使用 OpenAI 的嵌入 API:
async generateEmbedding(text: string): Promise<number[]> { try { const response = await this.openai.embeddings.create({ model: this.embeddingModel, input: text, }); return response.data[0].embedding; } catch (error) { this.logger.error(`生成嵌入向量失败: ${error.message}`); throw error; }}// 批量生成嵌入,提高效率async generateBatchEmbeddings(texts: string[]): Promise<number[][]> { const batchSize = 20; const batches: string[][] = []; // 分批处理,避免请求过大 for (let i = 0; i < texts.length; i += batchSize) { const batch = texts.slice(i, i + batchSize); batches.push(batch); } const embeddings: number[][] = []; for (const batch of batches) { const response = await this.openai.embeddings.create({ model: this.embeddingModel, input: batch, }); const batchEmbeddings = response.data.map((item) => item.embedding); embeddings.push(...batchEmbeddings); } return embeddings;}
3. 索引服务
索引服务负责将文档和向量存储在 Elasticsearch 中:
async indexDocuments(documents: Document[]): Promise<boolean> { try { // 分批处理文档 const batchSize = 20; const batches: Document[][] = []; for (let i = 0; i < documents.length; i += batchSize) { const batch = documents.slice(i, i + batchSize); batches.push(batch); } // 处理每一批文档 for (const batch of batches) { // 提取每个文档的内容用于生成嵌入 const contents = batch.map((doc) => doc.content); // 批量生成嵌入向量 const embeddings = await this.embeddingService.generateBatchEmbeddings(contents); // 为每个文档添加嵌入向量 for (let i = 0; i < batch.length; i++) { batch[i].metadata.embedding = embeddings[i]; } // 构建批量索引请求 const bulkBody = this.buildBulkRequestBody(batch); // 执行批量索引 await this.bulkIndex(bulkBody); } return true; } catch (error) { // 错误处理... }}
索引创建时,我们需要定义特殊的向量字段:
private async createIndex(indexUrl: string): Promise<void> { const indexConfig = { mappings: { properties: { content: { type: 'text', analyzer: 'standard' }, metadata: { properties: { // ...其他元数据字段... embedding: { type: 'dense_vector', dims: 1536, // 默认维度为1536 index: true, // 启用向量搜索 similarity: 'cosine', // 使用余弦相似度 }, }, }, }, }, settings: { number_of_shards: 1, number_of_replicas: 0, }, }; // 创建索引...}
4. 检索服务
检索服务是 RAG 的核心,它结合了文本匹配和向量相似度搜索:
async retrieveDocuments( query: string, filters?: string[], limit: number = 5,): Promise<Document[]> { try { // 1. 为查询生成嵌入向量 const queryEmbedding = await this.embeddingService.generateEmbedding(query); // 2. 构建混合查询 const esQuery: EsQuery = { size: limit, query: { script_score: { query: { bool: { must: [{ match: { content: query } }], }, }, script: { source: "cosineSimilarity(params.query_vector, 'metadata.embedding') + 1.0", params: { query_vector: queryEmbedding }, }, }, }, }; // 3. 执行查询 const results = await this.customElasticsearchQuery( this.INDEX_NAME, esQuery, ); } catch (error) { // 错误处理... }}
这个查询组合了文本相关性(match
)和向量相似度(cosineSimilarity
),可以找到语义相关的文档。
5. 生成服务
生成服务负责将用户查询和检索到的文档发送给 LLM 以生成回答:
async generateAnswer(query: string, documents: Document[]): Promise<string> { try { // 1. 准备上下文信息 const context = this.prepareContext(documents); // 2. 构建提示词 const prompt = this.buildPrompt(query, context); // 3. 调用LLM生成回答 const completion = await this.openai.chat.completions.create({ model: 'deepseek-v3', messages: [ { role: 'system', content: `你是一个知识助手,请根据提供的相关文档回答用户的问题。你的回答应该:1. 基于提供的相关文档内容2. 精确、清晰、专业3. 如果相关文档中没有足够信息回答问题,请明确说明,不要编造内容4. 回答中应引用相关的来源文件名`, }, { role: 'user', content: prompt }, ], temperature: 0.3, // 保持较低的温度以获得更准确的回答 }); return completion.choices[0].message.content || '无法生成回答'; } catch (error) { // 错误处理... }}
6. 流式响应实现
为了提供更好的用户体验,我们实现了流式响应:
async queryStream(queryDto: RagQueryDto, response: Response): Promise<void> { try { response.setHeader('Content-Type', 'text/event-stream'); response.setHeader('Cache-Control', 'no-cache'); response.setHeader('Connection', 'keep-alive'); // 1. 检索相关文档 const relevantDocuments = await this.retrieverService.retrieveDocuments( queryDto.query, queryDto.filters, ); // 2. 生成流式回答 const stream = await this.generatorService.generateAnswerStream( queryDto.query, relevantDocuments, ); // 3. 处理流式响应 for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content || ''; if (content) { response.write(`data: ${JSON.stringify({ content })}\n\n`); } } // 发送结束标记 response.write('data: [DONE]\n\n'); response.end(); } catch (error) { // 错误处理... }}
前端实现
前端采用 Vue 3 构建,主要包括文档上传和问答交互两个模块。其中,问答部分使用 fetchEventSource
处理流式响应:
// 发送问题const sendQuery = async () => { const query = userInput.value.trim() if (!query || loading.value) return // 添加用户消息 messages.value.push({ role: 'user', content: query }) userInput.value = '' loading.value = true try { // 添加一个空的助手消息,用于流式填充 const assistantMessageIndex = messages.value.length messages.value.push({ role: 'assistant', content: '' }) // 使用 fetchEventSource 进行流式请求 fetchEventSource('/api/rag/query-stream', { method: 'POST', openWhenHidden: true, headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ query, filters: ['text'] }), onmessage(event) { if (event.data === ' [DONE]') { loading.value = false return } try { // 解析事件数据 const data = JSON.parse(event.data) if (data.content) { // 更新助手消息内容 messages.value[assistantMessageIndex] = { role: 'assistant', content: messages.value[assistantMessageIndex].content + data.content, } // 滚动到底部 scrollToBottom() } } catch (e) { console.error('解析事件数据出错:', e) } }, // 其他事件处理... }) } catch (error) { // 错误处理... }}
总结
基于 NestJS 和 Elasticsearch 构建的 RAG 知识库系统提供了一个完整的解决方案,用于构建企业级的智能问答应用。该系统具有以下优势:
- 多格式支持:处理各类文档格式,便于知识导入语义检索:结合文本匹配和向量搜索,提高检索准确性流式响应:提供类似 ChatGPT 的流式体验模块化设计:便于扩展和维护可扩展架构:支持不同的模型和数据源
通过这样的 RAG 系统,企业可以快速构建自己的知识库应用,提高信息检索效率,降低大语言模型的幻觉问题,实现更加准确的智能问答服务。
欢迎在评论区分享你对 RAG 系统的看法或疑问,我们一起探讨更多关于知识库构建的技术细节!