自适应检索助力增强型RAG系统
实现一个自适应检索系统,该系统可根据查询类型动态选择最合适的检索策略。这种方法显著提升RAG系统针对各种不同问题提供准确且相关回复的能力。
不同的问题需要不同的检索策略:
- 对查询类型进行分类(事实型、分析型、观点型或上下文型)选择合适的检索策略执行专门的检索技术生成定制化回复
具体的代码实现
配置部分
class AdaptiveRAGConfig: """自适应RAG系统配置类""" # 阿里云API配置 API_KEY = "sk-fc6ad8ecef4b446exxxxxx23" # 阿里云API密钥(请替换为实际密钥) LLM_MODEL = "qwen-max" # 通义千问主力模型 EMBEDDING_MODEL = "text-embedding-v2" # 阿里云嵌入模型 # 文档处理配置 CHUNK_SIZE = 1000 # 文档块大小 CHUNK_OVERLAP = 200 # 块间重叠大小 # 检索配置 DEFAULT_TOP_K = 4 # 默认检索数量 TEMPERATURE = 0.1 # 生成温度 # 文件路径 DEFAULT_PDF_PATH = "data/2888年Java程序员找工作最新场景题.pdf" # 默认PDF路径print("=" * 60)print("🚀 自适应RAG系统启动中...")print("📚 基于阿里云通义千问 + text-embedding-v2")print("🔍 支持智能查询分类和自适应检索策略")print("=" * 60)
辅助函数
def print_step(step_name: str, description: str = ""): """打印步骤信息""" print(f"\n{'='*10} {step_name} {'='*10}") if description: print(f"📝 {description}")def print_result(result_type: str, content: str, max_length: int = 200): """打印结果信息""" truncated = content[:max_length] + "..." if len(content) > max_length else content print(f"✅ {result_type}: {truncated}")
PDF文本提取模块
def extract_text_from_pdf(pdf_path: str) -> str: """ 从PDF文件中提取全部文本内容 Args: pdf_path: PDF文件路径 Returns: 提取的文本内容 """ print_step("PDF文本提取", f"正在处理文件: {pdf_path}") if not os.path.exists(pdf_path): raise FileNotFoundError(f"PDF文件不存在: {pdf_path}") try: with open(pdf_path, 'rb') as file: reader = PdfReader(file) all_text = "" for page_num, page in enumerate(reader.pages, 1): page_text = page.extract_text() if page_text: all_text += page_text print(f" 📄 已处理第 {page_num} 页,累计 {len(all_text)} 字符") print_result("文本提取完成", f"总页数: {len(reader.pages)},总字符数: {len(all_text)}") return all_text except Exception as e: print(f"❌ PDF提取失败: {e}") raise
文本分块模块
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]: """ 将长文本分割为带重叠的小块,便于向量检索 Args: text: 原始长文本 chunk_size: 每块的字符数 overlap: 块间重叠字符数 Returns: 文本块列表 """ print_step("文本分块", f"分块大小: {chunk_size}, 重叠: {overlap}") if not text.strip(): print("⚠️ 输入文本为空") return [] chunks = [] start = 0 step = chunk_size - overlap while start < len(text): end = min(start + chunk_size, len(text)) chunk = text[start:end] chunks.append(chunk) if end == len(text): break start += step print_result("分块完成", f"共生成 {len(chunks)} 个文本块") for i, chunk in enumerate(chunks[:3]): # 显示前3块的预览 print(f" 📝 块 {i+1}: {chunk[:100]}...") return chunks
向量生成模块
def create_embeddings(texts: Union[str, List[str]], model: str = AdaptiveRAGConfig.EMBEDDING_MODEL) -> Union[np.ndarray, List[np.ndarray]]: """ 使用阿里云嵌入模型生成文本向量 Args: texts: 单个文本或文本列表 model: 嵌入模型名称 Returns: 对应的向量或向量列表 """ # 统一处理为列表格式 is_single = isinstance(texts, str) text_list = [texts] if is_single else texts print_step("向量生成", f"正在为 {len(text_list)} 条文本生成向量") try: response = TextEmbedding.call( model=model, input=text_list, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: embeddings = [np.array(item['embedding']) for item in response.output['embeddings']] print_result("向量生成成功", f"生成 {len(embeddings)} 个向量,维度: {len(embeddings[0])}") return embeddings[0] if is_single else embeddings else: print(f"❌ 向量生成失败: {response.message}") # 返回零向量作为fallback fallback_dim = 1536 # text-embedding-v2的维度 fallback = [np.zeros(fallback_dim) for _ in text_list] return fallback[0] if is_single else fallback except Exception as e: print(f"❌ 向量生成异常: {e}") # 返回零向量作为fallback fallback_dim = 1536 fallback = [np.zeros(fallback_dim) for _ in text_list] return fallback[0] if is_single else fallback
简单向量数据库
class SimpleVectorStore: """ 简单的向量存储和检索系统 支持向量相似度搜索和元数据管理 """ def __init__(self): """初始化向量存储""" self.vectors: List[np.ndarray] = [] # 存储向量 self.texts: List[str] = [] # 存储原始文本 self.metadata: List[Dict] = [] # 存储元数据 print("🗄️ 向量数据库初始化完成") def add_item(self, text: str, embedding: np.ndarray, metadata: Optional[Dict] = None): """ 向数据库添加一个项目 Args: text: 原始文本 embedding: 文本向量 metadata: 元数据字典 """ self.vectors.append(np.array(embedding)) self.texts.append(text) self.metadata.append(metadata or {}) def similarity_search(self, query_embedding: np.ndarray, k: int = 5, filter_func: Optional = None) -> List[Dict]: """ 基于向量相似度搜索最相关的文档 Args: query_embedding: 查询向量 k: 返回结果数量 filter_func: 过滤函数 Returns: 搜索结果列表,每个结果包含text, metadata, similarity """ if not self.vectors: return [] query_vector = np.array(query_embedding) similarities = [] for i, vector in enumerate(self.vectors): # 应用过滤器 if filter_func and not filter_func(self.metadata[i]): continue # 计算余弦相似度 similarity = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector)) similarities.append((i, similarity)) # 按相似度降序排序 similarities.sort(key=lambda x: x[1], reverse=True) # 返回前k个结果 results = [] for i in range(min(k, len(similarities))): idx, score = similarities[i] results.append({ "text": self.texts[idx], "metadata": self.metadata[idx], "similarity": score }) return results
分类查询模块
def classify_query(query: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> str: """ 使用LLM对查询进行智能分类 分类标准: - Factual: 寻求具体、可验证信息的查询 - Analytical: 需要全面分析或解释的查询 - Opinion: 关于主观事物或寻求多元观点的查询 - Contextual: 依赖用户特定背景的查询 Args: query: 用户查询 model: 使用的语言模型 Returns: 查询类型 (Factual/Analytical/Opinion/Contextual) """ print_step("查询分类", f"正在分析查询: '{query}'") system_prompt = """你是一个专业的查询分类专家。请将用户查询精确分类为以下四个类别之一:1. Factual(事实性): 寻求具体、可验证的信息或定义2. Analytical(分析性): 需要综合分析、解释复杂概念或机制 3. Opinion(观点性): 关于主观判断或需要多元视角的问题4. Contextual(上下文相关): 依赖特定用户背景或情境的问题只需返回类别名称,不要添加任何解释。""" user_prompt = f"请对以下查询进行分类: {query}" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: # 添加安全检查 - 修正阿里云API响应格式 if hasattr(response, 'output') and hasattr(response.output, 'text'): category = response.output.text.strip() # 验证返回的类别是否有效 valid_categories = ["Factual", "Analytical", "Opinion", "Contextual"] for valid in valid_categories: if valid in category: print_result("查询分类完成", f"类型: {valid}") return valid # 如果分类失败,默认为Factual print("⚠️ 分类结果无效,默认使用 Factual 类型") return "Factual" else: print("⚠️ 响应格式异常,默认使用 Factual 类型") return "Factual" else: print(f"❌ 分类请求失败: {response.message}") return "Factual" except Exception as e: print(f"❌ 分类异常: {e}") return "Factual"
自适应检索策略模块
def factual_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4) -> List[Dict]: """ 事实性查询的检索策略 - 注重精确性 策略特点: 1. 使用LLM增强查询,提高精确度 2. 对检索结果进行相关性评分 3. 返回最相关的精确信息 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 Returns: 检索结果列表 """ print_step("事实性检索策略", f"为查询 '{query}' 执行精确检索") # 使用LLM增强查询 system_prompt = """你是搜索查询优化专家。请将用户的事实性查询重新表述为更精确、更具体的搜索查询,重点关注关键实体及其关系。只返回优化后的查询,不要解释。""" user_prompt = f"优化这个事实性查询: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): enhanced_query = response.output.text.strip() print_result("查询增强", enhanced_query) else: enhanced_query = query print("⚠️ 查询增强响应格式异常,使用原始查询") else: enhanced_query = query print("⚠️ 查询增强失败,使用原始查询") except: enhanced_query = query print("⚠️ 查询增强异常,使用原始查询") # 生成增强查询的向量并检索 query_embedding = create_embeddings(enhanced_query) initial_results = vector_store.similarity_search(query_embedding, k*2) print(f"📊 初始检索到 {len(initial_results)} 个候选结果") # 对结果进行相关性评分 ranked_results = [] for doc in initial_results: relevance_score = score_document_relevance(enhanced_query, doc["text"]) ranked_results.append({ "text": doc["text"], "metadata": doc["metadata"], "similarity": doc["similarity"], "relevance_score": relevance_score }) # 按相关性评分排序 ranked_results.sort(key=lambda x: x["relevance_score"], reverse=True) final_results = ranked_results[:k] print_result("事实性检索完成", f"返回 {len(final_results)} 个最相关结果") return final_resultsdef analytical_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4) -> List[Dict]: """ 分析性查询的检索策略 - 注重全面性 策略特点: 1. 生成多个子查询覆盖不同方面 2. 确保检索结果的多样性 3. 提供全面的信息覆盖 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 Returns: 检索结果列表 """ print_step("分析性检索策略", f"为查询 '{query}' 执行全面检索") # 生成多个子查询 system_prompt = """你是分析问题专家。请为下面的分析性查询生成3个不同角度的子问题,这些子问题应该帮助全面理解主题的不同方面。每行一个问题,不要编号。""" user_prompt = f"为这个分析性查询生成子问题: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0.3, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): sub_queries = [q.strip() for q in response.output.text.strip().split('\n') if q.strip()] print_result("子查询生成", f"生成了 {len(sub_queries)} 个子查询") for i, sq in enumerate(sub_queries, 1): print(f" {i}. {sq}") else: sub_queries = [query] print("⚠️ 子查询生成响应格式异常,使用原始查询") else: sub_queries = [query] print("⚠️ 子查询生成失败,使用原始查询") except: sub_queries = [query] print("⚠️ 子查询生成异常,使用原始查询") # 为每个子查询检索文档 all_results = [] for i, sub_query in enumerate(sub_queries, 1): print(f"🔍 执行子查询 {i}: {sub_query}") sub_query_embedding = create_embeddings(sub_query) results = vector_store.similarity_search(sub_query_embedding, k=2) all_results.extend(results) # 去重并确保多样性 unique_texts = set() diverse_results = [] for result in all_results: if result["text"] not in unique_texts: unique_texts.add(result["text"]) diverse_results.append(result) print(f"📊 去重后保留 {len(diverse_results)} 个不同结果") # 如果结果不足,用主查询补充 if len(diverse_results) < k: main_query_embedding = create_embeddings(query) main_results = vector_store.similarity_search(main_query_embedding, k=k) for result in main_results: if result["text"] not in unique_texts and len(diverse_results) < k: unique_texts.add(result["text"]) diverse_results.append(result) final_results = diverse_results[:k] print_result("分析性检索完成", f"返回 {len(final_results)} 个多样化结果") return final_resultsdef opinion_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4) -> List[Dict]: """ 观点性查询的检索策略 - 注重多样性 策略特点: 1. 识别不同的观点角度 2. 为每个观点检索代表性文档 3. 确保观点的平衡性 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 Returns: 检索结果列表 """ print_step("观点性检索策略", f"为查询 '{query}' 检索多元观点") # 识别不同观点 system_prompt = """你是观点分析专家。请为下面的观点性问题识别3个不同的观点角度或立场。每行一个观点角度,不要编号,要简洁明确。""" user_prompt = f"识别这个问题的不同观点: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0.3, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): viewpoints = [v.strip() for v in response.output.text.strip().split('\n') if v.strip()] print_result("观点识别", f"识别了 {len(viewpoints)} 个不同观点") for i, vp in enumerate(viewpoints, 1): print(f" 观点{i}: {vp}") else: viewpoints = [query] print("⚠️ 观点识别响应格式异常,使用原始查询") else: viewpoints = [query] print("⚠️ 观点识别失败,使用原始查询") except: viewpoints = [query] print("⚠️ 观点识别异常,使用原始查询") # 为每个观点检索文档 all_results = [] for i, viewpoint in enumerate(viewpoints, 1): combined_query = f"{query} {viewpoint}" print(f"🔍 检索观点 {i}: {viewpoint}") viewpoint_embedding = create_embeddings(combined_query) results = vector_store.similarity_search(viewpoint_embedding, k=2) # 标记观点来源 for result in results: result["viewpoint"] = viewpoint all_results.extend(results) # 确保每个观点至少有一个代表 selected_results = [] for viewpoint in viewpoints: viewpoint_docs = [r for r in all_results if r.get("viewpoint") == viewpoint] if viewpoint_docs: selected_results.append(viewpoint_docs[0]) # 用剩余高相似度文档填充 remaining_slots = k - len(selected_results) if remaining_slots > 0: remaining_docs = [r for r in all_results if r not in selected_results] remaining_docs.sort(key=lambda x: x["similarity"], reverse=True) selected_results.extend(remaining_docs[:remaining_slots]) final_results = selected_results[:k] print_result("观点性检索完成", f"返回 {len(final_results)} 个多元观点结果") return final_resultsdef contextual_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4, user_context: str = None) -> List[Dict]: """ 上下文相关查询的检索策略 - 注重上下文集成 策略特点: 1. 推断或使用提供的用户上下文 2. 将上下文融入查询重构 3. 考虑上下文相关性进行评分 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 user_context: 用户上下文(可选) Returns: 检索结果列表 """ print_step("上下文相关检索策略", f"为查询 '{query}' 执行上下文感知检索") # 如果没有提供上下文,尝试推断 if not user_context: system_prompt = """你是上下文推断专家。请分析下面的查询,推断出什么样的背景信息或上下文对回答这个问题是重要的。返回简洁的上下文描述。""" user_prompt = f"推断这个查询的隐含上下文: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0.1, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): user_context = response.output.text.strip() print_result("上下文推断", user_context) else: user_context = "通用技术背景" print("⚠️ 上下文推断响应格式异常,使用默认上下文") else: user_context = "通用技术背景" print("⚠️ 上下文推断失败,使用默认上下文") except: user_context = "通用技术背景" print("⚠️ 上下文推断异常,使用默认上下文") # 将上下文融入查询 system_prompt = """你是查询重构专家。请将用户查询和提供的上下文信息结合,生成一个更具体、更有针对性的搜索查询。只返回重构后的查询。""" user_prompt = f"""查询: {query}上下文: {user_context}请生成融入上下文的重构查询:""" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): contextualized_query = response.output.text.strip() print_result("查询重构", contextualized_query) else: contextualized_query = query print("⚠️ 查询重构响应格式异常,使用原始查询") else: contextualized_query = query print("⚠️ 查询重构失败,使用原始查询") except: contextualized_query = query print("⚠️ 查询重构异常,使用原始查询") # 基于上下文化查询检索文档 query_embedding = create_embeddings(contextualized_query) initial_results = vector_store.similarity_search(query_embedding, k*2) print(f"📊 基于上下文检索到 {len(initial_results)} 个候选结果") # 按上下文相关性重新评分 ranked_results = [] for doc in initial_results: context_relevance = score_document_context_relevance(query, user_context, doc["text"]) ranked_results.append({ "text": doc["text"], "metadata": doc["metadata"], "similarity": doc["similarity"], "context_relevance": context_relevance }) # 按上下文相关性排序 ranked_results.sort(key=lambda x: x["context_relevance"], reverse=True) final_results = ranked_results[:k] print_result("上下文检索完成", f"返回 {len(final_results)} 个上下文相关结果") return final_results
文档评分辅助函数
def score_document_relevance(query: str, document: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> float: """ 使用LLM评估文档与查询的相关性 Args: query: 用户查询 document: 文档内容 model: 评分模型 Returns: 相关性分数 (0-10) """ system_prompt = """你是文档相关性评估专家。请对文档与查询的相关性进行评分,评分标准:0-3分: 完全不相关或基本不相关4-6分: 有一定相关性但不够直接7-8分: 较为相关,能部分回答问题9-10分: 高度相关,能很好地回答问题只返回0-10之间的数字分数,不要解释。""" # 截断文档以避免超长 doc_preview = document[:1500] + "..." if len(document) > 1500 else document user_prompt = f"""查询: {query}文档: {doc_preview}相关性评分 (0-10):""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): score_text = response.output.text.strip() # 提取数字分数 match = re.search(r'(\d+(\.\d+)?)', score_text) if match: score = float(match.group(1)) return min(10, max(0, score)) # 确保分数在0-10范围内 else: return 5.0 else: return 5.0 else: return 5.0 except: return 5.0def score_document_context_relevance(query: str, context: str, document: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> float: """ 评估文档在特定上下文下与查询的相关性 Args: query: 用户查询 context: 用户上下文 document: 文档内容 model: 评分模型 Returns: 上下文相关性分数 (0-10) """ system_prompt = """你是文档上下文相关性评估专家。请评估文档在给定上下文下对查询的相关性,评分标准:0-3分: 在此上下文下完全不相关4-6分: 在此上下文下有一定相关性7-8分: 在此上下文下较为相关9-10分: 在此上下文下高度相关只返回0-10之间的数字分数,不要解释。""" # 截断文档 doc_preview = document[:1500] + "..." if len(document) > 1500 else document user_prompt = f"""查询: {query}上下文: {context}文档: {doc_preview}在此上下文下的相关性评分 (0-10):""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): score_text = response.output.text.strip() match = re.search(r'(\d+(\.\d+)?)', score_text) if match: score = float(match.group(1)) return min(10, max(0, score)) else: return 5.0 else: return 5.0 else: return 5.0 except: return 5.0
核心自适应检索器
def adaptive_retrieval(query: str, vector_store: SimpleVectorStore, k: int = 4, user_context: str = None) -> List[Dict]: """ 自适应检索的核心函数 - 根据查询类型选择最合适的检索策略 这是整个自适应RAG系统的核心,它会: 1. 首先对查询进行智能分类 2. 根据分类结果选择相应的检索策略 3. 执行专门优化的检索过程 4. 返回最相关的文档 Args: query: 用户查询 vector_store: 向量数据库 k: 检索文档数量 user_context: 用户上下文(仅用于上下文相关查询) Returns: 检索到的文档列表 """ print_step("🧠 自适应检索核心", f"开始处理查询: '{query}'") # 第一步:查询分类 query_type = classify_query(query) print(f"🏷️ 查询被分类为: {query_type}") # 第二步:根据查询类型选择对应的检索策略 strategy_map = { "Factual": "🎯 事实性检索策略 - 注重精确性和准确性", "Analytical": "🔍 分析性检索策略 - 注重全面性和多角度覆盖", "Opinion": "💭 观点性检索策略 - 注重多样性和平衡性", "Contextual": "🎪 上下文检索策略 - 注重情境相关性" } print(f"📋 选择策略: {strategy_map.get(query_type, '默认策略')}") # 第三步:执行相应的检索策略 if query_type == "Factual": results = factual_retrieval_strategy(query, vector_store, k) elif query_type == "Analytical": results = analytical_retrieval_strategy(query, vector_store, k) elif query_type == "Opinion": results = opinion_retrieval_strategy(query, vector_store, k) elif query_type == "Contextual": results = contextual_retrieval_strategy(query, vector_store, k, user_context) else: # 默认使用事实性检索 print("⚠️ 未识别的查询类型,使用默认事实性检索策略") results = factual_retrieval_strategy(query, vector_store, k) print_result("自适应检索完成", f"查询类型: {query_type}, 返回 {len(results)} 个结果") return results
响应生成模块
def generate_response(query: str, retrieved_docs: List[Dict], query_type: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> str: """ 基于检索结果和查询类型生成个性化响应 不同查询类型使用不同的回答风格: - 事实性:准确、简洁、权威 - 分析性:全面、深入、多角度 - 观点性:平衡、多元、客观 - 上下文相关:针对性、个性化 Args: query: 用户查询 retrieved_docs: 检索到的文档列表 query_type: 查询类型 model: 生成模型 Returns: 生成的响应 """ print_step("📝 响应生成", f"基于 {len(retrieved_docs)} 个文档生成 {query_type} 类型回答") # 组合检索到的文档内容 context = "\n\n" + "="*50 + "\n\n".join([doc["text"] for doc in retrieved_docs]) print(f"📚 组合上下文长度: {len(context)} 字符") # 根据查询类型定制系统提示 system_prompts = { "Factual": """你是一个专业的事实查询助手。请基于提供的文档准确回答用户问题。要求:- 回答要准确、简洁、权威- 重点关注事实信息和具体数据- 如果文档中没有相关信息,请明确说明- 避免推测或添加文档中没有的信息""", "Analytical": """你是一个专业的分析师。请基于提供的文档对用户问题进行全面分析。要求:- 提供深入、多角度的分析- 涵盖问题的不同方面和层面- 使用文档中的信息支撑你的分析- 指出文档可能未涵盖的分析角度""", "Opinion": """你是一个客观的观点分析师。请基于提供的文档呈现不同观点和立场。要求:- 公平呈现多元化观点- 避免偏向任何特定立场- 指出观点的依据和局限性- 如果文档观点单一,请明确说明""", "Contextual": """你是一个上下文感知的智能助手。请结合查询背景和文档信息提供针对性回答。要求:- 考虑查询的特定背景和情境- 提供个性化、针对性的建议- 将一般信息与具体情境结合- 如果需要更多上下文信息,请主动询问""" } system_prompt = system_prompts.get(query_type, system_prompts["Factual"]) # 构建用户提示 user_prompt = f"""参考文档:{context}用户问题:{query}请基于以上文档内容回答用户问题。""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=AdaptiveRAGConfig.TEMPERATURE, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): generated_response = response.output.text print_result("响应生成成功", f"生成了 {len(generated_response)} 字符的回答") return generated_response else: error_msg = "抱歉,生成回答时响应格式异常,请检查API配置" print("❌ 响应生成失败: 响应格式异常") return error_msg else: error_msg = f"抱歉,生成回答时出现问题:{response.message}" print(f"❌ 响应生成失败: {response.message}") return error_msg except Exception as e: error_msg = f"抱歉,生成回答时出现异常:{str(e)}" print(f"❌ 响应生成异常: {e}") return error_msg
文档处理管道
def process_document(pdf_path: str, chunk_size: int = AdaptiveRAGConfig.CHUNK_SIZE, chunk_overlap: int = AdaptiveRAGConfig.CHUNK_OVERLAP) -> tuple: """ 完整的文档处理管道 流程: 1. 从PDF提取文本 2. 将文本分块 3. 为每个块生成向量 4. 构建向量数据库 Args: pdf_path: PDF文件路径 chunk_size: 分块大小 chunk_overlap: 块间重叠 Returns: (文本块列表, 向量数据库) """ print_step("📋 文档处理管道", f"开始处理文档: {pdf_path}") # 第一步:提取文本 extracted_text = extract_text_from_pdf(pdf_path) if not extracted_text: raise ValueError("PDF文档为空或无法提取文本") # 第二步:文本分块 text_chunks = chunk_text(extracted_text, chunk_size, chunk_overlap) if not text_chunks: raise ValueError("文本分块失败") # 第三步:初始化向量存储 vector_store = SimpleVectorStore() # 第四步:为所有块生成向量并存储 print_step("🔄 批量向量化", f"为 {len(text_chunks)} 个文本块生成向量") # 批量生成向量(如果chunks太多,可以分批处理) batch_size = 20 # 阿里云API建议批量大小 all_embeddings = [] for i in range(0, len(text_chunks), batch_size): batch_chunks = text_chunks[i:i+batch_size] print(f" 📦 处理批次 {i//batch_size + 1}/{(len(text_chunks)-1)//batch_size + 1}") batch_embeddings = create_embeddings(batch_chunks) all_embeddings.extend(batch_embeddings) # 第五步:构建向量数据库 print_step("🗄️ 构建向量数据库", "将文本块和向量添加到数据库") for i, (chunk, embedding) in enumerate(zip(text_chunks, all_embeddings)): metadata = { "chunk_id": i, "source": pdf_path, "chunk_size": len(chunk), "timestamp": datetime.now().isoformat() } vector_store.add_item(chunk, embedding, metadata) if (i + 1) % 10 == 0 or i == len(text_chunks) - 1: print(f" ✅ 已处理 {i + 1}/{len(text_chunks)} 个文本块") print_result("文档处理完成", f"成功处理 {len(text_chunks)} 个文本块,构建向量数据库") return text_chunks, vector_store
完整RAG流程
def run_adaptive_rag(pdf_path: str, query: str, k: int = AdaptiveRAGConfig.DEFAULT_TOP_K, user_context: str = None) -> Dict[str, Any]: """ 运行完整的自适应RAG流程 这是系统的主入口,包含完整的处理流程: 1. 文档处理(提取、分块、向量化) 2. 查询分类 3. 自适应检索 4. 响应生成 5. 结果整理 Args: pdf_path: PDF文档路径 query: 用户查询 k: 检索文档数量 user_context: 用户上下文(可选) Returns: 包含完整结果的字典 """ print("=" * 80) print("🚀 自适应RAG系统开始运行") print(f"📖 文档: {pdf_path}") print(f"❓ 查询: {query}") print(f"🔢 检索数量: {k}") if user_context: print(f"🎯 用户上下文: {user_context}") print("=" * 80) start_time = datetime.now() try: # 步骤1:处理文档 chunks, vector_store = process_document(pdf_path) # 步骤2:执行自适应检索 retrieved_docs = adaptive_retrieval(query, vector_store, k, user_context) # 步骤3:查询分类(已在adaptive_retrieval中完成,这里重新获取用于结果记录) query_type = classify_query(query) # 步骤4:生成响应 response = generate_response(query, retrieved_docs, query_type) # 步骤5:整理结果 end_time = datetime.now() processing_time = (end_time - start_time).total_seconds() result = { "query": query, "query_type": query_type, "retrieved_documents": retrieved_docs, "response": response, "processing_time_seconds": processing_time, "document_chunks_count": len(chunks), "user_context": user_context, "timestamp": end_time.isoformat() } # 显示最终结果 print("\n" + "=" * 80) print("✨ 自适应RAG处理完成") print("=" * 80) print(f"🏷️ 查询类型: {query_type}") print(f"📊 检索文档: {len(retrieved_docs)} 个") print(f"⏱️ 处理时间: {processing_time:.2f} 秒") print(f"\n💬 生成回答:\n{'-'*50}") print(response) print("=" * 80) return result except Exception as e: print(f"❌ RAG流程执行失败: {e}") return { "query": query, "error": str(e), "timestamp": datetime.now().isoformat() }
执行结果展示
============================================================🚀 自适应RAG系统启动中...📚 基于阿里云通义千问 + text-embedding-v2🔍 支持智能查询分类和自适应检索策略============================================================🎉 欢迎使用自适应RAG系统!本系统将演示智能查询分类和自适应检索的强大功能。📖 使用文档: data/2888年Java程序员找工作最新场景题.pdf🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪 自适应RAG系统演示🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🎪🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥 演示查询 1 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥📝 查询类型: 事实性查询 - 寻求明确定义❓ 查询内容: 什么是Spring框架?🎯 预期类型: Factual================================================================================🚀 自适应RAG系统开始运行📖 文档: data/2888年Java程序员找工作最新场景题.pdf❓ 查询: 什么是Spring框架?🔢 检索数量: 3========================================================================================== 📋 文档处理管道 ==========📝 开始处理文档: data/2888年Java程序员找工作最新场景题.pdf========== PDF文本提取 ==========📝 正在处理文件: data/2888年Java程序员找工作最新场景题.pdf 📄 已处理第 1 页,累计 626 字符 📄 已处理第 2 页,累计 1175 字符 📄 已处理第 3 页,累计 2084 字符 📄 已处理第 4 页,累计 2941 字符 📄 已处理第 5 页,累计 3702 字符 📄 已处理第 6 页,累计 4104 字符 📄 已处理第 7 页,累计 4458 字符 📄 已处理第 8 页,累计 5666 字符 📄 已处理第 9 页,累计 6200 字符 📄 已处理第 10 页,累计 6984 字符✅ 文本提取完成: 总页数: 10,总字符数: 6984========== 文本分块 ==========📝 分块大小: 1000, 重叠: 200✅ 分块完成: 共生成 9 个文本块 📝 块 1: 2024年Java程序员找工作最新面试攻略这个文档是帮助正在找工作以及准备找工作的同学,在面试之前去复习和突击的一种方式。适合已经在技术领域有一定积累,然后不确定面试切入点,所以可以通过这个面试... 📝 块 2: 无非就两个部分,投简历、面试!很多人把重心放在投简历上,忽略了准备面试的重要性,最后的结果是获得了面试机会但是在面试过程中被刷下来了。1.1怎么写简历着手准备的第一步聚焦于简历的打造。简历是... 📝 块 3: 涉及的行业如金融、电子商务,及特定业务如客户关系管理、支付系统等。 软实力与经验:团队管理规模、项目管理经验、个人特质等,简洁明了即可。确保简历内容与目标职位的技能要求对齐,以便快速建立匹配印...🗄️ 向量数据库初始化完成========== 🔄 批量向量化 ==========📝 为 9 个文本块生成向量 📦 处理批次 1/1========== 向量生成 ==========📝 正在为 9 条文本生成向量✅ 向量生成成功: 生成 9 个向量,维度: 1536========== 🗄️ 构建向量数据库 ==========📝 将文本块和向量添加到数据库 ✅ 已处理 9/9 个文本块✅ 文档处理完成: 成功处理 9 个文本块,构建向量数据库========== 🧠 自适应检索核心 ==========📝 开始处理查询: '什么是Spring框架?'========== 查询分类 ==========📝 正在分析查询: '什么是Spring框架?'✅ 查询分类完成: 类型: Factual🏷️ 查询被分类为: Factual📋 选择策略: 🎯 事实性检索策略 - 注重精确性和准确性========== 事实性检索策略 ==========📝 为查询 '什么是Spring框架?' 执行精确检索✅ 查询增强: Spring框架是什么及其主要功能========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536📊 初始检索到 6 个候选结果✅ 事实性检索完成: 返回 3 个最相关结果✅ 自适应检索完成: 查询类型: Factual, 返回 3 个结果========== 查询分类 ==========📝 正在分析查询: '什么是Spring框架?'✅ 查询分类完成: 类型: Factual========== 📝 响应生成 ==========📝 基于 3 个文档生成 Factual 类型回答📚 组合上下文长度: 2640 字符✅ 响应生成成功: 生成了 180 字符的回答================================================================================✨ 自适应RAG处理完成================================================================================🏷️ 查询类型: Factual📊 检索文档: 3 个⏱️ 处理时间: 16.63 秒💬 生成回答:--------------------------------------------------提供的文档中并没有直接定义或解释Spring框架。Spring框架是一个用于构建企业级应用程序的开源Java平台,它提供了一种全面的编程和配置模型,支持广泛的企业应用开发需求。不过,请注意,该文档主要讨论了分布式架构、网络编程技能以及项目准备等方面的内容,并未具体介绍Spring框架。对于更详细的Spring框架信息,您可能需要查阅专门的技术资料或官方文档。================================================================================✅ 类型匹配: 预期 Factual vs 实际 Factual============================================================🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥 演示查询 2 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥📝 查询类型: 分析性查询 - 需要全面分析❓ 查询内容: Java程序员在面试中如何展示自己的技术能力和项目经验?🎯 预期类型: Analytical================================================================================🚀 自适应RAG系统开始运行📖 文档: data/2888年Java程序员找工作最新场景题.pdf❓ 查询: Java程序员在面试中如何展示自己的技术能力和项目经验?🔢 检索数量: 3========================================================================================== 📋 文档处理管道 ==========📝 开始处理文档: data/2888年Java程序员找工作最新场景题.pdf========== PDF文本提取 ==========📝 正在处理文件: data/2888年Java程序员找工作最新场景题.pdf 📄 已处理第 1 页,累计 626 字符 📄 已处理第 2 页,累计 1175 字符 📄 已处理第 3 页,累计 2084 字符 📄 已处理第 4 页,累计 2941 字符 📄 已处理第 5 页,累计 3702 字符 📄 已处理第 6 页,累计 4104 字符 📄 已处理第 7 页,累计 4458 字符 📄 已处理第 8 页,累计 5666 字符 📄 已处理第 9 页,累计 6200 字符 📄 已处理第 10 页,累计 6984 字符✅ 文本提取完成: 总页数: 10,总字符数: 6984========== 文本分块 ==========📝 分块大小: 1000, 重叠: 200✅ 分块完成: 共生成 9 个文本块 📝 块 1: 2024年Java程序员找工作最新面试攻略这个文档是帮助正在找工作以及准备找工作的同学,在面试之前去复习和突击的一种方式。适合已经在技术领域有一定积累,然后不确定面试切入点,所以可以通过这个面试... 📝 块 2: 无非就两个部分,投简历、面试!很多人把重心放在投简历上,忽略了准备面试的重要性,最后的结果是获得了面试机会但是在面试过程中被刷下来了。1.1怎么写简历着手准备的第一步聚焦于简历的打造。简历是... 📝 块 3: 涉及的行业如金融、电子商务,及特定业务如客户关系管理、支付系统等。 软实力与经验:团队管理规模、项目管理经验、个人特质等,简洁明了即可。确保简历内容与目标职位的技能要求对齐,以便快速建立匹配印...🗄️ 向量数据库初始化完成========== 🔄 批量向量化 ==========📝 为 9 个文本块生成向量 📦 处理批次 1/1========== 向量生成 ==========📝 正在为 9 条文本生成向量✅ 向量生成成功: 生成 9 个向量,维度: 1536========== 🗄️ 构建向量数据库 ==========📝 将文本块和向量添加到数据库 ✅ 已处理 9/9 个文本块✅ 文档处理完成: 成功处理 9 个文本块,构建向量数据库========== 🧠 自适应检索核心 ==========📝 开始处理查询: 'Java程序员在面试中如何展示自己的技术能力和项目经验?'========== 查询分类 ==========📝 正在分析查询: 'Java程序员在面试中如何展示自己的技术能力和项目经验?'✅ 查询分类完成: 类型: Opinion🏷️ 查询被分类为: Opinion📋 选择策略: 💭 观点性检索策略 - 注重多样性和平衡性========== 观点性检索策略 ==========📝 为查询 'Java程序员在面试中如何展示自己的技术能力和项目经验?' 检索多元观点✅ 观点识别: 识别了 3 个不同观点 观点1: 从技术深度和广度展示个人技能,包括但不限于Java语言特性、框架使用及算法理解 观点2: 通过具体项目案例来说明解决实际问题的能力,强调在团队中的角色与贡献 观点3: 准备一些在线作品或开源贡献作为补充材料,以证明持续学习和技术热情🔍 检索观点 1: 从技术深度和广度展示个人技能,包括但不限于Java语言特性、框架使用及算法理解========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536🔍 检索观点 2: 通过具体项目案例来说明解决实际问题的能力,强调在团队中的角色与贡献========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536🔍 检索观点 3: 准备一些在线作品或开源贡献作为补充材料,以证明持续学习和技术热情========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536✅ 观点性检索完成: 返回 3 个多元观点结果✅ 自适应检索完成: 查询类型: Opinion, 返回 3 个结果========== 查询分类 ==========📝 正在分析查询: 'Java程序员在面试中如何展示自己的技术能力和项目经验?'✅ 查询分类完成: 类型: Opinion========== 📝 响应生成 ==========📝 基于 3 个文档生成 Opinion 类型回答📚 组合上下文长度: 3056 字符✅ 响应生成成功: 生成了 1038 字符的回答================================================================================✨ 自适应RAG处理完成================================================================================🏷️ 查询类型: Opinion📊 检索文档: 3 个⏱️ 处理时间: 44.72 秒💬 生成回答:--------------------------------------------------根据提供的文档内容,Java程序员在面试中展示自己的技术能力和项目经验可以从以下几个方面入手:### 1. **体系化理解Java相关技术**- **观点**:求职者需要对Java相关的技术有一个全面而深入的理解。这不仅包括基础的语法和框架知识,还应涵盖微服务架构、云原生技术以及AI与大数据等前沿领域。- **依据**:随着软件开发行业的快速发展,企业对于Java程序员的要求也在不断提高。只有具备扎实的技术功底,才能在激烈的竞争中脱颖而出。- **局限性**:这种要求可能对那些刚入行或经验较少的开发者来说是一个挑战,因为他们可能还没有机会接触到这些高级主题。### 2. **准备具体的项目案例**- **观点**:通过具体项目的描述来展示个人的技术能力是一种非常有效的方式。选择一两个你参与过且具有代表性的项目进行详细介绍。- **依据**:现代面试越来越注重实际操作能力和解决问题的能力,而不是单纯的知识点记忆。因此,能够清晰地阐述自己如何在一个真实环境中应用所学知识是非常加分的。- **局限性**:如果求职者缺乏实际工作经验或者其经历中的项目不够复杂,则可能会在这方面显得较为薄弱。### 3. **展现技术深度思考**- **观点**:除了基本技能外,面试官还会考察候选人是否能够从更深层次去理解和分析问题。这意味着你需要能够解释为什么选择了某种解决方案,并讨论其他可能的选择及其优缺点。- **依据**:随着面试形式的变化,越来越多的企业开始重视候选人的思维逻辑和技术视野。能够展现出独立思考能力的人往往更容易获得青睐。- **局限性**:这对求职者的综合素质提出了较高要求,不仅考验他们的专业知识水平,还需要良好的沟通表达技巧。### 4. **优化简历以突出亮点**- **观点**:一份精心设计的简历可以帮助你在众多应聘者中迅速抓住HR的眼球。确保你的简历简洁明了,重点突出你的核心竞争力。- **依据**:简历是你给未来雇主的第一印象,一个好的开头可以为后续的面试打下良好基础。- **局限性**:虽然优秀的简历很重要,但最终能否成功入职还是取决于面试表现和个人实力。综上所述,为了更好地展示自己的技术能力和项目经验,Java程序员应该努力提升自身的专业素养,积累丰富的实战经验,并学会如何有效地传达这些信息给潜在雇主。同时也要注意保持谦逊的态度,持续学习新知,这样才能在这个快速变化的行业中立于不败之地。================================================================================⚠️ 类型匹配: 预期 Analytical vs 实际 Opinion============================================================🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥 演示查询 3 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥📝 查询类型: 观点性查询 - 寻求多元观点❓ 查询内容: 现在Java开发市场这么卷,是不是应该转其他技术栈?🎯 预期类型: Opinion================================================================================🚀 自适应RAG系统开始运行📖 文档: data/2888年Java程序员找工作最新场景题.pdf❓ 查询: 现在Java开发市场这么卷,是不是应该转其他技术栈?🔢 检索数量: 3========================================================================================== 📋 文档处理管道 ==========📝 开始处理文档: data/2888年Java程序员找工作最新场景题.pdf========== PDF文本提取 ==========📝 正在处理文件: data/2888年Java程序员找工作最新场景题.pdf 📄 已处理第 1 页,累计 626 字符 📄 已处理第 2 页,累计 1175 字符 📄 已处理第 3 页,累计 2084 字符 📄 已处理第 4 页,累计 2941 字符 📄 已处理第 5 页,累计 3702 字符 📄 已处理第 6 页,累计 4104 字符 📄 已处理第 7 页,累计 4458 字符 📄 已处理第 8 页,累计 5666 字符 📄 已处理第 9 页,累计 6200 字符 📄 已处理第 10 页,累计 6984 字符✅ 文本提取完成: 总页数: 10,总字符数: 6984========== 文本分块 ==========📝 分块大小: 1000, 重叠: 200✅ 分块完成: 共生成 9 个文本块 📝 块 1: 2024年Java程序员找工作最新面试攻略这个文档是帮助正在找工作以及准备找工作的同学,在面试之前去复习和突击的一种方式。适合已经在技术领域有一定积累,然后不确定面试切入点,所以可以通过这个面试... 📝 块 2: 无非就两个部分,投简历、面试!很多人把重心放在投简历上,忽略了准备面试的重要性,最后的结果是获得了面试机会但是在面试过程中被刷下来了。1.1怎么写简历着手准备的第一步聚焦于简历的打造。简历是... 📝 块 3: 涉及的行业如金融、电子商务,及特定业务如客户关系管理、支付系统等。 软实力与经验:团队管理规模、项目管理经验、个人特质等,简洁明了即可。确保简历内容与目标职位的技能要求对齐,以便快速建立匹配印...🗄️ 向量数据库初始化完成========== 🔄 批量向量化 ==========📝 为 9 个文本块生成向量 📦 处理批次 1/1========== 向量生成 ==========📝 正在为 9 条文本生成向量✅ 向量生成成功: 生成 9 个向量,维度: 1536========== 🗄️ 构建向量数据库 ==========📝 将文本块和向量添加到数据库 ✅ 已处理 9/9 个文本块✅ 文档处理完成: 成功处理 9 个文本块,构建向量数据库========== 🧠 自适应检索核心 ==========📝 开始处理查询: '现在Java开发市场这么卷,是不是应该转其他技术栈?'========== 查询分类 ==========📝 正在分析查询: '现在Java开发市场这么卷,是不是应该转其他技术栈?'✅ 查询分类完成: 类型: Opinion🏷️ 查询被分类为: Opinion📋 选择策略: 💭 观点性检索策略 - 注重多样性和平衡性========== 观点性检索策略 ==========📝 为查询 '现在Java开发市场这么卷,是不是应该转其他技术栈?' 检索多元观点✅ 观点识别: 识别了 3 个不同观点 观点1: 坚持Java,提升个人技能以增强竞争力 观点2: 转向新兴技术栈,抓住新机遇 观点3: 根据个人兴趣和职业规划选择技术方向🔍 检索观点 1: 坚持Java,提升个人技能以增强竞争力========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536🔍 检索观点 2: 转向新兴技术栈,抓住新机遇========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536🔍 检索观点 3: 根据个人兴趣和职业规划选择技术方向========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536✅ 观点性检索完成: 返回 3 个多元观点结果✅ 自适应检索完成: 查询类型: Opinion, 返回 3 个结果========== 查询分类 ==========📝 正在分析查询: '现在Java开发市场这么卷,是不是应该转其他技术栈?'✅ 查询分类完成: 类型: Opinion========== 📝 响应生成 ==========📝 基于 3 个文档生成 Opinion 类型回答📚 组合上下文长度: 3056 字符✅ 响应生成成功: 生成了 816 字符的回答================================================================================✨ 自适应RAG处理完成================================================================================🏷️ 查询类型: Opinion📊 检索文档: 3 个⏱️ 处理时间: 27.11 秒💬 生成回答:--------------------------------------------------关于是否应该从Java开发转向其他技术栈,可以从以下几个不同的观点来分析:### 观点1:继续深耕Java**依据**:- Java作为企业级应用开发的中流砥柱,其生态系统在不断进化。随着微服务架构、云原生技术和AI与大数据的深度融合,Java程序员的角色和技能需求也在迭代升级。- 通过系统化地学习和理解Java相关技术,可以更好地突出自己的综合能力,从而在面试中脱颖而出。**局限性**:- 随着市场内卷加剧,竞争压力增大,需要投入更多时间和精力来保持竞争力。- 如果个人对Java的兴趣或适应能力不足,可能会感到疲惫和挫败感。### 观点2:转到新兴技术栈**依据**:- 新兴技术如Go、Rust等在某些领域(例如高性能计算、区块链)展现出强大的潜力和发展前景。- 转向这些新兴技术可能意味着更少的竞争者和更多的机会,尤其是在初创公司或特定行业。**局限性**:- 学习新语言和技术栈需要时间成本,短期内可能会影响职业发展速度。- 新兴技术虽然有潜力,但市场需求相对较小且不稳定,存在一定的风险。### 观点3:多技术栈并行发展**依据**:- 在当前快速变化的技术环境中,掌握多种编程语言和技术栈可以使自己更加灵活,能够应对不同类型的项目需求。- 一些公司偏好招聘具备跨领域能力的人才,这有助于提高就业竞争力。**局限性**:- 同时学习多个技术栈可能导致每个领域的深度不够,影响专业水平。- 时间和精力有限的情况下,分散注意力可能不利于形成核心竞争力。### 总结选择是否从Java转向其他技术栈取决于个人兴趣、职业规划以及对未来趋势的判断。如果对Java依然充满热情,并愿意持续深入研究,则深耕Java仍然是一个不错的选择;反之,若希望探索新的可能性或者发现某个新兴领域特别吸引自己,那么适时转型也未尝不可。最重要的是找到适合自己的发展方向,并为之不懈努力。================================================================================✅ 类型匹配: 预期 Opinion vs 实际 Opinion============================================================🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥 演示查询 4 🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥🔥📝 查询类型: 上下文相关查询 - 特定身份背景❓ 查询内容: 作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?🎯 预期类型: Contextual🎪 用户上下文: 应届毕业生,计算机专业,目标Java开发岗位================================================================================🚀 自适应RAG系统开始运行📖 文档: data/2888年Java程序员找工作最新场景题.pdf❓ 查询: 作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?🔢 检索数量: 3🎯 用户上下文: 应届毕业生,计算机专业,目标Java开发岗位========================================================================================== 📋 文档处理管道 ==========📝 开始处理文档: data/2888年Java程序员找工作最新场景题.pdf========== PDF文本提取 ==========📝 正在处理文件: data/2888年Java程序员找工作最新场景题.pdf 📄 已处理第 1 页,累计 626 字符 📄 已处理第 2 页,累计 1175 字符 📄 已处理第 3 页,累计 2084 字符 📄 已处理第 4 页,累计 2941 字符 📄 已处理第 5 页,累计 3702 字符 📄 已处理第 6 页,累计 4104 字符 📄 已处理第 7 页,累计 4458 字符 📄 已处理第 8 页,累计 5666 字符 📄 已处理第 9 页,累计 6200 字符 📄 已处理第 10 页,累计 6984 字符✅ 文本提取完成: 总页数: 10,总字符数: 6984========== 文本分块 ==========📝 分块大小: 1000, 重叠: 200✅ 分块完成: 共生成 9 个文本块 📝 块 1: 2024年Java程序员找工作最新面试攻略这个文档是帮助正在找工作以及准备找工作的同学,在面试之前去复习和突击的一种方式。适合已经在技术领域有一定积累,然后不确定面试切入点,所以可以通过这个面试... 📝 块 2: 无非就两个部分,投简历、面试!很多人把重心放在投简历上,忽略了准备面试的重要性,最后的结果是获得了面试机会但是在面试过程中被刷下来了。1.1怎么写简历着手准备的第一步聚焦于简历的打造。简历是... 📝 块 3: 涉及的行业如金融、电子商务,及特定业务如客户关系管理、支付系统等。 软实力与经验:团队管理规模、项目管理经验、个人特质等,简洁明了即可。确保简历内容与目标职位的技能要求对齐,以便快速建立匹配印...🗄️ 向量数据库初始化完成========== 🔄 批量向量化 ==========📝 为 9 个文本块生成向量 📦 处理批次 1/1========== 向量生成 ==========📝 正在为 9 条文本生成向量✅ 向量生成成功: 生成 9 个向量,维度: 1536========== 🗄️ 构建向量数据库 ==========📝 将文本块和向量添加到数据库 ✅ 已处理 9/9 个文本块✅ 文档处理完成: 成功处理 9 个文本块,构建向量数据库========== 🧠 自适应检索核心 ==========📝 开始处理查询: '作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?'========== 查询分类 ==========📝 正在分析查询: '作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?'✅ 查询分类完成: 类型: Contextual🏷️ 查询被分类为: Contextual📋 选择策略: 🎪 上下文检索策略 - 注重情境相关性========== 上下文相关检索策略 ==========📝 为查询 '作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?' 执行上下文感知检索✅ 查询重构: 应届毕业生计算机专业如何准备Java开发岗位面试========== 向量生成 ==========📝 正在为 1 条文本生成向量✅ 向量生成成功: 生成 1 个向量,维度: 1536📊 基于上下文检索到 6 个候选结果✅ 上下文检索完成: 返回 3 个上下文相关结果✅ 自适应检索完成: 查询类型: Contextual, 返回 3 个结果========== 查询分类 ==========📝 正在分析查询: '作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?'✅ 查询分类完成: 类型: Contextual========== 📝 响应生成 ==========📝 基于 3 个文档生成 Contextual 类型回答📚 组合上下文长度: 3056 字符✅ 响应生成成功: 生成了 1428 字符的回答================================================================================✨ 自适应RAG处理完成================================================================================🏷️ 查询类型: Contextual📊 检索文档: 3 个⏱️ 处理时间: 47.40 秒💬 生成回答:--------------------------------------------------作为刚毕业的计算机专业学生,准备Java开发岗位的面试需要从多个方面入手。以下是一些具体的建议:### 1. **简历优化**- **突出项目经验**:即使你是应届毕业生,也可以通过课程项目、实习经历或个人项目来展示你的技术能力。确保在简历中详细描述这些项目的背景、你所承担的任务、使用的技术栈以及最终成果。- **技能列表**:列出你掌握的Java相关技术和工具,如JVM、Spring框架、数据库(MySQL, PostgreSQL等)、版本控制工具(Git)等。注意区分“精通”、“熟悉”和“了解”,避免夸大其词。- **简洁明了**:保持简历不超过两页A4纸,采用PDF格式以保证格式一致性。可以参考LinkedIn或Microsoft Office模板来美化外观。### 2. **技术知识储备**- **基础语法**:确保你对Java的基础语法非常熟悉,包括类、对象、继承、多态、接口等。- **进阶知识**:学习并发编程、NIO、JVM调优等进阶内容。理解Spring框架的基本原理,特别是Spring Boot和Spring Cloud。- **数据结构与算法**:这是面试中的常见考点,要能够熟练地实现常见的数据结构(如链表、树、图)和算法(如排序、查找)。- **网络编程**:理解TCP/IP协议的三次握手、四次挥手过程,Socket编程基础,以及I/O多路复用技术(select、poll、epoll)。- **数据库**:掌握SQL语言,了解关系型数据库(如MySQL)和NoSQL数据库(如Redis)的基本操作和优化技巧。### 3. **项目准备**- **STAR法则**:在准备项目介绍时,使用STAR法则(情境Situation、任务Task、行动Action、结果Result)来组织你的叙述,确保内容条理清晰。- **技术细节**:在描述项目时,加入丰富的技术细节,这会让你的故事更加真实可信。- **情感表达**:真挚的情感表达能传递你的热情和自豪感,但要确保情感源自真实的体验。- **反思与成长**:在叙述中穿插你的思考、教训总结及后续的改进措施,展现你的成长和成熟。### 4. **软技能**- **沟通能力**:面试不仅是技术考核,也是对你沟通能力的考察。练习清晰、有条理地表达自己的想法。- **团队合作**:如果你有团队项目经验,强调你在团队中的角色和贡献。- **解决问题的能力**:展示你在面对技术难题时的解决思路和方法。### 5. **模拟面试**- **自我介绍**:准备一段简短而有力的自我介绍,突出你的优势和为什么适合这个职位。- **常见问题**:准备一些常见的面试问题,如“你为什么选择Java?”、“你在项目中遇到的最大挑战是什么?”等。- **技术问答**:找一些在线资源或参加模拟面试,练习回答技术问题。LeetCode、HackerRank等平台上有许多编程题可以帮助你提升解题能力。### 6. **持续学习**- **阅读官方文档**:深入阅读Java官方文档和其他相关技术的官方文档,如Spring、Redis等。- **关注行业动态**:了解最新的技术趋势和发展,如微服务架构、云原生技术、AI与大数据等。通过以上这些步骤,你可以更全面地准备Java开发岗位的面试,提高你的竞争力。祝你面试成功!================================================================================✅ 类型匹配: 预期 Contextual vs 实际 Contextual============================================================🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊 演示完成!🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊🎊✨ 自适应RAG系统成功展示了: 1. 智能查询分类 - 自动识别查询类型 2. 自适应检索策略 - 针对不同类型的专门优化 3. 个性化响应生成 - 基于查询类型的定制回答 4. 详细过程追踪 - 每个步骤的透明输出🚀 系统已准备就绪,可以处理各种类型的查询!进程已结束,退出代码为 0
完整代码示例
# -*- coding: utf-8 -*-"""自适应RAG系统 - 基于阿里云通义千问和嵌入模型============================================本脚本实现了一个智能的自适应RAG(检索增强生成)系统,能够根据查询类型自动选择最合适的检索策略。系统特点:1. 智能查询分类:自动将查询分为事实性、分析性、观点性、上下文相关四种类型2. 自适应检索策略:针对不同查询类型采用专门优化的检索方法3. 阿里云大模型:集成通义千问和text-embedding-v2模型4. 详细日志输出:每个步骤都有详细的控制台输出,便于调试和了解过程查询类型及对应策略:- 事实性查询:注重精确性,使用查询增强和相关性评分- 分析性查询:注重全面性,使用子查询分解获得多角度信息 - 观点性查询:注重多样性,检索不同观点和立场- 上下文相关查询:结合用户上下文,提供个性化检索"""import osimport sysimport numpy as npimport jsonimport refrom datetime import datetimefrom typing import List, Dict, Any, Optional, Unionfrom PyPDF2 import PdfReaderfrom dashscope import Generation, TextEmbedding# ========== 配置部分 ==========class AdaptiveRAGConfig: """自适应RAG系统配置类""" # 阿里云API配置 API_KEY = "sk-fc6ad8ecef4b4xxxxx5225372f23" # 阿里云API密钥(请替换为实际密钥) LLM_MODEL = "qwen-max" # 通义千问主力模型 EMBEDDING_MODEL = "text-embedding-v2" # 阿里云嵌入模型 # 文档处理配置 CHUNK_SIZE = 1000 # 文档块大小 CHUNK_OVERLAP = 200 # 块间重叠大小 # 检索配置 DEFAULT_TOP_K = 4 # 默认检索数量 TEMPERATURE = 0.1 # 生成温度 # 文件路径 DEFAULT_PDF_PATH = "data/2888年Java程序员找工作最新场景题.pdf" # 默认PDF路径print("=" * 60)print("🚀 自适应RAG系统启动中...")print("📚 基于阿里云通义千问 + text-embedding-v2")print("🔍 支持智能查询分类和自适应检索策略")print("=" * 60)# ========== 辅助函数 ==========def print_step(step_name: str, description: str = ""): """打印步骤信息""" print(f"\n{'='*10} {step_name} {'='*10}") if description: print(f"📝 {description}")def print_result(result_type: str, content: str, max_length: int = 200): """打印结果信息""" truncated = content[:max_length] + "..." if len(content) > max_length else content print(f"✅ {result_type}: {truncated}")# ========== PDF文本提取模块 ==========def extract_text_from_pdf(pdf_path: str) -> str: """ 从PDF文件中提取全部文本内容 Args: pdf_path: PDF文件路径 Returns: 提取的文本内容 """ print_step("PDF文本提取", f"正在处理文件: {pdf_path}") if not os.path.exists(pdf_path): raise FileNotFoundError(f"PDF文件不存在: {pdf_path}") try: with open(pdf_path, 'rb') as file: reader = PdfReader(file) all_text = "" for page_num, page in enumerate(reader.pages, 1): page_text = page.extract_text() if page_text: all_text += page_text print(f" 📄 已处理第 {page_num} 页,累计 {len(all_text)} 字符") print_result("文本提取完成", f"总页数: {len(reader.pages)},总字符数: {len(all_text)}") return all_text except Exception as e: print(f"❌ PDF提取失败: {e}") raise# ========== 文本分块模块 ==========def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]: """ 将长文本分割为带重叠的小块,便于向量检索 Args: text: 原始长文本 chunk_size: 每块的字符数 overlap: 块间重叠字符数 Returns: 文本块列表 """ print_step("文本分块", f"分块大小: {chunk_size}, 重叠: {overlap}") if not text.strip(): print("⚠️ 输入文本为空") return [] chunks = [] start = 0 step = chunk_size - overlap while start < len(text): end = min(start + chunk_size, len(text)) chunk = text[start:end] chunks.append(chunk) if end == len(text): break start += step print_result("分块完成", f"共生成 {len(chunks)} 个文本块") for i, chunk in enumerate(chunks[:3]): # 显示前3块的预览 print(f" 📝 块 {i+1}: {chunk[:100]}...") return chunks# ========== 向量生成模块 ==========def create_embeddings(texts: Union[str, List[str]], model: str = AdaptiveRAGConfig.EMBEDDING_MODEL) -> Union[np.ndarray, List[np.ndarray]]: """ 使用阿里云嵌入模型生成文本向量 Args: texts: 单个文本或文本列表 model: 嵌入模型名称 Returns: 对应的向量或向量列表 """ # 统一处理为列表格式 is_single = isinstance(texts, str) text_list = [texts] if is_single else texts print_step("向量生成", f"正在为 {len(text_list)} 条文本生成向量") try: response = TextEmbedding.call( model=model, input=text_list, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: embeddings = [np.array(item['embedding']) for item in response.output['embeddings']] print_result("向量生成成功", f"生成 {len(embeddings)} 个向量,维度: {len(embeddings[0])}") return embeddings[0] if is_single else embeddings else: print(f"❌ 向量生成失败: {response.message}") # 返回零向量作为fallback fallback_dim = 1536 # text-embedding-v2的维度 fallback = [np.zeros(fallback_dim) for _ in text_list] return fallback[0] if is_single else fallback except Exception as e: print(f"❌ 向量生成异常: {e}") # 返回零向量作为fallback fallback_dim = 1536 fallback = [np.zeros(fallback_dim) for _ in text_list] return fallback[0] if is_single else fallback# ========== 简单向量数据库 ==========class SimpleVectorStore: """ 简单的向量存储和检索系统 支持向量相似度搜索和元数据管理 """ def __init__(self): """初始化向量存储""" self.vectors: List[np.ndarray] = [] # 存储向量 self.texts: List[str] = [] # 存储原始文本 self.metadata: List[Dict] = [] # 存储元数据 print("🗄️ 向量数据库初始化完成") def add_item(self, text: str, embedding: np.ndarray, metadata: Optional[Dict] = None): """ 向数据库添加一个项目 Args: text: 原始文本 embedding: 文本向量 metadata: 元数据字典 """ self.vectors.append(np.array(embedding)) self.texts.append(text) self.metadata.append(metadata or {}) def similarity_search(self, query_embedding: np.ndarray, k: int = 5, filter_func: Optional = None) -> List[Dict]: """ 基于向量相似度搜索最相关的文档 Args: query_embedding: 查询向量 k: 返回结果数量 filter_func: 过滤函数 Returns: 搜索结果列表,每个结果包含text, metadata, similarity """ if not self.vectors: return [] query_vector = np.array(query_embedding) similarities = [] for i, vector in enumerate(self.vectors): # 应用过滤器 if filter_func and not filter_func(self.metadata[i]): continue # 计算余弦相似度 similarity = np.dot(query_vector, vector) / (np.linalg.norm(query_vector) * np.linalg.norm(vector)) similarities.append((i, similarity)) # 按相似度降序排序 similarities.sort(key=lambda x: x[1], reverse=True) # 返回前k个结果 results = [] for i in range(min(k, len(similarities))): idx, score = similarities[i] results.append({ "text": self.texts[idx], "metadata": self.metadata[idx], "similarity": score }) return results# ========== 查询分类模块 ==========def classify_query(query: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> str: """ 使用LLM对查询进行智能分类 分类标准: - Factual: 寻求具体、可验证信息的查询 - Analytical: 需要全面分析或解释的查询 - Opinion: 关于主观事物或寻求多元观点的查询 - Contextual: 依赖用户特定背景的查询 Args: query: 用户查询 model: 使用的语言模型 Returns: 查询类型 (Factual/Analytical/Opinion/Contextual) """ print_step("查询分类", f"正在分析查询: '{query}'") system_prompt = """你是一个专业的查询分类专家。请将用户查询精确分类为以下四个类别之一:1. Factual(事实性): 寻求具体、可验证的信息或定义2. Analytical(分析性): 需要综合分析、解释复杂概念或机制 3. Opinion(观点性): 关于主观判断或需要多元视角的问题4. Contextual(上下文相关): 依赖特定用户背景或情境的问题只需返回类别名称,不要添加任何解释。""" user_prompt = f"请对以下查询进行分类: {query}" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: # 添加安全检查 - 修正阿里云API响应格式 if hasattr(response, 'output') and hasattr(response.output, 'text'): category = response.output.text.strip() # 验证返回的类别是否有效 valid_categories = ["Factual", "Analytical", "Opinion", "Contextual"] for valid in valid_categories: if valid in category: print_result("查询分类完成", f"类型: {valid}") return valid # 如果分类失败,默认为Factual print("⚠️ 分类结果无效,默认使用 Factual 类型") return "Factual" else: print("⚠️ 响应格式异常,默认使用 Factual 类型") return "Factual" else: print(f"❌ 分类请求失败: {response.message}") return "Factual" except Exception as e: print(f"❌ 分类异常: {e}") return "Factual"# ========== 自适应检索策略模块 ==========def factual_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4) -> List[Dict]: """ 事实性查询的检索策略 - 注重精确性 策略特点: 1. 使用LLM增强查询,提高精确度 2. 对检索结果进行相关性评分 3. 返回最相关的精确信息 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 Returns: 检索结果列表 """ print_step("事实性检索策略", f"为查询 '{query}' 执行精确检索") # 使用LLM增强查询 system_prompt = """你是搜索查询优化专家。请将用户的事实性查询重新表述为更精确、更具体的搜索查询,重点关注关键实体及其关系。只返回优化后的查询,不要解释。""" user_prompt = f"优化这个事实性查询: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): enhanced_query = response.output.text.strip() print_result("查询增强", enhanced_query) else: enhanced_query = query print("⚠️ 查询增强响应格式异常,使用原始查询") else: enhanced_query = query print("⚠️ 查询增强失败,使用原始查询") except: enhanced_query = query print("⚠️ 查询增强异常,使用原始查询") # 生成增强查询的向量并检索 query_embedding = create_embeddings(enhanced_query) initial_results = vector_store.similarity_search(query_embedding, k*2) print(f"📊 初始检索到 {len(initial_results)} 个候选结果") # 对结果进行相关性评分 ranked_results = [] for doc in initial_results: relevance_score = score_document_relevance(enhanced_query, doc["text"]) ranked_results.append({ "text": doc["text"], "metadata": doc["metadata"], "similarity": doc["similarity"], "relevance_score": relevance_score }) # 按相关性评分排序 ranked_results.sort(key=lambda x: x["relevance_score"], reverse=True) final_results = ranked_results[:k] print_result("事实性检索完成", f"返回 {len(final_results)} 个最相关结果") return final_resultsdef analytical_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4) -> List[Dict]: """ 分析性查询的检索策略 - 注重全面性 策略特点: 1. 生成多个子查询覆盖不同方面 2. 确保检索结果的多样性 3. 提供全面的信息覆盖 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 Returns: 检索结果列表 """ print_step("分析性检索策略", f"为查询 '{query}' 执行全面检索") # 生成多个子查询 system_prompt = """你是分析问题专家。请为下面的分析性查询生成3个不同角度的子问题,这些子问题应该帮助全面理解主题的不同方面。每行一个问题,不要编号。""" user_prompt = f"为这个分析性查询生成子问题: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0.3, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): sub_queries = [q.strip() for q in response.output.text.strip().split('\n') if q.strip()] print_result("子查询生成", f"生成了 {len(sub_queries)} 个子查询") for i, sq in enumerate(sub_queries, 1): print(f" {i}. {sq}") else: sub_queries = [query] print("⚠️ 子查询生成响应格式异常,使用原始查询") else: sub_queries = [query] print("⚠️ 子查询生成失败,使用原始查询") except: sub_queries = [query] print("⚠️ 子查询生成异常,使用原始查询") # 为每个子查询检索文档 all_results = [] for i, sub_query in enumerate(sub_queries, 1): print(f"🔍 执行子查询 {i}: {sub_query}") sub_query_embedding = create_embeddings(sub_query) results = vector_store.similarity_search(sub_query_embedding, k=2) all_results.extend(results) # 去重并确保多样性 unique_texts = set() diverse_results = [] for result in all_results: if result["text"] not in unique_texts: unique_texts.add(result["text"]) diverse_results.append(result) print(f"📊 去重后保留 {len(diverse_results)} 个不同结果") # 如果结果不足,用主查询补充 if len(diverse_results) < k: main_query_embedding = create_embeddings(query) main_results = vector_store.similarity_search(main_query_embedding, k=k) for result in main_results: if result["text"] not in unique_texts and len(diverse_results) < k: unique_texts.add(result["text"]) diverse_results.append(result) final_results = diverse_results[:k] print_result("分析性检索完成", f"返回 {len(final_results)} 个多样化结果") return final_resultsdef opinion_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4) -> List[Dict]: """ 观点性查询的检索策略 - 注重多样性 策略特点: 1. 识别不同的观点角度 2. 为每个观点检索代表性文档 3. 确保观点的平衡性 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 Returns: 检索结果列表 """ print_step("观点性检索策略", f"为查询 '{query}' 检索多元观点") # 识别不同观点 system_prompt = """你是观点分析专家。请为下面的观点性问题识别3个不同的观点角度或立场。每行一个观点角度,不要编号,要简洁明确。""" user_prompt = f"识别这个问题的不同观点: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0.3, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): viewpoints = [v.strip() for v in response.output.text.strip().split('\n') if v.strip()] print_result("观点识别", f"识别了 {len(viewpoints)} 个不同观点") for i, vp in enumerate(viewpoints, 1): print(f" 观点{i}: {vp}") else: viewpoints = [query] print("⚠️ 观点识别响应格式异常,使用原始查询") else: viewpoints = [query] print("⚠️ 观点识别失败,使用原始查询") except: viewpoints = [query] print("⚠️ 观点识别异常,使用原始查询") # 为每个观点检索文档 all_results = [] for i, viewpoint in enumerate(viewpoints, 1): combined_query = f"{query} {viewpoint}" print(f"🔍 检索观点 {i}: {viewpoint}") viewpoint_embedding = create_embeddings(combined_query) results = vector_store.similarity_search(viewpoint_embedding, k=2) # 标记观点来源 for result in results: result["viewpoint"] = viewpoint all_results.extend(results) # 确保每个观点至少有一个代表 selected_results = [] for viewpoint in viewpoints: viewpoint_docs = [r for r in all_results if r.get("viewpoint") == viewpoint] if viewpoint_docs: selected_results.append(viewpoint_docs[0]) # 用剩余高相似度文档填充 remaining_slots = k - len(selected_results) if remaining_slots > 0: remaining_docs = [r for r in all_results if r not in selected_results] remaining_docs.sort(key=lambda x: x["similarity"], reverse=True) selected_results.extend(remaining_docs[:remaining_slots]) final_results = selected_results[:k] print_result("观点性检索完成", f"返回 {len(final_results)} 个多元观点结果") return final_resultsdef contextual_retrieval_strategy(query: str, vector_store: SimpleVectorStore, k: int = 4, user_context: str = None) -> List[Dict]: """ 上下文相关查询的检索策略 - 注重上下文集成 策略特点: 1. 推断或使用提供的用户上下文 2. 将上下文融入查询重构 3. 考虑上下文相关性进行评分 Args: query: 用户查询 vector_store: 向量数据库 k: 返回结果数量 user_context: 用户上下文(可选) Returns: 检索结果列表 """ print_step("上下文相关检索策略", f"为查询 '{query}' 执行上下文感知检索") # 如果没有提供上下文,尝试推断 if not user_context: system_prompt = """你是上下文推断专家。请分析下面的查询,推断出什么样的背景信息或上下文对回答这个问题是重要的。返回简洁的上下文描述。""" user_prompt = f"推断这个查询的隐含上下文: {query}" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0.1, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): user_context = response.output.text.strip() print_result("上下文推断", user_context) else: user_context = "通用技术背景" print("⚠️ 上下文推断响应格式异常,使用默认上下文") else: user_context = "通用技术背景" print("⚠️ 上下文推断失败,使用默认上下文") except: user_context = "通用技术背景" print("⚠️ 上下文推断异常,使用默认上下文") # 将上下文融入查询 system_prompt = """你是查询重构专家。请将用户查询和提供的上下文信息结合,生成一个更具体、更有针对性的搜索查询。只返回重构后的查询。""" user_prompt = f"""查询: {query}上下文: {user_context}请生成融入上下文的重构查询:""" try: response = Generation.call( model=AdaptiveRAGConfig.LLM_MODEL, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): contextualized_query = response.output.text.strip() print_result("查询重构", contextualized_query) else: contextualized_query = query print("⚠️ 查询重构响应格式异常,使用原始查询") else: contextualized_query = query print("⚠️ 查询重构失败,使用原始查询") except: contextualized_query = query print("⚠️ 查询重构异常,使用原始查询") # 基于上下文化查询检索文档 query_embedding = create_embeddings(contextualized_query) initial_results = vector_store.similarity_search(query_embedding, k*2) print(f"📊 基于上下文检索到 {len(initial_results)} 个候选结果") # 按上下文相关性重新评分 ranked_results = [] for doc in initial_results: context_relevance = score_document_context_relevance(query, user_context, doc["text"]) ranked_results.append({ "text": doc["text"], "metadata": doc["metadata"], "similarity": doc["similarity"], "context_relevance": context_relevance }) # 按上下文相关性排序 ranked_results.sort(key=lambda x: x["context_relevance"], reverse=True) final_results = ranked_results[:k] print_result("上下文检索完成", f"返回 {len(final_results)} 个上下文相关结果") return final_results# ========== 文档评分辅助函数 ==========def score_document_relevance(query: str, document: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> float: """ 使用LLM评估文档与查询的相关性 Args: query: 用户查询 document: 文档内容 model: 评分模型 Returns: 相关性分数 (0-10) """ system_prompt = """你是文档相关性评估专家。请对文档与查询的相关性进行评分,评分标准:0-3分: 完全不相关或基本不相关4-6分: 有一定相关性但不够直接7-8分: 较为相关,能部分回答问题9-10分: 高度相关,能很好地回答问题只返回0-10之间的数字分数,不要解释。""" # 截断文档以避免超长 doc_preview = document[:1500] + "..." if len(document) > 1500 else document user_prompt = f"""查询: {query}文档: {doc_preview}相关性评分 (0-10):""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): score_text = response.output.text.strip() # 提取数字分数 match = re.search(r'(\d+(\.\d+)?)', score_text) if match: score = float(match.group(1)) return min(10, max(0, score)) # 确保分数在0-10范围内 else: return 5.0 else: return 5.0 else: return 5.0 except: return 5.0def score_document_context_relevance(query: str, context: str, document: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> float: """ 评估文档在特定上下文下与查询的相关性 Args: query: 用户查询 context: 用户上下文 document: 文档内容 model: 评分模型 Returns: 上下文相关性分数 (0-10) """ system_prompt = """你是文档上下文相关性评估专家。请评估文档在给定上下文下对查询的相关性,评分标准:0-3分: 在此上下文下完全不相关4-6分: 在此上下文下有一定相关性7-8分: 在此上下文下较为相关9-10分: 在此上下文下高度相关只返回0-10之间的数字分数,不要解释。""" # 截断文档 doc_preview = document[:1500] + "..." if len(document) > 1500 else document user_prompt = f"""查询: {query}上下文: {context}文档: {doc_preview}在此上下文下的相关性评分 (0-10):""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=0, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): score_text = response.output.text.strip() match = re.search(r'(\d+(\.\d+)?)', score_text) if match: score = float(match.group(1)) return min(10, max(0, score)) else: return 5.0 else: return 5.0 else: return 5.0 except: return 5.0# ========== 核心自适应检索器 ==========def adaptive_retrieval(query: str, vector_store: SimpleVectorStore, k: int = 4, user_context: str = None) -> List[Dict]: """ 自适应检索的核心函数 - 根据查询类型选择最合适的检索策略 这是整个自适应RAG系统的核心,它会: 1. 首先对查询进行智能分类 2. 根据分类结果选择相应的检索策略 3. 执行专门优化的检索过程 4. 返回最相关的文档 Args: query: 用户查询 vector_store: 向量数据库 k: 检索文档数量 user_context: 用户上下文(仅用于上下文相关查询) Returns: 检索到的文档列表 """ print_step("🧠 自适应检索核心", f"开始处理查询: '{query}'") # 第一步:查询分类 query_type = classify_query(query) print(f"🏷️ 查询被分类为: {query_type}") # 第二步:根据查询类型选择对应的检索策略 strategy_map = { "Factual": "🎯 事实性检索策略 - 注重精确性和准确性", "Analytical": "🔍 分析性检索策略 - 注重全面性和多角度覆盖", "Opinion": "💭 观点性检索策略 - 注重多样性和平衡性", "Contextual": "🎪 上下文检索策略 - 注重情境相关性" } print(f"📋 选择策略: {strategy_map.get(query_type, '默认策略')}") # 第三步:执行相应的检索策略 if query_type == "Factual": results = factual_retrieval_strategy(query, vector_store, k) elif query_type == "Analytical": results = analytical_retrieval_strategy(query, vector_store, k) elif query_type == "Opinion": results = opinion_retrieval_strategy(query, vector_store, k) elif query_type == "Contextual": results = contextual_retrieval_strategy(query, vector_store, k, user_context) else: # 默认使用事实性检索 print("⚠️ 未识别的查询类型,使用默认事实性检索策略") results = factual_retrieval_strategy(query, vector_store, k) print_result("自适应检索完成", f"查询类型: {query_type}, 返回 {len(results)} 个结果") return results# ========== 响应生成模块 ==========def generate_response(query: str, retrieved_docs: List[Dict], query_type: str, model: str = AdaptiveRAGConfig.LLM_MODEL) -> str: """ 基于检索结果和查询类型生成个性化响应 不同查询类型使用不同的回答风格: - 事实性:准确、简洁、权威 - 分析性:全面、深入、多角度 - 观点性:平衡、多元、客观 - 上下文相关:针对性、个性化 Args: query: 用户查询 retrieved_docs: 检索到的文档列表 query_type: 查询类型 model: 生成模型 Returns: 生成的响应 """ print_step("📝 响应生成", f"基于 {len(retrieved_docs)} 个文档生成 {query_type} 类型回答") # 组合检索到的文档内容 context = "\n\n" + "="*50 + "\n\n".join([doc["text"] for doc in retrieved_docs]) print(f"📚 组合上下文长度: {len(context)} 字符") # 根据查询类型定制系统提示 system_prompts = { "Factual": """你是一个专业的事实查询助手。请基于提供的文档准确回答用户问题。要求:- 回答要准确、简洁、权威- 重点关注事实信息和具体数据- 如果文档中没有相关信息,请明确说明- 避免推测或添加文档中没有的信息""", "Analytical": """你是一个专业的分析师。请基于提供的文档对用户问题进行全面分析。要求:- 提供深入、多角度的分析- 涵盖问题的不同方面和层面- 使用文档中的信息支撑你的分析- 指出文档可能未涵盖的分析角度""", "Opinion": """你是一个客观的观点分析师。请基于提供的文档呈现不同观点和立场。要求:- 公平呈现多元化观点- 避免偏向任何特定立场- 指出观点的依据和局限性- 如果文档观点单一,请明确说明""", "Contextual": """你是一个上下文感知的智能助手。请结合查询背景和文档信息提供针对性回答。要求:- 考虑查询的特定背景和情境- 提供个性化、针对性的建议- 将一般信息与具体情境结合- 如果需要更多上下文信息,请主动询问""" } system_prompt = system_prompts.get(query_type, system_prompts["Factual"]) # 构建用户提示 user_prompt = f"""参考文档:{context}用户问题:{query}请基于以上文档内容回答用户问题。""" try: response = Generation.call( model=model, messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], temperature=AdaptiveRAGConfig.TEMPERATURE, api_key=AdaptiveRAGConfig.API_KEY ) if response.status_code == 200: if hasattr(response, 'output') and hasattr(response.output, 'text'): generated_response = response.output.text print_result("响应生成成功", f"生成了 {len(generated_response)} 字符的回答") return generated_response else: error_msg = "抱歉,生成回答时响应格式异常,请检查API配置" print("❌ 响应生成失败: 响应格式异常") return error_msg else: error_msg = f"抱歉,生成回答时出现问题:{response.message}" print(f"❌ 响应生成失败: {response.message}") return error_msg except Exception as e: error_msg = f"抱歉,生成回答时出现异常:{str(e)}" print(f"❌ 响应生成异常: {e}") return error_msg# ========== 文档处理管道 ==========def process_document(pdf_path: str, chunk_size: int = AdaptiveRAGConfig.CHUNK_SIZE, chunk_overlap: int = AdaptiveRAGConfig.CHUNK_OVERLAP) -> tuple: """ 完整的文档处理管道 流程: 1. 从PDF提取文本 2. 将文本分块 3. 为每个块生成向量 4. 构建向量数据库 Args: pdf_path: PDF文件路径 chunk_size: 分块大小 chunk_overlap: 块间重叠 Returns: (文本块列表, 向量数据库) """ print_step("📋 文档处理管道", f"开始处理文档: {pdf_path}") # 第一步:提取文本 extracted_text = extract_text_from_pdf(pdf_path) if not extracted_text: raise ValueError("PDF文档为空或无法提取文本") # 第二步:文本分块 text_chunks = chunk_text(extracted_text, chunk_size, chunk_overlap) if not text_chunks: raise ValueError("文本分块失败") # 第三步:初始化向量存储 vector_store = SimpleVectorStore() # 第四步:为所有块生成向量并存储 print_step("🔄 批量向量化", f"为 {len(text_chunks)} 个文本块生成向量") # 批量生成向量(如果chunks太多,可以分批处理) batch_size = 20 # 阿里云API建议批量大小 all_embeddings = [] for i in range(0, len(text_chunks), batch_size): batch_chunks = text_chunks[i:i+batch_size] print(f" 📦 处理批次 {i//batch_size + 1}/{(len(text_chunks)-1)//batch_size + 1}") batch_embeddings = create_embeddings(batch_chunks) all_embeddings.extend(batch_embeddings) # 第五步:构建向量数据库 print_step("🗄️ 构建向量数据库", "将文本块和向量添加到数据库") for i, (chunk, embedding) in enumerate(zip(text_chunks, all_embeddings)): metadata = { "chunk_id": i, "source": pdf_path, "chunk_size": len(chunk), "timestamp": datetime.now().isoformat() } vector_store.add_item(chunk, embedding, metadata) if (i + 1) % 10 == 0 or i == len(text_chunks) - 1: print(f" ✅ 已处理 {i + 1}/{len(text_chunks)} 个文本块") print_result("文档处理完成", f"成功处理 {len(text_chunks)} 个文本块,构建向量数据库") return text_chunks, vector_store# ========== 完整RAG流程 ==========def run_adaptive_rag(pdf_path: str, query: str, k: int = AdaptiveRAGConfig.DEFAULT_TOP_K, user_context: str = None) -> Dict[str, Any]: """ 运行完整的自适应RAG流程 这是系统的主入口,包含完整的处理流程: 1. 文档处理(提取、分块、向量化) 2. 查询分类 3. 自适应检索 4. 响应生成 5. 结果整理 Args: pdf_path: PDF文档路径 query: 用户查询 k: 检索文档数量 user_context: 用户上下文(可选) Returns: 包含完整结果的字典 """ print("=" * 80) print("🚀 自适应RAG系统开始运行") print(f"📖 文档: {pdf_path}") print(f"❓ 查询: {query}") print(f"🔢 检索数量: {k}") if user_context: print(f"🎯 用户上下文: {user_context}") print("=" * 80) start_time = datetime.now() try: # 步骤1:处理文档 chunks, vector_store = process_document(pdf_path) # 步骤2:执行自适应检索 retrieved_docs = adaptive_retrieval(query, vector_store, k, user_context) # 步骤3:查询分类(已在adaptive_retrieval中完成,这里重新获取用于结果记录) query_type = classify_query(query) # 步骤4:生成响应 response = generate_response(query, retrieved_docs, query_type) # 步骤5:整理结果 end_time = datetime.now() processing_time = (end_time - start_time).total_seconds() result = { "query": query, "query_type": query_type, "retrieved_documents": retrieved_docs, "response": response, "processing_time_seconds": processing_time, "document_chunks_count": len(chunks), "user_context": user_context, "timestamp": end_time.isoformat() } # 显示最终结果 print("\n" + "=" * 80) print("✨ 自适应RAG处理完成") print("=" * 80) print(f"🏷️ 查询类型: {query_type}") print(f"📊 检索文档: {len(retrieved_docs)} 个") print(f"⏱️ 处理时间: {processing_time:.2f} 秒") print(f"\n💬 生成回答:\n{'-'*50}") print(response) print("=" * 80) return result except Exception as e: print(f"❌ RAG流程执行失败: {e}") return { "query": query, "error": str(e), "timestamp": datetime.now().isoformat() }# ========== 测试和演示函数 ==========def run_demo_queries(pdf_path: str): """ 运行演示查询,展示不同类型查询的自适应处理效果 Args: pdf_path: PDF文档路径 """ print("\n" + "🎪" * 30) print(" 自适应RAG系统演示") print("🎪" * 30) # 定义测试查询(覆盖四种类型,适配Java程序员找工作场景题) demo_queries = [ { "query": "什么是Spring框架?", "expected_type": "Factual", "description": "事实性查询 - 寻求明确定义" }, { "query": "Java程序员在面试中如何展示自己的技术能力和项目经验?", "expected_type": "Analytical", "description": "分析性查询 - 需要全面分析" }, { "query": "现在Java开发市场这么卷,是不是应该转其他技术栈?", "expected_type": "Opinion", "description": "观点性查询 - 寻求多元观点" }, { "query": "作为刚毕业的计算机专业学生,应该如何准备Java开发岗位的面试?", "expected_type": "Contextual", "description": "上下文相关查询 - 特定身份背景", "context": "应届毕业生,计算机专业,目标Java开发岗位" } ] # 逐一测试查询 for i, demo in enumerate(demo_queries, 1): print(f"\n{'🔥' * 20} 演示查询 {i} {'🔥' * 20}") print(f"📝 查询类型: {demo['description']}") print(f"❓ 查询内容: {demo['query']}") print(f"🎯 预期类型: {demo['expected_type']}") user_context = demo.get("context") if user_context: print(f"🎪 用户上下文: {user_context}") # 运行RAG result = run_adaptive_rag( pdf_path=pdf_path, query=demo["query"], k=3, user_context=user_context ) # 验证分类准确性 if "error" not in result: actual_type = result["query_type"] type_match = "✅" if actual_type == demo["expected_type"] else "⚠️" print(f"\n{type_match} 类型匹配: 预期 {demo['expected_type']} vs 实际 {actual_type}") print(f"\n{'=' * 60}")# ========== 主函数 ==========def main(): """ 主函数 - 演示自适应RAG系统的完整功能 """ print("🎉 欢迎使用自适应RAG系统!") print("本系统将演示智能查询分类和自适应检索的强大功能。\n") # 检查PDF文件 pdf_path = AdaptiveRAGConfig.DEFAULT_PDF_PATH if not os.path.exists(pdf_path): print(f"❌ 找不到PDF文件: {pdf_path}") print("请确保PDF文件存在或修改配置中的路径。") return print(f"📖 使用文档: {pdf_path}") # 运行演示 try: run_demo_queries(pdf_path) print("\n" + "🎊" * 30) print(" 演示完成!") print("🎊" * 30) print("\n✨ 自适应RAG系统成功展示了:") print(" 1. 智能查询分类 - 自动识别查询类型") print(" 2. 自适应检索策略 - 针对不同类型的专门优化") print(" 3. 个性化响应生成 - 基于查询类型的定制回答") print(" 4. 详细过程追踪 - 每个步骤的透明输出") print("\n🚀 系统已准备就绪,可以处理各种类型的查询!") except Exception as e: print(f"❌ 系统演示过程中出现错误: {e}") print("请检查配置和依赖是否正确安装。")if __name__ == "__main__": main()