掘金 人工智能 07月26日 12:00
学习 RAGFlow 的知识图谱功能
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入介绍了 RAGFlow 的知识图谱(GraphRAG)功能,该功能可在 v0.16.0 版本后启用,用于构建知识库的分块。知识图谱在处理嵌套逻辑和复杂实体关系的多跳问答场景中表现出色,优于传统抽取方法。文章详细阐述了 GraphRAG 的实现逻辑,包括子图构建、合并、实体消歧和社区报告生成。它支持通用(general)和轻量(light)两种图谱构建方法,并提供了实体类型、实体消歧和社区报告生成等可配置项。文章还解释了实体消歧的提示词设计以及社区报告的生成流程和内容结构。最后,文章强调了知识图谱功能在提升问答能力方面的优势,但也提醒用户其资源消耗较大,建议在测试集上验证效果后再决定是否启用。

🚀 **知识图谱构建流程**:RAGFlow 的知识图谱功能在数据抽取和索引之间构建,首先检索原始分块,然后使用 LightKGExt 或 GeneralKGExt 生成子图,接着将子图合并到全局知识图谱中,最后可选择进行实体消歧和社区报告生成。此过程在处理复杂文档时能显著提升问答效果。

🛠️ **GraphRAG 配置选项**:用户可根据需求配置实体类型(如组织、人物、事件),选择构建方法(通用或轻量,后者资源消耗更少),以及是否启用实体消歧(用于合并同义实体)和社区报告生成(为图谱中的实体簇生成摘要)。

🔗 **子图构建与合并**:子图通过 NetworkX 库创建,提取的实体和关系被表示为节点和边,并序列化为 JSON 存储。随后,文档级别的子图被合并到全局知识图谱中,过程中会更新节点和边的属性,并计算 PageRank 值来衡量实体的重要性。

🧠 **实体消歧与社区报告**:实体消歧通过大模型对相似实体进行判断,将同义实体合并,提高图谱的准确性和简洁性。社区报告则利用 Leiden 算法发现图中的社区结构,并为每个社区生成包含标题、摘要、影响评级和关键洞察的报告。

⚖️ **效用与资源权衡**:知识图谱功能尤其适用于处理具有复杂实体和关系的作品,在多跳问答中优势明显。然而,构建知识图谱会消耗大量 token、内存和计算资源,建议用户在少量测试集上验证其带来的性能提升是否与其资源消耗相匹配,再决定是否全面启用。

昨天我们学习了 RAGFlow 的 RAPTOR 分块策略,今天我们将继续学习另一种高级配置 —— 提取知识图谱(use_graphrag)

该特性自 v0.16.0 起引入,开启该配置后,RAGFlow 会在当前知识库的分块上构建知识图谱,构建步骤位于数据抽取和索引之间,如下所示:

知识图谱在涉及嵌套逻辑的多跳问答中尤其有用,当你在对书籍或具有复杂实体和关系的作品进行问答时,知识图谱的表现优于传统的抽取方法。

请注意,构建知识图谱将消耗大量 token 和时间。

开启 GraphRAG 任务

GraphRAG 的逻辑位于任务执行器的 do_handle_task() 函数中:

async def do_handle_task(task):      # ...  elif task.get("task_type", "") == "graphrag":        # 绑定聊天模型    chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)    # 运行 GraphRAG 逻辑    graphrag_conf = task["kb_parser_config"].get("graphrag", {})    with_resolution = graphrag_conf.get("resolution", False)    with_community = graphrag_conf.get("community", False)    async with kg_limiter:      await run_graphrag(task, task_language, with_resolution, with_community, chat_model, embedding_model, progress_callback)    return

这个函数我们之前已经详细学习过,但是跳过了 GraphRAG 相关的逻辑,今天我们就继续来看下这个 run_graphrag() 的实现细节:

async def run_graphrag(row: dict, language, with_resolution: bool, with_community: bool, chat_model, embedding_model, callback):      # 检索原始分块列表  chunks = []  for d in settings.retrievaler.chunk_list(...):    chunks.append(d["content_with_weight"])  # 使用 LightKGExt 或 GeneralKGExt 生成子图  subgraph = await generate_subgraph(    LightKGExt if row["kb_parser_config"]["graphrag"]["method"] != "general" else GeneralKGExt,    ...    row["kb_parser_config"]["graphrag"]["entity_types"],    ...  )  # 将子图合并到知识图谱中  subgraph_nodes = set(subgraph.nodes())  new_graph = await merge_subgraph(...)  # 实体消歧  if with_resolution:    await resolve_entities(new_graph, subgraph_nodes, ...)  # 社区报告  if with_community:    await extract_community(new_graph, ...)

整体的逻辑还是比较清晰的,首先通过 retrievaler.chunk_list() 检索出该文档原始的分块列表,然后基于配置的实体类型生成子图,然后将子图合并到知识图谱中,最后进行实体消歧和社区报告的生成。

和 RAPTOR 任务一样,开启知识图谱也需要先执行一次标准的分块策略,生成原始的分块列表,在完成第一个任务后,会再生成一个知识图谱类型的任务,执行上面的代码逻辑。

提取知识图谱涉及的配置参数如下:

构建子图

构建子图的逻辑位于 generate_subgraph() 函数:

async def generate_subgraph(...):  # 检查 doc_id 是否已经构建过子图  contains = await does_graph_contains(tenant_id, kb_id, doc_id)  if contains:    return None  # 创建提取器实例,提取实体和关系  ext = extractor(llm_bdl, language=language, entity_types=entity_types)  ents, rels = await ext(doc_id, chunks, callback)  # 将实体和关系构建成 NetworkX 子图  subgraph = nx.Graph()  for ent in ents:    ent["source_id"] = [doc_id]    subgraph.add_node(ent["entity_name"], **ent)  for rel in rels:    rel["source_id"] = [doc_id]    subgraph.add_edge(      rel["src_id"],      rel["tgt_id"],      **rel,    )  # 将子图序列化为 JSON 字符串,作为分块存到文档库中  subgraph.graph["source_id"] = [doc_id]  chunk = {    "content_with_weight": json.dumps(      nx.node_link_data(subgraph, edges="edges"), ensure_ascii=False    ),    "knowledge_graph_kwd": "subgraph",    "kb_id": kb_id,    "source_id": [doc_id],    "available_int": 0,    "removed_kwd": "N",  }  cid = chunk_id(chunk)  # 首先根据 doc_id 删除旧的子图  await trio.to_thread.run_sync(    lambda: settings.docStoreConn.delete(      {"knowledge_graph_kwd": "subgraph", "source_id": doc_id}, search.index_name(tenant_id), kb_id    )  )  # 然后插入新的子图  await trio.to_thread.run_sync(    lambda: settings.docStoreConn.insert(      [{"id": cid, **chunk}], search.index_name(tenant_id), kb_id    )  )  return subgraph

关键步骤已经由注释标出,这里不再赘述。主要关注三点:

    支持两种提取器,GeneralKGExtLightKGExt,提取的步骤差不多(都是经过三步:首次抽取 -> 二次抽取 -> 判断是否抽取完毕),只是使用的提示词不一样而已;子图是通过 NetworkX 库构建的,这是一种 Python 中常用的图论库,可以方便地创建、操作和分析图结构;子图会序列化为 JSON 字符串,作为分块存到文档库中,可以在 ES 中通过 "knowledge_graph_kwd": "subgraph" 条件检索出来:

感兴趣的可以看下这个 content_with_weight 字段,里面包含从文档中抽取出来的完整子图。

合并子图

上面一步生成的是文档级别的子图,接下来,将该子图合并到全局知识图谱中:

async def merge_subgraph(tenant_id: str, kb_id: str, subgraph: nx.Graph, ...):  # 检索旧的全局知识图谱  change = GraphChange()  old_graph = await get_graph(tenant_id, kb_id, subgraph.graph["source_id"])  if old_graph is not None:    # 如果旧图谱存在,则将文档子图合并到全局图谱中    new_graph = graph_merge(old_graph, subgraph, change)  else:    # 如果旧图谱不存在,则直接使用文档子图作为新的全局图谱    new_graph = subgraph    change.added_updated_nodes = set(new_graph.nodes())    change.added_updated_edges = set(new_graph.edges())  # 计算 PageRank  pr = nx.pagerank(new_graph)  for node_name, pagerank in pr.items():    new_graph.nodes[node_name]["pagerank"] = pagerank  # 保存新的全局图谱  await set_graph(tenant_id, kb_id, embedding_model, new_graph, change, callback)    return new_graph

合并的逻辑比较简单,就是遍历文档子图中的所有节点和边,判断是否已经存在于全局图谱中,如果存在,就将 descriptionkeywordssource_id 等属性拼接到全局图谱中。此外,还会使用 NetworkX 的 pagerank() 方法 对合并后的图谱计算 PageRank 值,为每个节点添加 pagerank 属性,用于衡量节点的重要性。

PageRank 最初被设计为一种对网页进行排名的算法,在 NetworkX 中,是根据指向该节点的边的个数来计算节点的排名,表示该实体在知识图谱中的重要性。

开启实体消歧

实体消歧的逻辑位于 graphrag/entity_resolution.py 文件:

class EntityResolution(Extractor):  async def __call__(self, graph: nx.Graph, subgraph_nodes: set[str], ...) -> EntityResolutionResult:        # 将节点按照实体类型分组    nodes = sorted(graph.nodes())    entity_types = sorted(set(graph.nodes[node].get('entity_type', '-') for node in nodes))    node_clusters = {entity_type: [] for entity_type in entity_types}    for node in nodes:      node_clusters[graph.nodes[node].get('entity_type', '-')].append(node)    # 在同类型实体中生成所有可能的配对组合    candidate_resolution = {entity_type: [] for entity_type in entity_types}    for k, v in node_clusters.items():      candidate_resolution[k] = [        (a, b) for a, b in itertools.combinations(v, 2)         if (a in subgraph_nodes or b in subgraph_nodes) and self.is_similarity(a, b)      ]        # 并发调用大模型进行批量消歧,大模型针对每一对实体输出明确的 Yes/No 判断    # 默认一批 100 对实体,最多并发 5 个任务    resolution_result = set()    async with trio.open_nursery() as nursery:      for candidate_resolution_i in candidate_resolution.items():        for i in range(0, len(candidate_resolution_i[1]), resolution_batch_size):          candidate_batch = candidate_resolution_i[0], candidate_resolution_i[1][i:i + resolution_batch_size]          nursery.start_soon(limited_resolve_candidate, candidate_batch, resolution_result, resolution_result_lock)    # 将消歧结果构建成新的图谱    change = GraphChange()    connect_graph = nx.Graph()    connect_graph.add_edges_from(resolution_result)    async with trio.open_nursery() as nursery:      for sub_connect_graph in nx.connected_components(connect_graph):        merging_nodes = list(sub_connect_graph)        nursery.start_soon(limited_merge_nodes, graph, merging_nodes, change)    return EntityResolutionResult(      graph=graph,      change=change,    )

实体消歧所使用的提示词核心部分如下,主要是输出部分使用的一些特殊符号,方便程序解析结果:

问题:在判断两个产品是否相同时,你应该只关注关键属性,忽略噪声因素。演示 1: 产品A的名称是:"电脑",产品B的名称是:"手机" 不,产品A和产品B是不同的产品。问题 1: 产品A的名称是:"电视机",产品B的名称是:"电视"问题 2: 产品A的名称是:"杯子",产品B的名称是:"马克杯"问题 3: 产品A的名称是:"足球",产品B的名称是:"橄榄球"问题 4: 产品A的名称是:"钢笔",产品B的名称是:"橡皮擦"使用产品的领域知识来帮助理解文本,并按以下格式回答上述4个问题:对于问题i,是的,产品A和产品B是同一个产品。或者 不,产品A和产品B是不同的产品。对于问题i+1,(重复上述程序)################输出:(对于问题 <|>1<|>,&&是&&,产品A和产品B是同一个产品。)##(对于问题 <|>2<|>,&&是&&,产品A和产品B是同一个产品。)##(对于问题 <|>3<|>,&&不&&,产品A和产品B是不同的产品。)##(对于问题 <|>4<|>,&&不&&,产品A和产品B是不同的产品。)##

生成社区报告

生成社区报告的逻辑位于 graphrag/general/community_reports_extractor.py 文件:

class CommunityReportsExtractor(Extractor):  async def __call__(self, graph: nx.Graph, callback: Callable | None = None):    # 使用 Leiden 算法来发现图中的社区结构    # 将社区组织成一个多层级的树形结构,每个层级包含多个社区    communities: dict[str, dict[str, list]] = leiden.run(graph, {})    # 遍历每一个社区,从图中提取当前社区中的所有实体和关系的描述,调用大模型生成社区报告    async with trio.open_nursery() as nursery:      for level, comm in communities.items():        logging.info(f"Level {level}: Community: {len(comm.keys())}")        for community in comm.items():          nursery.start_soon(extract_community_report, community)    return CommunityReportsResult(      structured_output=res_dict,      output=res_str,    )

整个流程比较简单,分为两步。第一步,使用 Leiden 算法 发现图中的社区结构。

在网络科学或图论中,社区(Community) 是指网络中的一组节点,其核心特征是:社区内部的节点之间连接紧密,而与社区外部节点的连接相对稀疏,这种 “内密外疏” 的结构是社区的核心标志,反映了网络中节点的聚类性和关联性。Leiden 算法是一种在图数据中识别社区结构的高效算法,由 Traag 等人在莱顿大学于 2018 年提出。它在经典的 Louvain 算法 基础上进行了改进,解决了 Louvain 算法中可能出现的 “分辨率限制” 和社区划分不精确的问题,因此在复杂网络分析中被广泛应用。

这里,RAGFlow 使用的是 graspologic 库的 hierarchical_leiden() 方法。

第二步,调用大模型为每个社区生成摘要,这被称为 社区报告(Community Report),报告以 JSON 格式输出:

{  "title": <报告标题>,  "summary": <执行摘要>,  "rating": <影响严重性评级>,  "rating_explanation": <评级说明>,  "findings": [    {      "summary":<洞察1摘要>,      "explanation": <洞察1解释>    },    {      "summary":<洞察2摘要>,      "explanation": <洞察2解释>    }  ]}

包括以下几个部分:

生成的社区报告可以在 ES 中通过 "knowledge_graph_kwd": "community_report" 条件检索出来:

小结

在今天的学习中,我们深入探讨了 RAGFlow 中的知识图谱功能,我们详细了解了提取知识图谱的流程,包括:实体和关系的提取,子图的构建和合并,实体消歧和社区报告生成等。图谱生成成功后,知识库的配置页面会多出一个 “知识图谱” 的菜单项:

通过引入知识图谱,RAGFlow 能在复杂多跳问答场景中表现得更加出色,特别是在分析具有复杂关系和实体的文档时。和昨天学习的 RAPTOR 一样,启用知识图谱功能需要大量的内存、计算资源和令牌,在使用时需要权衡利弊,建议提前在少量测试集上进行验证,只有当效果提升明显,才具有足够的性价比,才建议开启该功能。

欢迎关注

如果这篇文章对您有所帮助,欢迎关注我的同名公众号:日习一技,每天学一点新技术

我会每天花一个小时,记录下我学习的点点滴滴。内容包括但不限于:

目标是让大家用5分钟读完就能有所收获,不需要太费劲,但却可以轻松获取一些干货。不管你是技术新手还是老鸟,欢迎给我提建议,如果有想学习的技术,也欢迎交流!

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

RAGFlow 知识图谱 GraphRAG 多跳问答 实体消歧 社区报告
相关文章