第五章 Agentic RAG:基于代理的RAG
5.1 理论
5.1.1 Agentic RAG
Agentic RAG 系统包含多种检索代理(retrieval agents),能够根据用户查询的需求调用相应的代理来获取所需数据。这类系统的入口通常是一个检索器路由模块(retriever router),其作用是为当前任务选择最合适的一个或多个检索器(retriever)。实现 Agentic RAG 系统的一种常见方法,是利用大语言模型(LLM)的工具调用能力(有时称为函数调用,function calling):把每个检索器包装成一个带描述的工具,由 LLM 决定调用哪个工具以及传入什么参数。
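作为说明,下面给出一个检索器工具的声明式描述草图。它采用常见的 OpenAI 风格 tools/函数调用格式(许多推理框架兼容这种格式);其中的工具名 movie_info_by_title 及参数字段只是示意,并非某个框架的固定 API。支持原生工具调用的模型会在响应中返回要调用的工具名和 JSON 参数,由调用方实际执行检索;本章 5.2 节的实现则采用更简单的做法:把工具描述写进提示词,让模型直接输出 JSON。

# 一个检索器工具的声明式描述(OpenAI 风格的 tools 格式,仅作示意;
# 工具名 movie_info_by_title 与参数均为本章示例的假设,并非某个框架的固定 API)
movie_info_tool = {
    "type": "function",
    "function": {
        "name": "movie_info_by_title",
        "description": "通过电影标题获取电影信息,包括演员和导演",
        "parameters": {
            "type": "object",
            "properties": {
                "title": {"type": "string", "description": "电影标题"}
            },
            "required": ["title"],
        },
    },
}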
Agentic 系统的核心思想是:系统能够代表用户主动执行任务。Agentic RAG 系统通常需要包含以下几个基础组件:
- 检索器路由(Retriever Router) —— 一个接收用户问题并返回最适配的检索器(或多个检索器)的函数
- 检索代理(Retriever Agents) —— 实际用于检索数据的检索模块,可用于回答用户问题
- 答案评判器(Answer Critic) —— 一个接收检索结果并验证原始问题是否被正确回答的函数
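把这三个组件串起来,就得到 Agentic RAG 的基本控制流。下面是一个最小化的 Python 草图,仅用于说明控制流:其中 route、retrievers、critic、max_rounds 等名称均为示意性假设,完整的可运行实现见 5.2 节。

from typing import Callable, Dict, List

def agentic_rag(question: str,
                route: Callable[[str], str],                   # 检索器路由:问题 -> 工具名
                retrievers: Dict[str, Callable[[str], List]],  # 检索代理:工具名 -> 检索函数
                critic: Callable[[str, List], List[str]],      # 答案评判器:返回仍需补充的问题列表
                max_rounds: int = 3) -> List:
    """最小化的 Agentic RAG 控制流草图:路由 -> 检索 -> 评判 -> 必要时补充检索"""
    context: List = []            # 累积的检索结果,供下游 LLM 生成最终答案
    pending = [question]          # 待处理的问题队列,初始只有用户原始问题
    for _ in range(max_rounds):   # 限制迭代轮数,避免评判器始终不满意时陷入死循环
        if not pending:
            break
        for q in pending:
            tool = route(q)                       # 1. 为当前问题选择最合适的检索器
            context.extend(retrievers[tool](q))   # 2. 执行检索,把结果累积进上下文
        pending = critic(question, context)       # 3. 评判原始问题是否已被回答,返回补充问题
    return context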
5.2 实现
下面给出一个完整的可运行示例:以 Neo4j 图数据库存储一个小规模电影图谱,用本地 Ollama 模型(qwen3:32b)充当路由器、检索代理与答案评判器背后的 LLM,串起上一节描述的各个组件。
import requests
import json
import random
from neo4j import GraphDatabase
from typing import Any, List, Dict, Optional
import re

# Neo4j 数据库连接配置
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASSWORD = "你的密码"

# Ollama 本地大语言模型配置
OLLAMA_BASE_URL = "http://localhost:11434"
LLM_MODEL = "qwen3:32b"

# 初始化 Neo4j 数据库连接
neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

def remove_think_tags(text: str) -> str:
    """从文本中移除 <think> 标签及其内容"""
    pattern = r'<think>.*?</think>'
    cleaned_text = re.sub(pattern, '', text, flags=re.DOTALL)
    return cleaned_text.strip()

def extract_json_from_response(response_text: str) -> Optional[str]:
    """从 LLM 响应中提取 JSON 内容,忽略 <think> 标签"""
    cleaned_response = remove_think_tags(response_text)
    if not cleaned_response:
        print(f"[JSON解析] 清理后的响应为空")
        return None
    start_brace_index = cleaned_response.find('{')
    start_bracket_index = cleaned_response.find('[')
    start_index = -1
    if start_brace_index != -1 and (start_bracket_index == -1 or start_brace_index < start_bracket_index):
        start_index = start_brace_index
    elif start_bracket_index != -1:
        start_index = start_bracket_index
    if start_index != -1:
        potential_json = cleaned_response[start_index:].strip()
        if potential_json:
            try:
                parsed = json.loads(potential_json)
                if isinstance(parsed, (dict, list)):
                    return potential_json
            except json.JSONDecodeError as e:
                print(f"[JSON解析] JSON验证失败: {e}")
    print(f"[JSON解析] 无法提取有效的JSON内容")
    return None

def call_local_llm(prompt: str, model: str = LLM_MODEL) -> str:
    """调用本地 Ollama 大语言模型"""
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False
    }
    try:
        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload)
        response.raise_for_status()
        return response.json()["response"].strip()
    except Exception as e:
        print(f"调用 LLM 失败: {e}")
        return ""

def execute_cypher_query(cypher: str) -> List[Dict]:
    """执行 Cypher 查询并返回结果"""
    try:
        with neo4j_driver.session() as session:
            result = session.run(cypher)
            return [dict(record) for record in result]
    except Exception as e:
        print(f"执行 Cypher 查询失败: {e}")
        return []

def serialize_neo4j_result(records):
    """将 Neo4j 查询结果转换为可 JSON 序列化的格式"""
    def node_to_dict(node):
        return dict(node)

    def value_to_serializable(val):
        if hasattr(val, '_properties'):  # 是 Node 或 Relationship
            return node_to_dict(val)
        elif isinstance(val, list):
            return [value_to_serializable(item) for item in val]
        else:
            return val

    serialized = []
    for record in records:
        serialized_record = {}
        for key in record.keys():
            serialized_record[key] = value_to_serializable(record[key])
        serialized.append(serialized_record)
    return serialized

def generate_cypher_from_question(question: str) -> str:
    """根据自然语言问题生成 Cypher 查询语句"""
    prompt = f"""你是一个专业的 Cypher 查询生成器。请根据以下图数据库结构,将自然语言问题转换为正确的 Cypher 查询。

图数据库结构:
- Movie 节点:属性 title (电影标题)
- Person 节点:属性 name (人物姓名)
- Country 节点:属性 name (国家名称)
- Reviewer 节点:属性 name (影评人姓名)

关系类型:
- [:ACTED_IN] - 演员与电影的关系
- [:DIRECTED] - 导演与电影的关系
- [:PRODUCED_IN] - 电影与制作国家的关系
- [:REVIEWED] - 影评人与电影的关系,包含 score 属性表示评分

问题:{question}

请返回一个有效的 Cypher 查询语句,只返回查询语句,不要其他内容,查询语句必须写在一行内:"""
    result = call_local_llm(prompt)
    print(f"[LLM原始响应 - Cypher生成] {result}")
    # 移除 <think> 标签并提取查询语句
    cleaned_result = remove_think_tags(result)
    # 简单提取 Cypher 查询语句
    lines = cleaned_result.split('\n')
    for line in lines:
        line = line.strip()
        if line.upper().startswith('MATCH') or line.upper().startswith('RETURN'):
            return line
    return cleaned_result.strip()

# 检索器工具定义
def text2cypher_retriever(question: str) -> List[Dict]:
    """通用文本转 Cypher 检索器"""
    print(f"[检索器] 使用通用 text2cypher 检索器处理问题: {question}")
    # 第一步:生成 Cypher 查询
    cypher = generate_cypher_from_question(question)
    print(f"[检索器] 生成的 Cypher 查询: {cypher}")
    # 第二步:执行查询
    results = execute_cypher_query(cypher)
    print(f"[检索器] 查询返回 {len(results)} 条结果")
    return results

def movie_title_retriever(title: str) -> List[Dict]:
    """根据电影标题检索电影信息的专用检索器"""
    print(f"[检索器] 使用电影标题检索器查找: {title}")
    query = """
    MATCH (m:Movie)
    WHERE toLower(m.title) CONTAINS $title
    OPTIONAL MATCH (m)<-[:ACTED_IN]-(a:Person)
    OPTIONAL MATCH (m)<-[:DIRECTED]-(d:Person)
    RETURN m AS movie, collect(a.name) AS cast, collect(d.name) AS directors
    """
    try:
        with neo4j_driver.session() as session:
            result = session.run(query, title=title.lower())
            results = []
            for record in result:
                movie_node = record["movie"]
                movie_dict = {
                    "title": movie_node["title"],
                    "year": movie_node.get("year"),
                }
                results.append({
                    "movie": movie_dict,
                    "cast": list(record["cast"]),
                    "directors": list(record["directors"])
                })
            print(f"[检索器] 找到 {len(results)} 部相关电影")
            return results
    except Exception as e:
        print(f"[检索器] 电影标题检索失败: {e}")
        return []

def actor_movies_retriever(actor: str) -> List[Dict]:
    """根据演员姓名检索相关电影信息的专用检索器"""
    print(f"[检索器] 使用演员检索器查找: {actor}")
    query = """
    MATCH (a:Person)-[:ACTED_IN]->(m:Movie)
    WHERE toLower(a.name) CONTAINS $actor
    OPTIONAL MATCH (m)<-[:ACTED_IN]-(co_actor:Person)
    OPTIONAL MATCH (m)<-[:DIRECTED]-(d:Person)
    RETURN m AS movie, collect(co_actor.name) AS cast, collect(d.name) AS directors
    """
    try:
        with neo4j_driver.session() as session:
            result = session.run(query, actor=actor.lower())
            results = []
            for record in result:
                movie_node = record["movie"]
                movie_dict = {
                    "title": movie_node["title"],
                    "year": movie_node.get("year"),
                }
                results.append({
                    "movie": movie_dict,
                    "cast": list(record["cast"]),
                    "directors": list(record["directors"])
                })
            print(f"[检索器] 找到 {len(results)} 部相关电影")
            return results
    except Exception as e:
        print(f"[检索器] 演员检索失败: {e}")
        return []

def direct_answer_retriever(answer: str) -> str:
    """直接答案检索器"""
    print(f"[检索器] 使用直接答案检索器: {answer}")
    return answer

# 检索器工具注册表
RETRIEVER_TOOLS = {
    "text2cypher": {
        "description": "通过用户问题查询数据库。当其他工具不适用时,作为默认选项使用。",
        "function": text2cypher_retriever,
        "parameters": ["question"]
    },
    "movie_info_by_title": {
        "description": "通过提供电影标题获取电影信息,包括演员和导演",
        "function": movie_title_retriever,
        "parameters": ["title"]
    },
    "movies_info_by_actor": {
        "description": "通过提供演员姓名获取该演员参演的电影信息",
        "function": actor_movies_retriever,
        "parameters": ["actor"]
    },
    "answer_given": {
        "description": "若问题的完整答案已存在于对话中,请使用此工具提取答案",
        "function": direct_answer_retriever,
        "parameters": ["answer"]
    }
}

def choose_retriever_tool(question: str, conversation_history: List[Dict]) -> Dict:
    """根据问题选择最合适的检索器工具"""
    print(f"[路由器] 为问题选择合适的检索器: {question}")
    prompt = f"""你需要为用户问题选择最合适的检索工具。

可用工具:
"""
    for tool_name, tool_info in RETRIEVER_TOOLS.items():
        prompt += f"- {tool_name}: {tool_info['description']}\n"
    prompt += f"""
用户问题: {question}

请选择最合适的工具并提供参数,按以下JSON格式返回:
{{"function": {{"name": "工具名", "arguments": "{{\"参数名\": \"参数值\"}}"}}}}

只返回JSON,不要其他内容:"""
    response = call_local_llm(prompt)
    print(f"[LLM原始响应 - 工具选择] {response}")
    json_str = extract_json_from_response(response)
    if json_str:
        try:
            tool_choice = json.loads(json_str)
            tool_name = tool_choice["function"]["name"]
            arguments = json.loads(tool_choice["function"]["arguments"])
            print(f"[路由器] 选择工具: {tool_name}, 参数: {arguments}")
            return {"tool": tool_name, "arguments": arguments}
        except json.JSONDecodeError as e:
            print(f"[路由器] JSON 解析失败: {e}")
    else:
        print(f"[路由器] 无法提取JSON,使用默认工具")
    return {"tool": "text2cypher", "arguments": {"question": question}}

def execute_retriever_tool(tool_name: str, arguments: Dict) -> Any:
    """执行指定的检索器工具"""
    if tool_name not in RETRIEVER_TOOLS:
        print(f"[执行器] 未知工具: {tool_name},使用默认工具")
        tool_name = "text2cypher"
        arguments = {"question": str(arguments)}
    tool_info = RETRIEVER_TOOLS[tool_name]
    function = tool_info["function"]
    try:
        print(f"[执行器] 执行工具: {tool_name}")
        result = function(**arguments)
        return result
    except Exception as e:
        print(f"[执行器] 工具执行失败: {e}")
        return []

def update_question_with_context(original_question: str, conversation_history: List[Dict]) -> str:
    """根据对话历史优化和更新问题"""
    if not conversation_history:
        return original_question
    print(f"[查询更新器] 根据上下文优化问题: {original_question}")
    context = ""
    for entry in conversation_history:
        if entry.get("role") == "assistant":
            context += f"之前的回答: {entry.get('content', '')}\n"
    prompt = f"""你是问题优化专家。根据对话历史,将问题更新为更完整、具体且易于回答的形式。

规则:
1. 只在必要时修改问题
2. 利用之前的答案补充缺失信息
3. 不要超出原问题的范围
4. 如果原问题已经足够清楚,保持不变

对话历史:
{context}

原始问题: {original_question}

请返回优化后的问题,只返回问题文本:"""
    updated_question = call_local_llm(prompt)
    print(f"[LLM原始响应 - 问题优化] {updated_question}")
    cleaned_question = remove_think_tags(updated_question)
    if cleaned_question and cleaned_question.strip():
        print(f"[查询更新器] 问题已优化为: {cleaned_question}")
        return cleaned_question.strip()
    else:
        print(f"[查询更新器] 保持原问题不变")
        return original_question

def evaluate_answer_completeness(original_question: str, conversation_history: List[Dict]) -> Dict[str, Any]:
    """评估答案是否完整"""
    print(f"[答案评判器] 评估问题答案的完整性: {original_question}")
    context = ""
    for entry in conversation_history:
        if entry.get("role") == "assistant":
            context += f"已获得的信息: {entry.get('content', '')}\n"
    prompt = f"""你是答案完整性评估专家。判断原始问题是否已被完全回答。

原始问题: {original_question}

已获得的信息:
{context}

请严格按照以下JSON格式返回结果:
如果信息充分回答了问题,返回: {{"success": true, "follow_up_questions": []}}
如果信息不够,生成需要补充的问题列表,格式: {{"success": false, "follow_up_questions": ["补充问题1", "补充问题2"]}}

只返回JSON,不要其他内容:"""
    response = call_local_llm(prompt)
    print(f"[LLM原始响应 - 答案评估] {response}")
    json_str = extract_json_from_response(response)
    if json_str:
        try:
            result = json.loads(json_str)
            if isinstance(result, dict) and "success" in result and "follow_up_questions" in result:
                success = result["success"]
                follow_up_questions = result["follow_up_questions"]
                print(f"[答案评判器] 评估结果: Success={success}, Follow-up Questions={len(follow_up_questions)}")
                return result
        except json.JSONDecodeError as e:
            print(f"[答案评判器] JSON解析失败: {e}")
    else:
        print(f"[答案评判器] 无法提取JSON")
    print(f"[答案评判器] 使用默认评估结果: Success=False")
    return {"success": False, "follow_up_questions": [original_question]}

def process_supplemental_questions(original_question: str, follow_up_questions: List[str], conversation_history: List[Dict]) -> List[Dict]:
    """处理补充问题列表"""
    print(f"[补充检索] 开始处理 {len(follow_up_questions)} 个补充问题")
    for i, question in enumerate(follow_up_questions, 1):
        print(f"\n[补充检索] 处理补充问题 {i}/{len(follow_up_questions)}: {question}")
        conversation_history = process_single_question(question, conversation_history)
    return conversation_history

def process_single_question(question: str, conversation_history: List[Dict] = None) -> List[Dict]:
    """处理单个问题"""
    if conversation_history is None:
        conversation_history = []
    print(f"[主流程] 开始处理问题: {question}")
    optimized_question = update_question_with_context(question, conversation_history)
    tool_choice = choose_retriever_tool(optimized_question, conversation_history)
    retrieval_result = execute_retriever_tool(tool_choice["tool"], tool_choice["arguments"])
    # 确保结果是可序列化的
    if isinstance(retrieval_result, list):
        # 如果是列表,尝试序列化每个元素
        serialized_results = []
        for item in retrieval_result:
            if isinstance(item, dict):
                serialized_results.append(item)
            else:
                serialized_results.append(str(item))
        retrieval_result = serialized_results
    elif not isinstance(retrieval_result, (str, int, float, bool, type(None))):
        # 如果不是基本类型,转换为字符串
        retrieval_result = str(retrieval_result)
    result_text = f"针对问题:'{optimized_question}',使用工具 {tool_choice['tool']} 获得结果:{json.dumps(retrieval_result, ensure_ascii=False)}"
    conversation_history.append({
        "role": "assistant",
        "content": result_text
    })
    print(f"[主流程] 问题处理完成")
    return conversation_history

def generate_final_answer(original_question: str, conversation_history: List[Dict]) -> str:
    """基于检索到的信息生成最终答案"""
    print(f"[答案生成器] 基于检索信息生成最终答案")
    context = ""
    for entry in conversation_history:
        if entry.get("role") == "assistant":
            context += f"{entry.get('content', '')}\n"
    prompt = f"""你是一个知识助手,需要基于提供的信息回答用户问题。

重要规则:
1. 只能使用提供的信息回答问题
2. 如果信息不足,明确说明缺少什么信息
3. 不能编造或猜测信息
4. 回答要简洁明了

用户问题: {original_question}

可用信息:
{context}

请基于以上信息回答用户问题:"""
    final_answer = call_local_llm(prompt)
    print(f"[LLM原始响应 - 最终答案] {final_answer}")
    print(f"[答案生成器] 最终答案生成完成")
    return final_answer

def answer_question(user_question: str) -> str:
    """回答用户问题的主入口函数"""
    print(f"\n开始处理用户问题: {user_question}")
    print(f"{'=' * 60}")
    print(f"\n第一阶段:初始信息检索")
    conversation_history = process_single_question(user_question)
    print(f"\n第二阶段:答案完整性评估")
    evaluation_result = evaluate_answer_completeness(user_question, conversation_history)
    if not evaluation_result.get("success", False):
        follow_up_questions = evaluation_result.get("follow_up_questions", [])
        if follow_up_questions:
            print(f"\n第三阶段:补充信息检索")
            conversation_history = process_supplemental_questions(user_question, follow_up_questions, conversation_history)
        else:
            print(f"\n第三阶段:跳过,评判器未提供补充问题但标记为不成功")
    else:
        print(f"\n第三阶段:跳过,答案已完整")
    print(f"\n第四阶段:生成最终答案")
    final_answer = generate_final_answer(user_question, conversation_history)
    return final_answer

def clear_database_small():
    """清空 Neo4j 数据库中所有节点和关系"""
    with neo4j_driver.session() as session:
        session.run("MATCH (n) DETACH DELETE n")
    print("数据库已清空")

def create_constraints_small():
    """创建示范用唯一性约束"""
    with neo4j_driver.session() as session:
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (m:Movie) REQUIRE m.title IS UNIQUE")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (p:Person) REQUIRE p.name IS UNIQUE")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (r:Reviewer) REQUIRE r.name IS UNIQUE")
        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:Country) REQUIRE c.name IS UNIQUE")
    print("唯一性约束已创建")

def populate_small_demo_data():
    """写入演示数据"""
    movies = [
        {"title": "侏罗纪公园", "year": 1993, "country": "美国"},
        {"title": "辛德勒的名单", "year": 1993, "country": "美国"},
        {"title": "拯救大兵瑞恩", "year": 1998, "country": "美国"},
    ]
    director = "史蒂文·斯皮尔伯格"
    reviewers = ["Alice", "Bob", "Charlie"]
    actors = ["山姆·尼尔", "杰夫·高布伦", "朱莉安·摩尔", "连姆·尼森", "本·金斯利", "汤姆·汉克斯", "马特·达蒙"]
    with neo4j_driver.session() as session:
        # 国家
        session.run("MERGE (:Country {name: $name})", name="美国")
        # 导演
        session.run("MERGE (:Person {name: $name})", name=director)
        # 电影及 PRODUCED_IN、DIRECTED
        for m in movies:
            session.run("MERGE (:Movie {title: $title, year: $year})", title=m["title"], year=m["year"])
            session.run(
                """
                MATCH (mov:Movie {title: $title})
                MATCH (c:Country {name: $country})
                MERGE (mov)-[:PRODUCED_IN]->(c)
                """,
                title=m["title"], country=m["country"],
            )
            session.run(
                """
                MATCH (d:Person {name: $director})
                MATCH (mov:Movie {title: $title})
                MERGE (d)-[:DIRECTED]->(mov)
                """,
                director=director, title=m["title"],
            )
        # 评论者与评分
        for name in reviewers:
            session.run("MERGE (:Reviewer {name: $name})", name=name)
        for name in reviewers:
            sampled = random.sample(movies, k=2)
            for m in sampled:
                score = round(random.gauss(8.2, 0.8), 1)
                score = max(1.0, min(10.0, score))
                session.run(
                    """
                    MATCH (r:Reviewer {name: $name})
                    MATCH (mov:Movie {title: $title})
                    MERGE (r)-[rev:REVIEWED]->(mov)
                    SET rev.score = $score
                    """,
                    name=name, title=m["title"], score=score,
                )
        # 少量演员与参演
        for a in actors:
            session.run("MERGE (:Person {name: $name})", name=a)
        movie_actors = {
            "侏罗纪公园": ["山姆·尼尔", "杰夫·高布伦"],
            "辛德勒的名单": ["连姆·尼森", "本·金斯利"],
            "拯救大兵瑞恩": ["汤姆·汉克斯", "马特·达蒙"],
        }
        for title, cast in movie_actors.items():
            for a in cast:
                session.run(
                    """
                    MATCH (p:Person {name: $actor})
                    MATCH (m:Movie {title: $title})
                    MERGE (p)-[:ACTED_IN]->(m)
                    """,
                    actor=a, title=title,
                )
    print("小规模演示数据已写入")

def main():
    """主程序入口"""
    clear_database_small()
    create_constraints_small()
    populate_small_demo_data()
    # 测试问题
    test_question = "侏罗纪公园的导演是谁?"
    try:
        # 调用主处理函数
        answer = answer_question(test_question)
        # 输出结果
        print(f"\n最终结果:")
        print(f"问题: {test_question}")
        print(f"答案: {answer}")
    except Exception as e:
        print(f"处理问题时发生错误: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # 关闭数据库连接
        print(f"\n关闭数据库连接")
        neo4j_driver.close()

if __name__ == "__main__":
    main()
运行 main() 后的完整输出如下:

数据库已清空
唯一性约束已创建
小规模演示数据已写入

开始处理用户问题: 侏罗纪公园的导演是谁?
============================================================

第一阶段:初始信息检索
[主流程] 开始处理问题: 侏罗纪公园的导演是谁?
[路由器] 为问题选择合适的检索器: 侏罗纪公园的导演是谁?
[LLM原始响应 - 工具选择] <think>好的,用户问的是“侏罗纪公园的导演是谁?”。我需要选择合适的工具来回答这个问题。首先看可用的工具,有text2cypher、movie_info_by_title、movies_info_by_actor和answer_given。用户的问题是关于电影《侏罗纪公园》的导演,所以应该使用与电影信息相关的工具。movie_info_by_title这个工具是通过电影标题获取信息的,包括演员和导演,所以这个应该是最合适的。而movies_info_by_actor是通过演员来找电影的,这里不需要。text2cypher可能需要查询数据库,但如果有直接获取信息的工具,应该优先用那个。answer_given这里显然问题没有被回答过,所以不需要用。所以应该选movie_info_by_title,参数是电影标题“侏罗纪公园”。这样就能得到导演的信息了。</think>{"function": {"name": "movie_info_by_title", "arguments": "{"title": "侏罗纪公园"}"}}
[路由器] 选择工具: movie_info_by_title, 参数: {'title': '侏罗纪公园'}
[执行器] 执行工具: movie_info_by_title
[检索器] 使用电影标题检索器查找: 侏罗纪公园
[检索器] 找到 1 部相关电影
[主流程] 问题处理完成

第二阶段:答案完整性评估
[答案评判器] 评估问题答案的完整性: 侏罗纪公园的导演是谁?
[LLM原始响应 - 答案评估] <think>好的,我需要评估用户的问题“侏罗纪公园的导演是谁?”是否已经被完全回答。根据提供的已获得信息,使用工具movie_info_by_title得到了结果,显示导演是史蒂文·斯皮尔伯格,而且有两个条目,但都是同一个人。这说明信息已经明确给出了导演的名字,没有矛盾或缺失。因此,问题已经充分回答,不需要补充问题。返回success为true,follow_up_questions为空列表。</think>{"success": true, "follow_up_questions": []}
[答案评判器] 评估结果: Success=True, Follow-up Questions=0

第三阶段:跳过,答案已完整

第四阶段:生成最终答案
[答案生成器] 基于检索信息生成最终答案
[LLM原始响应 - 最终答案] <think>好的,用户问的是侏罗纪公园的导演是谁。我需要先检查提供的可用信息。根据工具movie_info_by_title返回的结果,电影侏罗纪公园的导演列表中有两个“史蒂文·斯皮尔伯格”。这可能是个重复,但根据常识,侏罗纪公园确实是史蒂文·斯皮尔伯格导演的。不过按照规则,只能使用提供的信息,所以需要确认信息是否正确。这里虽然导演名字重复了,但明显是同一个导演,所以应该回答史蒂文·斯皮尔伯格。同时,确保没有使用其他信息来源,只基于给定的数据。用户的问题已经得到明确答案,不需要额外信息。</think>侏罗纪公园的导演是史蒂文·斯皮尔伯格。
[答案生成器] 最终答案生成完成

最终结果:
问题: 侏罗纪公园的导演是谁?
答案: <think>好的,用户问的是侏罗纪公园的导演是谁。我需要先检查提供的可用信息。根据工具movie_info_by_title返回的结果,电影侏罗纪公园的导演列表中有两个“史蒂文·斯皮尔伯格”。这可能是个重复,但根据常识,侏罗纪公园确实是史蒂文·斯皮尔伯格导演的。不过按照规则,只能使用提供的信息,所以需要确认信息是否正确。这里虽然导演名字重复了,但明显是同一个导演,所以应该回答史蒂文·斯皮尔伯格。同时,确保没有使用其他信息来源,只基于给定的数据。用户的问题已经得到明确答案,不需要额外信息。</think>侏罗纪公园的导演是史蒂文·斯皮尔伯格。

关闭数据库连接

进程已结束,退出代码为 0
上述处理流程可以用如下 Mermaid 流程图概括:

flowchart TD
    A[开始处理用户问题] --> B[第一阶段: 初始信息检索]
    B --> C[优化问题上下文]
    C --> D[选择合适的检索器工具]
    D --> E[执行检索器工具]
    E --> F[将结果添加到对话历史]
    F --> G[第二阶段: 答案完整性评估]
    G --> H{答案是否完整?}
    H -->|否| I[第三阶段: 补充信息检索]
    H -->|是| J[跳过补充检索]
    I --> K[处理补充问题列表]
    K --> L[递归调用问题处理流程]
    L --> M[第四阶段: 生成最终答案]
    J --> M
    M --> N[基于检索信息生成自然语言答案]
    N --> O[返回最终答案]

    subgraph 第一阶段
        C
        D
        E
        F
    end

    subgraph 第二阶段
        G
        H
    end

    subgraph 第三阶段
        I
        K
        L
    end

    subgraph 第四阶段
        M
        N
        O
    end

    style A fill:#e1f5fe
    style B fill:#f3e5f5
    style G fill:#f3e5f5
    style I fill:#f3e5f5
    style M fill:#f3e5f5
    style O fill:#e8f5e8