MarkTechPost@AI 前天 15:50
A Step-by-Step Guide to Build an Automated Knowledge Graph Pipeline Using LangGraph and NetworkX
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍如何使用LangGraph和NetworkX构建一个自动化的知识图谱(KG)pipeline。该pipeline模拟一系列智能体协同工作,执行数据收集、实体提取、关系识别、实体解析和图验证等任务。从用户提供的主题(如“人工智能”)开始,系统有条不紊地提取相关实体和关系,解决重复问题,并将信息整合到一个有凝聚力的图形结构中。通过可视化最终的知识图谱,开发者和数据科学家可以清晰地了解概念之间的复杂相互关系,这使得该方法对语义分析、自然语言处理和知识管理等应用非常有益。

🤖️ 该知识图谱Pipeline的核心是利用LangGraph和NetworkX,LangGraph用于创建和编排基于智能体的计算工作流程,NetworkX用于构建和可视化图谱。

📚 Pipeline模拟了多个智能体协同工作,包括数据收集、实体提取、关系识别、实体解析和图验证。每个智能体负责特定任务,例如,`data_gatherer`模拟收集原始文本数据,`entity_extractor`提取文本中的实体。

🔄 实体提取后,`relation_extractor`函数检测实体间的语义关系。它使用预定义的正则表达式模式,识别实体对之间的短语(例如“影响”或“是...的类型”),并将其作为三元组添加到关系列表中。

✅ 为了避免重复和不一致,`entity_resolver`函数标准化实体名称。它通过将每个实体转换为小写并替换空格来创建映射。然后,将此映射应用于提取的关系中的所有主语和宾语,以生成已解析的关系。

📊 最后,`graph_integrator`函数使用NetworkX构建实际的知识图谱。它迭代已解析的三元组,确保两个节点都存在,然后添加带关系的定向边。`graph_validator`函数对构建的知识图谱执行基本健康检查,生成验证报告。

In this tutorial, we demonstrate how to construct an automated Knowledge Graph (KG) pipeline using LangGraph and NetworkX. The pipeline simulates a sequence of intelligent agents that collaboratively perform tasks such as data gathering, entity extraction, relation identification, entity resolution, and graph validation. Starting from a user-provided topic, such as “Artificial Intelligence,” the system methodically extracts relevant entities and relationships, resolves duplicates, and integrates the information into a cohesive graphical structure. By visualizing the final knowledge graph, developers and data scientists gain clear insights into complex interrelations among concepts, making this approach highly beneficial for applications in semantic analysis, natural language processing, and knowledge management.

!pip install langgraph langchain_core

We install two essential Python libraries: LangGraph, which is used for creating and orchestrating agent-based computational workflows, and LangChain Core, which provides foundational classes and utilities for building language model-powered applications. These libraries enable seamless integration of agents into intelligent data pipelines.

import reimport networkx as nximport matplotlib.pyplot as pltfrom typing import TypedDict, List, Tuple, Dict, Anyfrom langchain_core.messages import HumanMessage, AIMessagefrom langgraph.graph import StateGraph, END

We import essential libraries to build an automated knowledge graph pipeline. It includes re for regular expression-based text processing, NetworkX and matplotlib for creating and visualizing graphs, TypedDict and typing annotations for structured data handling, and LangGraph along with langchain_core for orchestrating the interaction between AI agents within the workflow.

class KGState(TypedDict):    topic: str    raw_text: str    entities: List[str]    relations: List[Tuple[str, str, str]]    resolved_relations: List[Tuple[str, str, str]]    graph: Any    validation: Dict[str, Any]    messages: List[Any]    current_agent: str

We define a structured data type, KGState, using Python’s TypedDict. It outlines the schema for managing state across different steps of the knowledge graph pipeline. It includes details like the chosen topic, gathered text, identified entities and relationships, resolved duplicates, the constructed graph object, validation results, interaction messages, and tracking the currently active agent.

def data_gatherer(state: KGState) -> KGState:    topic = state["topic"]    print(f" Data Gatherer: Searching for information about '{topic}'")       collected_text = f"{topic} is an important concept. It relates to various entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. EntityC is a type of EntityB."       state["messages"].append(AIMessage(content=f"Collected raw text about {topic}"))       state["raw_text"] = collected_text    state["current_agent"] = "entity_extractor"       return state

This function, data_gatherer, acts as the first step in the pipeline. It simulates gathering raw text data about a provided topic (stored in state[“topic”]). It then stores this simulated data into state[“raw_text”], adds a message indicating the data collection completion, and updates the pipeline’s state by setting the next agent (entity_extractor) as active.

def entity_extractor(state: KGState) -> KGState:    print(" Entity Extractor: Identifying entities in the text")    text = state["raw_text"]       entities = re.findall(r"Entity[A-Z]", text)       entities = [state["topic"]] + entities    state["entities"] = list(set(entities))       state["messages"].append(AIMessage(content=f"Extracted entities: {state['entities']}"))    print(f"   Found entities: {state['entities']}")       state["current_agent"] = "relation_extractor"       return state

The entity_extractor function identifies entities from the collected raw text using a simple regular expression pattern that matches terms like “EntityA”, “EntityB”, etc. It also includes the main topic as an entity and ensures uniqueness by converting the list to a set. The extracted entities are stored in the state, an AI message logs the result, and the pipeline advances to the relation_extractor agent.

def relation_extractor(state: KGState) -> KGState:    print(" Relation Extractor: Identifying relationships between entities")    text = state["raw_text"]    entities = state["entities"]    relations = []       relation_patterns = [        (r"([A-Za-z]+) relates to ([A-Za-z]+)", "relates_to"),        (r"([A-Za-z]+) influences ([A-Za-z]+)", "influences"),        (r"([A-Za-z]+) is a type of ([A-Za-z]+)", "is_type_of")    ]       for e1 in entities:        for e2 in entities:            if e1 != e2:                for pattern, rel_type in relation_patterns:                    if re.search(f"{e1}.*{rel_type}.*{e2}", text.replace("_", " "), re.IGNORECASE) or \                       re.search(f"{e1}.*{e2}", text, re.IGNORECASE):                        relations.append((e1, rel_type, e2))       state["relations"] = relations    state["messages"].append(AIMessage(content=f"Extracted relations: {relations}"))    print(f"   Found relations: {relations}")       state["current_agent"] = "entity_resolver"       return state

The relation_extractor function detects semantic relationships between entities within the raw text. It uses predefined regex patterns to identify phrases like “influences” or “is a type of” between entity pairs. When a match is found, it adds the corresponding relation as a triple (subject, predicate, object) to the relations list. These extracted relations are stored in the state, a message is logged for agent communication, and control moves to the next agent: entity_resolver.

def entity_resolver(state: KGState) -> KGState:    print(" Entity Resolver: Resolving duplicate entities")       entity_map = {}    for entity in state["entities"]:        canonical_name = entity.lower().replace(" ", "_")        entity_map[entity] = canonical_name       resolved_relations = []    for s, p, o in state["relations"]:        s_resolved = entity_map.get(s, s)        o_resolved = entity_map.get(o, o)        resolved_relations.append((s_resolved, p, o_resolved))       state["resolved_relations"] = resolved_relations    state["messages"].append(AIMessage(content=f"Resolved relations: {resolved_relations}"))       state["current_agent"] = "graph_integrator"       return state

The entity_resolver function standardizes entity names to avoid duplication and inconsistencies. It creates a mapping (entity_map) by converting each entity to lowercase and replacing spaces with underscores. Then, this mapping is applied to all subjects and objects in the extracted relations to produce resolved relations. These normalized triples are added to the state, a confirmation message is logged, and control is passed to the graph_integrator agent.

def graph_integrator(state: KGState) -> KGState:    print(" Graph Integrator: Building the knowledge graph")    G = nx.DiGraph()       for s, p, o in state["resolved_relations"]:        if not G.has_node(s):            G.add_node(s)        if not G.has_node(o):            G.add_node(o)        G.add_edge(s, o, relation=p)       state["graph"] = G    state["messages"].append(AIMessage(content=f"Built graph with {len(G.nodes)} nodes and {len(G.edges)} edges"))       state["current_agent"] = "graph_validator"       return state

The graph_integrator function constructs the actual knowledge graph using networkx.DiGraph() supports directed relationships. It iterates over the resolved triples (subject, predicate, object), ensures both nodes exist, and then adds a directed edge with the relation as metadata. The resulting graph is saved in the state, a summary message is appended, and the pipeline transitions to the graph_validator agent for final validation.

def graph_validator(state: KGState) -> KGState:    print(" Graph Validator: Validating knowledge graph")    G = state["graph"]       validation_report = {        "num_nodes": len(G.nodes),        "num_edges": len(G.edges),        "is_connected": nx.is_weakly_connected(G) if G.nodes else False,        "has_cycles": not nx.is_directed_acyclic_graph(G) if G.nodes else False    }       state["validation"] = validation_report    state["messages"].append(AIMessage(content=f"Validation report: {validation_report}"))    print(f"   Validation report: {validation_report}")       state["current_agent"] = END       return state

The graph_validator function performs a basic health check on the constructed knowledge graph. It compiles a validation report containing the number of nodes and edges, whether the graph is weakly connected (i.e., every node is reachable if direction is ignored), and whether the graph contains cycles. This report is added to the state and logged as an AI message. Once validation is complete, the pipeline is marked as finished by setting the current_agent to END.

def router(state: KGState) -> str:    return state["current_agent"]def visualize_graph(graph):    plt.figure(figsize=(10, 6))    pos = nx.spring_layout(graph)       nx.draw(graph, pos, with_labels=True, node_color='skyblue', node_size=1500, font_size=10)       edge_labels = nx.get_edge_attributes(graph, 'relation')    nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)       plt.title("Knowledge Graph")    plt.tight_layout()    plt.show()

The router function directs the pipeline to the next agent based on the current_agent field in the state. Meanwhile, the visualize_graph function uses matplotlib and networkx to display the final knowledge graph, showing nodes, edges, and labeled relationships for intuitive visual understanding.

def build_kg_graph():    workflow = StateGraph(KGState)       workflow.add_node("data_gatherer", data_gatherer)    workflow.add_node("entity_extractor", entity_extractor)    workflow.add_node("relation_extractor", relation_extractor)    workflow.add_node("entity_resolver", entity_resolver)    workflow.add_node("graph_integrator", graph_integrator)    workflow.add_node("graph_validator", graph_validator)       workflow.add_conditional_edges("data_gatherer", router,                                {"entity_extractor": "entity_extractor"})    workflow.add_conditional_edges("entity_extractor", router,                                {"relation_extractor": "relation_extractor"})    workflow.add_conditional_edges("relation_extractor", router,                                {"entity_resolver": "entity_resolver"})    workflow.add_conditional_edges("entity_resolver", router,                                {"graph_integrator": "graph_integrator"})    workflow.add_conditional_edges("graph_integrator", router,                                {"graph_validator": "graph_validator"})    workflow.add_conditional_edges("graph_validator", router,                                {END: END})       workflow.set_entry_point("data_gatherer")       return workflow.compile()

The build_kg_graph function defines the complete knowledge graph workflow using LangGraph. It sequentially adds each agent as a node, from data collection to graph validation, and connects them through conditional transitions based on the current agent. The entry point is set to data_gatherer, and the graph is compiled into an executable workflow that guides the automated pipeline from start to finish.

def run_knowledge_graph_pipeline(topic):    print(f" Starting knowledge graph pipeline for: {topic}")       initial_state = {        "topic": topic,        "raw_text": "",        "entities": [],        "relations": [],        "resolved_relations": [],        "graph": None,        "validation": {},        "messages": [HumanMessage(content=f"Build a knowledge graph about {topic}")],        "current_agent": "data_gatherer"    }       kg_app = build_kg_graph()    final_state = kg_app.invoke(initial_state)       print(f" Knowledge graph construction complete for: {topic}")       return final_state

The run_knowledge_graph_pipeline function initializes the pipeline by setting up an empty state dictionary with the provided topic. It builds the workflow using build_kg_graph(), then runs it by invoking the compiled graph with the initial state. As each agent processes the data, the state evolves, and the final result contains the complete knowledge graph, validated and ready for use.

if __name__ == "__main__":    topic = "Artificial Intelligence"    result = run_knowledge_graph_pipeline(topic)       visualize_graph(result["graph"])

Finally, this block serves as the script’s entry point. When executed directly, it triggers the knowledge graph pipeline for the topic “Artificial Intelligence,” runs through all agent stages, and finally visualizes the resulting graph using the visualize_graph() function. It provides an end-to-end demonstration of automated knowledge graph generation.

Output Generated from Knowledge Graph Execution

In conclusion, we have learned how to seamlessly integrate multiple specialized agents into a cohesive knowledge graph pipeline through this structured approach, leveraging LangGraph and NetworkX. This workflow automates entity and relation extraction processes and visualizes intricate relationships, offering a clear and actionable representation of gathered information. By adjusting and enhancing individual agents, such as employing more sophisticated entity recognition methods or integrating real-time data sources, this foundational framework can be scaled and customized for advanced knowledge graph construction tasks across various domains.


Check out the Colab Notebook. All credit for this research goes to the researchers of this project. Also, feel free to follow us on Twitter and don’t forget to join our 90k+ ML SubReddit.

The post A Step-by-Step Guide to Build an Automated Knowledge Graph Pipeline Using LangGraph and NetworkX appeared first on MarkTechPost.

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

LangGraph 知识图谱 自动化Pipeline
相关文章