增强型RAG系统的查询转换
采用三种查询转换技术,以提高RAG系统中的检索性能,而无需依赖于像LangChain这样的专门库。通过修改用户查询,我们可以显著提高检索信息的相关性和全面性。
关键转换技术
1.查询重写:使查询更加具体和详细,以提高搜索精度。2.退步提示:生成更广泛的查询以检索有用的上下文信息。3.子查询分解:将复杂的查询分解成更简单的组件进行全面检索。
具体代码实现
查询变换相关函数
查询重写
def rewrite_query(original_query: str, model: str = LLM_MODEL) -> str: """ 查询重写:让查询更具体详细,提高检索精度。 """ print(f"[查询重写] 原始问题: {original_query}") system_prompt = "你是查询优化专家,请将用户问题改写得更具体详细,包含有助于检索的相关术语和细节。" user_prompt = f"""请将下列问题改写得更具体详细,补充有助于检索的相关术语和细节:\n\n原始问题:{original_query}\n\n改写后:""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: rewritten = response.output.choices[0].message.content.strip() print(f"[查询重写] 改写结果: {rewritten}\n") return rewritten else: print(f"[查询重写] 失败: {response.message}") return original_query except Exception as e: print(f"[查询重写] 异常: {e}") return original_query
生成更宽泛的背景性问题
def generate_step_back_query(original_query: str, model: str = LLM_MODEL) -> str: """ Step-back提示:生成更宽泛的背景性问题。 """ print(f"[Step-back] 原始问题: {original_query}") system_prompt = "你是检索策略专家,请将具体问题泛化为更宽泛的背景性问题,用于获取相关背景信息。" user_prompt = f"""请将下列问题泛化为更宽泛的背景性问题:\n\n原始问题:{original_query}\n\n泛化后:""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: step_back = response.output.choices[0].message.content.strip() print(f"[Step-back] 泛化结果: {step_back}\n") return step_back else: print(f"[Step-back] 失败: {response.message}") return original_query except Exception as e: print(f"[Step-back] 异常: {e}") return original_query
子查询分解
def decompose_query(original_query: str, num_subqueries: int = 4, model: str = LLM_MODEL) -> List[str]: """ 子查询分解:将复杂问题拆解为若干简单子问题。 """ print(f"[子查询分解] 原始问题: {original_query}") system_prompt = "你是复杂问题拆解专家,请将复杂问题拆解为若干简单子问题,每个子问题关注不同方面。" user_prompt = f"""请将下列复杂问题拆解为{num_subqueries}个子问题,每行一个:\n\n原始问题:{original_query}\n\n子问题:""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: content = response.output.choices[0].message.content.strip() lines = content.split("\n") sub_queries = [] for line in lines: if line.strip() and any(line.strip().startswith(f"{i}.") for i in range(1, 10)): q = line.strip() q = q[q.find(".")+1:].strip() sub_queries.append(q) elif line.strip(): sub_queries.append(line.strip()) print(f"[子查询分解] 子问题: {sub_queries}\n") return sub_queries else: print(f"[子查询分解] 失败: {response.message}") return [original_query] except Exception as e: print(f"[子查询分解] 异常: {e}") return [original_query]
PDF文本处理与分块
PDF文件中提取全部文本
def extract_text_from_pdf(pdf_path: str) -> str: """ 从PDF文件中提取全部文本。 """ print(f"[PDF提取] 正在提取: {pdf_path}") with open(pdf_path, 'rb') as f: reader = PdfReader(f) text = "" for i, page in enumerate(reader.pages): page_text = page.extract_text() if page_text: text += page_text print(f" - 已提取第{i+1}页") print(f"[PDF提取] 完成,总长度: {len(text)} 字符\n") return text
文本分割为带重叠的块
def chunk_text(text: str, n: int = 1000, overlap: int = 200) -> List[str]: """ 将文本分割为带重叠的块。 """ print(f"[分块] 每块{n}字符,重叠{overlap}字符") chunks = [] for i in range(0, len(text), n - overlap): chunks.append(text[i:i + n]) print(f"[分块] 完成,共{len(chunks)}块\n") return chunks
向量生成与存储
阿里embedding模型批量生成文本向量
def create_embeddings(texts, model: str = EMBEDDING_MODEL) -> List[np.ndarray]: """ 用阿里embedding模型批量生成文本向量。 支持单条或多条文本。 """ if isinstance(texts, str): texts = [texts] print(f"[嵌入生成] 正在生成{len(texts)}条文本的向量...") try: response = TextEmbedding.call( model=model, input=texts, api_key=ALI_API_KEY ) if response.status_code == 200: embeddings = [np.array(item['embedding']) for item in response.output['embeddings']] print(f"[嵌入生成] 成功,返回{len(embeddings)}条向量\n") return embeddings if len(embeddings) > 1 else embeddings[0] else: print(f"[嵌入生成] 失败: {response.message}") return [np.zeros(1536)] * len(texts) except Exception as e: print(f"[嵌入生成] 异常: {e}") return [np.zeros(1536)] * len(texts)
简单的向量存储与检索类
class SimpleVectorStore: """ 简单的向量存储与检索类。 """ def __init__(self): self.vectors = [] self.texts = [] self.metadata = [] def add_item(self, text, embedding, metadata=None): self.vectors.append(np.array(embedding)) self.texts.append(text) self.metadata.append(metadata or {}) def similarity_search(self, query_embedding, k=5): if not self.vectors: return [] query_vector = np.array(query_embedding) similarities = [] for i, vector in enumerate(self.vectors): sim = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector)) similarities.append((i, sim)) similarities.sort(key=lambda x: x[1], reverse=True) results = [] for i in range(min(k, len(similarities))): idx, score = similarities[i] results.append({ "text": self.texts[idx], "metadata": self.metadata[idx], "similarity": score }) return results
主流程与RAG实现
处理PDF文档,提取文本、分块、生成向量并构建向量库
def process_document(pdf_path: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> SimpleVectorStore: """ 处理PDF文档,提取文本、分块、生成向量并构建向量库。 """ print("[主流程] 开始处理文档...") extracted_text = extract_text_from_pdf(pdf_path) text_chunks = chunk_text(extracted_text, chunk_size, chunk_overlap) print("[主流程] 初始化向量库...") vector_store = SimpleVectorStore() print("[主流程] 为每个块生成向量...") for i, chunk in enumerate(text_chunks): print(f"[块{i+1}/{len(text_chunks)}] 正在处理文本块,长度: {len(chunk)} 字符") chunk_emb = create_embeddings(chunk) vector_store.add_item(chunk, chunk_emb, {"type": "chunk", "index": i}) print("[主流程] 文档处理完毕,向量库构建完成\n") return vector_store
用变换后的查询进行检索
def transformed_search(query: str, vector_store: SimpleVectorStore, transformation_type: str, top_k: int = 3) -> List[Dict]: """ 用变换后的查询进行检索。 """ print(f"[检索] 查询变换类型: {transformation_type}") print(f"[检索] 原始问题: {query}") results = [] if transformation_type == "rewrite": transformed_query = rewrite_query(query) query_embedding = create_embeddings(transformed_query) results = vector_store.similarity_search(query_embedding, k=top_k) elif transformation_type == "step_back": transformed_query = generate_step_back_query(query) query_embedding = create_embeddings(transformed_query) results = vector_store.similarity_search(query_embedding, k=top_k) elif transformation_type == "decompose": sub_queries = decompose_query(query) print("[检索] 子问题:") for i, sub_q in enumerate(sub_queries, 1): print(f" {i}. {sub_q}") sub_query_embeddings = create_embeddings(sub_queries) if isinstance(sub_query_embeddings, np.ndarray): sub_query_embeddings = [sub_query_embeddings] all_results = [] for i, embedding in enumerate(sub_query_embeddings): sub_results = vector_store.similarity_search(embedding, k=2) all_results.extend(sub_results) seen_texts = {} for result in all_results: text = result["text"] if text not in seen_texts or result["similarity"] > seen_texts[text]["similarity"]: seen_texts[text] = result results = sorted(seen_texts.values(), key=lambda x: x["similarity"], reverse=True)[:top_k] else: query_embedding = create_embeddings(query) results = vector_store.similarity_search(query_embedding, k=top_k) print(f"[检索] 检索结果数: {len(results)}\n") return results
用大模型基于上下文生成回答
def generate_response(query: str, context: str, model: str = LLM_MODEL) -> str: """ 用大模型基于上下文生成回答。 """ print("[生成] 正在调用大模型生成回答...") system_prompt = "你是一个AI助手,只能基于给定上下文回答问题。如果上下文无法直接回答,请回复:'信息不足,无法回答。'" user_prompt = f"""上下文:\n{context}\n\n问题:{query}\n\n请只基于上述上下文简明准确作答。""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: answer = response.output.choices[0].message.content.strip() print(f"[生成] 回答生成成功: {answer}\n") return answer else: print(f"[生成] 回答生成失败: {response.message}") return "" except Exception as e: print(f"[生成] 回答生成异常: {e}") return ""
完整RAG流程
def rag_with_query_transformation(pdf_path: str, query: str, transformation_type: str = None): """ 完整RAG流程,支持可选的查询变换。 """ vector_store = process_document(pdf_path) if transformation_type: results = transformed_search(query, vector_store, transformation_type) else: query_embedding = create_embeddings(query) results = vector_store.similarity_search(query_embedding, k=3) context = "\n\n".join([f"PASSAGE {i+1}:\n{result['text']}" for i, result in enumerate(results)]) response = generate_response(query, context) return { "original_query": query, "transformation_type": transformation_type, "context": context, "response": response }
附录
执行结果展示
===== RAG 查询变换增强示例 =====[主流程] 开始处理文档...[PDF提取] 正在提取: data/2888年Java程序员找工作最新场景题.pdf - 已提取第1页 - 已提取第2页 - 已提取第3页 - 已提取第4页 - 已提取第5页 - 已提取第6页 - 已提取第7页 - 已提取第8页 - 已提取第9页 - 已提取第10页[PDF提取] 完成,总长度: 6984 字符[分块] 每块1000字符,重叠200字符[分块] 完成,共9块[主流程] 初始化向量库...[主流程] 为每个块生成向量...[块1/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块2/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块3/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块4/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块5/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块6/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块7/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块8/9] 正在处理文本块,长度: 1000 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[块9/9] 正在处理文本块,长度: 584 字符[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[主流程] 文档处理完毕,向量库构建完成[检索] 查询变换类型: rewrite[检索] 原始问题: 人工智能对未来就业的影响有哪些?[查询重写] 原始问题: 人工智能对未来就业的影响有哪些?[查询重写] 改写结果: 改写后的问题:人工智能技术(包括但不限于机器学习、自然语言处理、计算机视觉等)在未来十年内对全球不同行业(如制造业、服务业、信息技术业等)的就业市场会产生哪些具体影响?这些影响包括但不限于工作岗位的变化趋势(新增岗位与消失岗位)、所需技能的转变以及对劳动力结构(例如年龄、教育背景)的影响。此外,还请考虑这种变化对于政策制定者、企业雇主及员工个人分别意味着什么挑战和机遇。[嵌入生成] 正在生成1条文本的向量...[嵌入生成] 成功,返回1条向量[检索] 检索结果数: 3[生成] 正在调用大模型生成回答...[生成] 回答生成成功: 信息不足,无法回答。===== 最终结果 =====原始问题: 人工智能对未来就业的影响有哪些?变换类型: rewrite检索上下文: PASSAGE 1:涉及的行业如金融、电子商务,及特定业务如客户关系管理、支付系统等。 软实力与经验:团队管理规模、项目管理经验、个人特质等,简洁明了即可。确保简历内容与目标职位的技能要求对齐,以便快速建立匹配印象。工作与教育背景需精挑细选,强调关键项目、挑战、责任及所获成就,同时,这些内容应紧密支撑你的技能陈述,避免离题。简历长度以不超过两页A4纸为宜,采用PDF格式以保证格式一致性。可借鉴LinkedIn或MicrosoftOffice模板美化外观,力求内容精炼,重点突出。记住,简历的目标是凸显你的独特之处,哪怕仅展示两三个亮点,也足以引起注意。最后,简历是打开机遇之门的第一步,尤其在竞争激烈的就业市场中,除了实质性的技能和经验,别出心裁的简历设计与正面积极的自我评价亦能增加脱颖而出的机会。即便初始条件有限,展现出积极的态度、持续的学习意愿和解决问题的能力,同样能传达出你是一个值得投资的潜力股。1.2技术知识储备在准备简历时,切记实事求是,你所列出的每一项技能都可能成为面试对话的起点。"精通"、"熟悉"、"了解"需准确区分,以免自相矛盾。对于提及的每项技术,务必把握其基础及核心概念,因为面试过程中,面试官往往会逐步深入探讨,以此评估你的实际水平。因此,系统性复习相关书籍和资料是不可或缺的步骤,以备不时之需。例如: 如你标明熟练掌握Java,那么不仅限于基础语法,还应涵盖并发编程、NIO、JVM等进阶知识,同时对Spring、Netty等流行框架的基本原理有所认识。 提及Go语言,意味着你应至少阅读过官方的《EffectiveGo》,理解其核心理念。 当列举Redis时,对其数据结构、性能调优策略、高可用部署方式及分布式锁机制等,通过官方文档的研读应达到一定的理解深度。 如声称掌握面向对象设计,熟悉《设计模式》中的经典23种模式将是基本要求。 对于分布式架构的宣称,则需对CAP原则、微服务架构、弹性设计以及SpringCloud、CloudNative等相关技术框架有深刻理解。 关于网络编程的技能,理解TCP/IP协议的三次握手、四次挥手过程,Socket编程基础,以及select、poll、epoll等I/O多路复用技术,都是必不可少的知识点。综上所述,你简历上的每一项技术标注,都应当基于你对该技术核心知识点的PASSAGE 2:本要求。 对于分布式架构的宣称,则需对CAP原则、微服务架构、弹性设计以及SpringCloud、CloudNative等相关技术框架有深刻理解。 关于网络编程的技能,理解TCP/IP协议的三次握手、四次挥手过程,Socket编程基础,以及select、poll、epoll等I/O多路复用技术,都是必不可少的知识点。综上所述,你简历上的每一项技术标注,都应当基于你对该技术核心知识点的掌握之上。这好比备考期末考试,你需要全面回顾教材,确保掌握大多数关键知识点,即使不必面面俱到,但对于80%以上的重点内容,你都应做到心中有数。这样的准备不仅是为了应对面试,更是对自己技术深度和广度的负责态度体现。1.3项目准备(非常重要)在面试过程中,分享个人项目经历或解决过的挑战几乎是每个面试官必问的环节,但令人诧异的是,许多候选人并未对此做好充分准备。以下四个经典问题频繁出现于面试之中:1.分享一个你最为自豪或最近完成的项目。2.讲述一次你攻克的最复杂或技术含量最高的难题。3.描述一个你经历过的最具挑战性或最艰难的项目。4.谈谈你曾犯下的最大技术失误或引发的技术故障。这些问题背后,面试官的意图各异: 第一个问题旨在探查你的成就顶峰、兴趣所在; 第二、三题侧重于你的问题解决能力和面对逆境时的心态韧性; 而第四题则关注你对待错误的态度,以及是否具备反思与成长的能力。值得注意的是,面试官会通过连续追问细节来验证信息的真实性,因为虚构的情节难以在严密的追问下自圆其说。为有效应对这类问题,以下建议或许能帮助你更好地准备: 构建故事框架:运用STAR法则(情境Situation、任务Task、行动Action、结果Result)来组织你的叙述,确保内容条理清晰,避免冗长繁杂。 添加细节:丰富的技术细节是说服力的关键,它能让故事显得更加真实可信。 注入情感:真挚的情感表达能传递你的热情、自豪与坚持,确保情感源自真实的体验。 融入反思:在叙述中穿插你的思考、教训总结及后续的改进措施,展现你的成长和成熟。达到这样的叙述水平并非易事,需要持续的练习与积累。日常工作中,培养即时总结的习惯,对经历进行记录与反思,是避免临阵磨枪的有效方法。此外,提升语言组织能力与逻辑思维同样重要。通过撰写工作文档和经营个人技术博客,不仅可以锻PASSAGE 3:2024年Java程序员找工作最新面试攻略这个文档是帮助正在找工作以及准备找工作的同学,在面试之前去复习和突击的一种方式。适合已经在技术领域有一定积累,然后不确定面试切入点,所以可以通过这个面试文档来预热和巩固。想直接通过刷面试文档找到工作的同学也要注意,面试文档的内容是静态的,但是面试过程是动态的,面试官对于某一个领域的考察,通常是通过连环问的方式去问,所以在面试之前,求职者要对Java相关技术有一个体系化的了解,从而更好地突出自己的综合能力。在科技日新月异的今天,软件开发行业正经历着前所未有的变革。Java,作为企业级应用开发的中流砥柱,其生态系统也在不断进化,从微服务架构的普及到云原生技术的兴起,再到AI与大数据的深度融合,Java程序员的角色和技能需求随之迭代升级。面对这样的行业背景,如何在求职路上脱颖而出,成为每位开发者必须深思的问题。随着Java这个赛道的不断内卷,这两年,Java程序员的面试,从原来的常规八股文(有标准答案)到现在,以项目、场景问题、技术深度思考为主,逐步转变成没有标准答案,需要大家基于自己的理解和技术底蕴来回答。那针对市场中新的需求,有没有最新的面试攻略呢?其实也是有的,虽然说没有标准答案,但是我们可以针对如今市场的面试变化,来针对性的设计一些面试回答的思路,让大家有一个清晰和明确的方向。这里有什么?1.针对2024年面试行情的变化设计的面试场景题以及回答思路2.如何快速通过面试的详细攻略3.简历优化技巧1.知己知彼才能百战百胜,如何做好面试前的准备工作2024年的行情,和3~4年前不同,通过海量简历投递和海量面试找工作的时代已经过去了。在如今面试机会较少,并且面试难度较大的情况下。充分做好面试的准备才是快速通过面试最有效的方法!切忌把真实面试当靶场,最后带来的代价是非常巨大的!面试无非就两个部分,投简历、面试!很多人把重心放在投简历上,忽略了准备面试的重要性,最后的结果是获得了面试机会但是在面试过程中被刷下来了。1.1怎么写简历着手准备的第一步聚焦于简历的打造。简历是他人初步了解你的窗口,其重要性不言而喻,因此精心构思简历至关重要。理想的简历应当围绕你的亲身经历构建,正如某些杰出人士仅凭一句“Unix的创造者”便足以令人印象深刻。尽管并非所有人都拥有如此耀眼大模型回答: 信息不足,无法回答。进程已结束,退出代码为 0
完整示例代码
# -*- coding: utf-8 -*-"""RAG 查询变换增强脚本(基于阿里大模型Qwen)========================================本脚本实现了三种查询变换技术:查询重写、Step-back提示、子查询分解。所有大模型调用均基于阿里云通义千问Qwen,密钥从api_keys.py读取。每一步均有详细控制台输出,便于观察效果。"""import osimport numpy as npimport jsonimport sysfrom typing import List, Dictfrom PyPDF2 import PdfReaderfrom dashscope import Generation, TextEmbedding# ========== 密钥配置 ==========# try:# from test.api_keys import ALI_API_KEY# except ImportError:# raise RuntimeError("未找到test/api_keys.py或未定义ALI_API_KEY,请配置API密钥!")ALI_API_KEY="sk-148deabc0bcf4fdeaa70a78eaa829c7e"# =============================LLM_MODEL = "qwen-max" # 通义千问主力模型EMBEDDING_MODEL = "text-embedding-v2" # 阿里云嵌入模型# ========== 查询变换相关函数 ==========def rewrite_query(original_query: str, model: str = LLM_MODEL) -> str: """ 查询重写:让查询更具体详细,提高检索精度。 """ print(f"[查询重写] 原始问题: {original_query}") system_prompt = "你是查询优化专家,请将用户问题改写得更具体详细,包含有助于检索的相关术语和细节。" user_prompt = f"""请将下列问题改写得更具体详细,补充有助于检索的相关术语和细节:\n\n原始问题:{original_query}\n\n改写后:""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: rewritten = response.output.choices[0].message.content.strip() print(f"[查询重写] 改写结果: {rewritten}\n") return rewritten else: print(f"[查询重写] 失败: {response.message}") return original_query except Exception as e: print(f"[查询重写] 异常: {e}") return original_querydef generate_step_back_query(original_query: str, model: str = LLM_MODEL) -> str: """ Step-back提示:生成更宽泛的背景性问题。 """ print(f"[Step-back] 原始问题: {original_query}") system_prompt = "你是检索策略专家,请将具体问题泛化为更宽泛的背景性问题,用于获取相关背景信息。" user_prompt = f"""请将下列问题泛化为更宽泛的背景性问题:\n\n原始问题:{original_query}\n\n泛化后:""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: step_back = response.output.choices[0].message.content.strip() print(f"[Step-back] 泛化结果: {step_back}\n") return step_back else: print(f"[Step-back] 失败: {response.message}") return original_query except Exception as e: print(f"[Step-back] 异常: {e}") return original_querydef decompose_query(original_query: str, num_subqueries: int = 4, model: str = LLM_MODEL) -> List[str]: """ 子查询分解:将复杂问题拆解为若干简单子问题。 """ print(f"[子查询分解] 原始问题: {original_query}") system_prompt = "你是复杂问题拆解专家,请将复杂问题拆解为若干简单子问题,每个子问题关注不同方面。" user_prompt = f"""请将下列复杂问题拆解为{num_subqueries}个子问题,每行一个:\n\n原始问题:{original_query}\n\n子问题:""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: content = response.output.choices[0].message.content.strip() lines = content.split("\n") sub_queries = [] for line in lines: if line.strip() and any(line.strip().startswith(f"{i}.") for i in range(1, 10)): q = line.strip() q = q[q.find(".")+1:].strip() sub_queries.append(q) elif line.strip(): sub_queries.append(line.strip()) print(f"[子查询分解] 子问题: {sub_queries}\n") return sub_queries else: print(f"[子查询分解] 失败: {response.message}") return [original_query] except Exception as e: print(f"[子查询分解] 异常: {e}") return [original_query]# ========== PDF文本处理与分块 ==========def extract_text_from_pdf(pdf_path: str) -> str: """ 从PDF文件中提取全部文本。 """ print(f"[PDF提取] 正在提取: {pdf_path}") with open(pdf_path, 'rb') as f: reader = PdfReader(f) text = "" for i, page in enumerate(reader.pages): page_text = page.extract_text() if page_text: text += page_text print(f" - 已提取第{i+1}页") print(f"[PDF提取] 完成,总长度: {len(text)} 字符\n") return textdef chunk_text(text: str, n: int = 1000, overlap: int = 200) -> List[str]: """ 将文本分割为带重叠的块。 """ print(f"[分块] 每块{n}字符,重叠{overlap}字符") chunks = [] for i in range(0, len(text), n - overlap): chunks.append(text[i:i + n]) print(f"[分块] 完成,共{len(chunks)}块\n") return chunks# ========== 向量生成与存储 ==========def create_embeddings(texts, model: str = EMBEDDING_MODEL) -> List[np.ndarray]: """ 用阿里embedding模型批量生成文本向量。 支持单条或多条文本。 """ if isinstance(texts, str): texts = [texts] print(f"[嵌入生成] 正在生成{len(texts)}条文本的向量...") try: response = TextEmbedding.call( model=model, input=texts, api_key=ALI_API_KEY ) if response.status_code == 200: embeddings = [np.array(item['embedding']) for item in response.output['embeddings']] print(f"[嵌入生成] 成功,返回{len(embeddings)}条向量\n") return embeddings if len(embeddings) > 1 else embeddings[0] else: print(f"[嵌入生成] 失败: {response.message}") return [np.zeros(1536)] * len(texts) except Exception as e: print(f"[嵌入生成] 异常: {e}") return [np.zeros(1536)] * len(texts)class SimpleVectorStore: """ 简单的向量存储与检索类。 """ def __init__(self): self.vectors = [] self.texts = [] self.metadata = [] def add_item(self, text, embedding, metadata=None): self.vectors.append(np.array(embedding)) self.texts.append(text) self.metadata.append(metadata or {}) def similarity_search(self, query_embedding, k=5): if not self.vectors: return [] query_vector = np.array(query_embedding) similarities = [] for i, vector in enumerate(self.vectors): sim = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector)) similarities.append((i, sim)) similarities.sort(key=lambda x: x[1], reverse=True) results = [] for i in range(min(k, len(similarities))): idx, score = similarities[i] results.append({ "text": self.texts[idx], "metadata": self.metadata[idx], "similarity": score }) return results# ========== 主流程与RAG实现 ==========def process_document(pdf_path: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> SimpleVectorStore: """ 处理PDF文档,提取文本、分块、生成向量并构建向量库。 """ print("[主流程] 开始处理文档...") extracted_text = extract_text_from_pdf(pdf_path) text_chunks = chunk_text(extracted_text, chunk_size, chunk_overlap) print("[主流程] 初始化向量库...") vector_store = SimpleVectorStore() print("[主流程] 为每个块生成向量...") for i, chunk in enumerate(text_chunks): print(f"[块{i+1}/{len(text_chunks)}] 正在处理文本块,长度: {len(chunk)} 字符") chunk_emb = create_embeddings(chunk) vector_store.add_item(chunk, chunk_emb, {"type": "chunk", "index": i}) print("[主流程] 文档处理完毕,向量库构建完成\n") return vector_storedef transformed_search(query: str, vector_store: SimpleVectorStore, transformation_type: str, top_k: int = 3) -> List[Dict]: """ 用变换后的查询进行检索。 """ print(f"[检索] 查询变换类型: {transformation_type}") print(f"[检索] 原始问题: {query}") results = [] if transformation_type == "rewrite": transformed_query = rewrite_query(query) query_embedding = create_embeddings(transformed_query) results = vector_store.similarity_search(query_embedding, k=top_k) elif transformation_type == "step_back": transformed_query = generate_step_back_query(query) query_embedding = create_embeddings(transformed_query) results = vector_store.similarity_search(query_embedding, k=top_k) elif transformation_type == "decompose": sub_queries = decompose_query(query) print("[检索] 子问题:") for i, sub_q in enumerate(sub_queries, 1): print(f" {i}. {sub_q}") sub_query_embeddings = create_embeddings(sub_queries) if isinstance(sub_query_embeddings, np.ndarray): sub_query_embeddings = [sub_query_embeddings] all_results = [] for i, embedding in enumerate(sub_query_embeddings): sub_results = vector_store.similarity_search(embedding, k=2) all_results.extend(sub_results) seen_texts = {} for result in all_results: text = result["text"] if text not in seen_texts or result["similarity"] > seen_texts[text]["similarity"]: seen_texts[text] = result results = sorted(seen_texts.values(), key=lambda x: x["similarity"], reverse=True)[:top_k] else: query_embedding = create_embeddings(query) results = vector_store.similarity_search(query_embedding, k=top_k) print(f"[检索] 检索结果数: {len(results)}\n") return resultsdef generate_response(query: str, context: str, model: str = LLM_MODEL) -> str: """ 用大模型基于上下文生成回答。 """ print("[生成] 正在调用大模型生成回答...") system_prompt = "你是一个AI助手,只能基于给定上下文回答问题。如果上下文无法直接回答,请回复:'信息不足,无法回答。'" user_prompt = f"""上下文:\n{context}\n\n问题:{query}\n\n请只基于上述上下文简明准确作答。""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], api_key=ALI_API_KEY, result_format='message' ) if response.status_code == 200: answer = response.output.choices[0].message.content.strip() print(f"[生成] 回答生成成功: {answer}\n") return answer else: print(f"[生成] 回答生成失败: {response.message}") return "" except Exception as e: print(f"[生成] 回答生成异常: {e}") return ""def rag_with_query_transformation(pdf_path: str, query: str, transformation_type: str = None): """ 完整RAG流程,支持可选的查询变换。 """ vector_store = process_document(pdf_path) if transformation_type: results = transformed_search(query, vector_store, transformation_type) else: query_embedding = create_embeddings(query) results = vector_store.similarity_search(query_embedding, k=3) context = "\n\n".join([f"PASSAGE {i+1}:\n{result['text']}" for i, result in enumerate(results)]) response = generate_response(query, context) return { "original_query": query, "transformation_type": transformation_type, "context": context, "response": response }# ========== main方法示例 ==========if __name__ == "__main__": # 示例PDF路径(请替换为实际文件) pdf_path = "data/2888年Java程序员找工作最新场景题.pdf" # 示例问题 query = "人工智能对未来就业的影响有哪些?" # 选择变换类型:None, "rewrite", "step_back", "decompose" transformation_type = "rewrite" # 可改为step_back、decompose或None print("\n===== RAG 查询变换增强示例 =====\n") result = rag_with_query_transformation(pdf_path, query, transformation_type) print("\n===== 最终结果 =====") print(f"原始问题: {result['original_query']}") print(f"变换类型: {result['transformation_type']}") print(f"检索上下文: \n{result['context']}") print(f"大模型回答: \n{result['response']}")