掘金 人工智能 17小时前
基于知识图谱增强的RAG系统阅读笔记(七)GraphRAG实现(基于小说诛仙)(一)
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了GraphRAG技术如何通过两阶段流程,从《诛仙》小说文本中提取实体、关系并构建知识图谱。第一阶段抽取并摘要实体与关系,第二阶段检测实体社区并生成摘要,实现信息整合。通过定义PERSON、FACTION、LOCATION、ARTIFACT等七类实体,并结合Neo4j数据库,为小说内容构建了结构化的图表示,为深入理解故事情节和角色关系提供了可能。文章还详述了文本预处理、实体关系抽取提示词设计以及具体的Python实现细节。

✨ GraphRAG的核心创新在于其两阶段知识图谱构建流程:首先从源文档提取并摘要实体与关系,随后检测实体社区并生成领域特定摘要,从而将碎片化信息整合成结构化的知识表示。

📚 为了实现有效的知识提取,《诛仙》小说被划分为184个文本块,并定义了PERSON、FACTION、LOCATION、ARTIFACT、CULTIVATION_SKILL、EVENT、CREATURE七类实体,为后续的实体识别和关系抽取奠定了基础。

🔗 在实体关系抽取阶段,采用了详细的提示词(Prompt)来指导大语言模型识别实体及其属性、活动,并提取实体间的明确关系、描述原因及量化关系强度,确保了输出的结构化和信息丰富度。

💾 为了应对大文档处理的耗时和潜在中断,代码实现了进度管理机制,包括自动保存处理进度、支持从中断点继续、避免重复处理以及跟踪章节处理状态(待处理、处理中、已完成、失败),并设计了重试策略以提高鲁棒性。

💡 文本预处理方面,章节内容被进一步分割成三个部分,以提高信息提取的精细度。同时,在与本地大模型的交互中加入了重试机制,允许对单个文本片段进行最多三次重试,以应对模型输出的不稳定性。

第七章 GraphRAG实现(基于小说诛仙)

[!NOTE]

基于此项目,向我已经逝去的童年致敬,那个时候我很喜欢用MP3偷偷看小说。现在虽然手上的电子设备不断迭代更新,但是再也找不到那个感觉了。

7.1 基本知识

微软GraphRAG 的一项关键创新在于其利用大语言模型通过两阶段流程构建知识图谱。第一阶段,从源文档中抽取并摘要实体与关系,形成知识图谱的基础。区别于传统方法的是,在知识图谱构建完成后,GraphRAG 会进一步检测图中的实体社区,并为紧密关联的实体群组生成领域特定的摘要。这种分层架构将来自不同文本块的碎片化信息,整合为关于指定实体、关系及社区的连贯、有序的信息表示。

这些实体级社区级摘要随后可用于 RAG应用中,作为响应用户查询的相关信息来源。借助这一结构化的知识图谱,可灵活应用多种检索策略。同时我们还将实现论文中描述的全局搜索(global search)与局部搜索(local search)两种检索方法。

GraphRAG旨在通过从非结构化文本文档中提取关键实体,并生成跨多个文本块的信息连接摘要,实现对文档内容的深度理解。为确保能够产生有意义的洞察,所选数据集不仅应富含实体信息,还需具备实体信息在多个文本块中分布的特点需要提前对实体进行定义。

理想的实体应在全文中多次出现于不同段落或章节,其信息是“碎片化分布”的。这样 GraphRAG 才能通过图谱连接这些信息,并生成连贯的实体级摘要。张小凡的信息分散在童年(草庙村)、少年(青云门学艺)、青年(七脉会武)、黑化(成为鬼厉)等多个阶段。噬魂棒的来历、能力、与主人的关系,在不同战斗场景中逐步揭示。天书的秘密贯穿全书,每卷出现都带来新线索。

同时《诛仙》中有多个由人物、门派、事件构成的强连接子图,非常适合社区检测与摘要生成:

社区构成实体可生成的社区摘要
青云门弟子圈张小凡、陆雪琪、田灵儿、宋大仁、齐昊“青云门以正道自居,强调清修与斩妖除魔,门下弟子多出自名门,但内部也存在嫉妒与权力斗争。”
鬼王宗势力联盟鬼王、碧瑶、小环、合欢派、万毒门“鬼王宗主张打破正邪界限,追求力量解放,其核心理念‘天地不仁’挑战传统秩序。”
情感关系网络张小凡 ↔ 碧瑶,张小凡 ↔ 陆雪琪“张小凡一生牵系两位女子:碧瑶为他牺牲,陆雪琪默默守候,体现了爱与责任的永恒冲突。”

(我比较懒 后面很多内容是AI生成的 味道和思路对了就行)

7.2 文本预处理

首先对文本进行了分块,这里直接按照章节进行了分块。

import re# 指定文件路径file_path = "/data/KGRAG/诛仙.txt"# 读取文件内容try:    with open(file_path, 'r', encoding='utf-8') as file:        content = file.read()except FileNotFoundError:    print("文件未找到,请检查路径是否正确。")    exit()# 使用正则表达式匹配中文章节标题,例如“第一章 青云”、“第1章”、“第100章”等pattern = r'(第[0-9一二三四五六七八九十百千]+章\s*[^\n]*)'# 分割文本parts = re.split(pattern, content)# 初始化章节列表chapters = []# 第一个部分通常是前言或引子if len(parts) > 0 and parts[0].strip():    chapters.append({        'title': '引子或前言',        'content': parts[0],        'char_count': len(parts[0]),        'token_est': int(len(parts[0]) * 1.3)    })# 后续部分成对出现:标题和内容for i in range(1, len(parts), 2):    if i + 1 < len(parts):        title = parts[i].strip()        text_content = parts[i + 1]        char_count = len(text_content)        token_est = int(char_count * 1.3)        chapters.append({            'title': title,            'content': text_content,            'char_count': char_count,            'token_est': token_est        })# 输出结果print("《诛仙》文本分块结果")print("=" * 80)print(f"共分割出 {len(chapters)} 个文本块\n")for idx, chapter in enumerate(chapters):    print(f"块 {idx + 1:3d} | 标题: {chapter['title']}")    print(f"       字符数: {chapter['char_count']:6d} | 估算 token 数: {chapter['token_est']:6d}")    preview = chapter['content'][:100]    if len(chapter['content']) > 100:        preview += "..."    print(f"       预览: {preview}")    print()print("=" * 80)print("分块完成。")
《诛仙》文本分块结果================================================================================共分割出 184 个文本块块   1 | 标题: 引子或前言       字符数:    867 | 估算 token 数:   1127       预览:   《诛仙(新修版)》作者:萧鼎  文案:  这世间本是没有什么神仙的,但自太古以来,人类眼见周遭世界,诸般奇异之事……序章  天地不仁,以万物为刍狗。  自太古以来,人类见世界有诸般...块   2 | 标题: 第1章 青云       字符数:   3186 | 估算 token 数:   4141       预览:   青云山脉巍峨高耸,虎踞中原,山阴处有大河“洪川”,山阳乃重镇“河阳城”,扼天下咽喉,地理位置十分重要。  青云山连绵百里,峰峦起伏,最高有七峰,高耸入云,平日里云雾浓重环绕山腰,不见山顶真容。...块   3 | 标题: 第2章 迷局       字符数:   4082 | 估算 token 数:   5306       预览:   那老僧不答,只用目光在这两个小孩身上细细看了看,忍不住便多看了林惊羽几眼,心道:“好资质,只是性子怎么如此偏激?”  这时张小凡踏上一步,道:“喂,你是谁啊,怎么从没见过你?”  草庙村在青...

7.3 实体与关系抽取

在原始论文《From Local to Global: A GraphRAG Approach to Query-Focused Summarization》的附录中,微软给出了专门用于的提示词,如下:

---Goal--Given a text document that is potentially relevant to this activity and a list of entity types, identify all entities of those types from the text and all relationships among the identified entities.---Steps--1. Identify all entities. For each identified entity, extract the following information:- entity_name: Name of the entity, capitalized- entity_type: One of the following types: [{', '.join(entity_types)}]- entity_description: Comprehensive description of the entity’s attributes and activitiesFormateachentityas("entity"{TUPLE_DELIMITER}<entity_name>{TUPLE_DELIMITER}<entity_type>{TUPLE_DELIMITER}<entity_description>)2. From the entities identified in step 1, identify all pairs of (source_entity, target_entity) that are *clearly related* to each other.For each pair of related entities, extract the following information:- source_entity: name of the source entity, as identified in step 1- target_entity: name of the target entity, as identified in step 1- relationship_description: explanation as to why you think the source entity and the target entity are related to each other- relationship_strength: a numeric score indicating strength of the relationship between the source entity and target entityFormateachrelationshipas("relationship"{TUPLE_DELIMITER}<source_entity>{TUPLE_DELIMITER}<target_entity>{TUPLE_DELIMITER}<relationship_description>{TUPLE_DELIMITER}<relationship_strength>)3. Return output in English as a single list of all the entities and relationships identified in steps 1 and 2. Use**{RECORD_DELIMITER}**asthelist delimiter.4. When finished, output {COMPLETION_DELIMITER}---Real Data--Entity types: {', '.join(entity_types)}Input:{text}Output:

这个提示词(Prompt)的作用是指导大语言模型(LLM)从一段非结构化文本中,系统性地识别并提取特定类型的实体及其相互关系,最终生成一个结构化的知识表示

具体来说,它的作用可以分解为以下几个核心目标:

    实体识别与分类 (Entity Recognition and Classification)

      它要求模型扫描输入文本,找出所有属于预定义类型(如 PERSON, LOCATION, ORGANIZATION 等)的实体。对每个实体,不仅要提取其名称,还要生成一个全面的描述,总结该实体在文中的关键属性和活动。

    关系抽取 (Relationship Extraction)

      在识别出实体的基础上,它要求模型进一步分析这些实体之间的关联。模型需要找出哪些实体对是“明确相关”的(例如,人物之间的互动、地点与事件的关联、组织与人物的从属关系等)。对每一对相关实体,不仅要描述它们关系的原因,还要给出一个表示关系强度的数值评分,这为后续分析提供了权重信息。

    结构化输出 (Structured Output)

      该提示词强制要求模型以一种高度结构化、机器可解析的格式输出结果。通过使用特定的分隔符(如 {tuple_delimiter}{record_delimiter}),将实体和关系分别格式化为元组(tuple)和列表,这使得后续程序可以轻松地将这些文本结果解析为代码中的数据结构(如字典、列表),并用于构建知识图谱。

    任务流程化与标准化 (Task Standardization)

      它将一个复杂的“理解文本并提取知识”的任务,分解为清晰的、可重复的步骤(先找实体,再找关系)。通过提供具体的例子(Few-shot examples),它为模型展示了期望的输入输出格式和内容深度,极大地提高了抽取结果的一致性和准确性。

针对《诛仙》,我们定义如下的实体类型:

这组实体类型覆盖了《诛仙》世界中的核心要素:人物(谁)势力(属于哪里)地点(在哪里)法宝与功法(拥有什么能力)事件(发生了什么)。定义了待抽取的实体类型,将基于这些类型与输入文本生成抽取提示,发送给LLM,并将响应解析为结构化字典格式。随后将其导入 Neo4j,建立文本的结构化图表示。

7.4 具体实现

文末给出了完整的可以直接跑的代码

初始化

import requestsimport refrom typing import List, Dictimport jsonimport osimport signalimport timefrom neo4j import GraphDatabaseimport hashlib
# 相关参数配置OLLAMA_BASE_URL = "http://localhost:11434"# 显存不够可以改小模型LLM_MODEL = "qwen3:32b"# Neo4j 数据库连接配置NEO4J_URI = "bolt://locahost:7687"NEO4J_USER = "neo4j"NEO4J_PASSWORD = "你自己的密码"# 进度文件路径# 这个是用来保存处理进度实现中断的 适合跑跑停停的朋友 毕竟大文档生成也需要时间PROGRESS_FILE = "processing_progress.json"# 初始化 Neo4j 驱动neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
# 定义节点类型ENTITY_TYPES = [    "PERSON",           # 人物    "FACTION",          # 势力/门派    "LOCATION",         # 地点    "ARTIFACT",         # 法宝    "CULTIVATION_SKILL", # 功法    "EVENT",            # 事件    "CREATURE"          # 生物(非人)]
# 从大模型输出中解析出关系会用到TUPLE_DELIMITER = "|"RECORD_DELIMITER = "##"COMPLETION_DELIMITER = "<<COMPLETE>>"# 全局缓存章节数据chapter_data_cache = {}# 全局变量用于优雅退出shutdown_requested = False

进度管理

处理大文件时,需要很长时间。有时候可能需要走走停停,因此设计了进度管理模块。简单的持久化进度跟踪:

def load_processing_progress():    """    从文件中读取处理进度    """    if os.path.exists(PROGRESS_FILE):        try:            with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:                return json.load(f)        except Exception as e:            print(f"读取进度文件失败: {e},使用默认配置")    # 默认进度配置    return {        "last_processed_index": 0,        "completed_chapters": [],        "failed_chapters": {},        "processing_status": {},        "total_chapters": 0,        "start_time": time.strftime("%Y-%m-%d %H:%M:%S"),        "batch_size": 10    }
def save_processing_progress(progress_data):    """    将进度数据持久化到文件    """    try:        with open(PROGRESS_FILE, 'w', encoding='utf-8') as f:            json.dump(progress_data, f, ensure_ascii=False, indent=2)        print("进度已保存")    except Exception as e:        print(f"保存进度文件失败: {e}")
def mark_chapter_as_processing(progress_data, chapter_title):    """    标记章节为处理中    """    progress_data["processing_status"][chapter_title] = {        "status": "processing",        "start_time": time.strftime("%Y-%m-%d %H:%M:%S")    }    save_processing_progress(progress_data)
def mark_chapter_as_completed(progress_data, chapter_title):    """    标记章节为已完成    """    # 从处理状态中移除    if chapter_title in progress_data["processing_status"]:        del progress_data["processing_status"][chapter_title]    # 添加到已完成列表    if chapter_title not in progress_data["completed_chapters"]:        progress_data["completed_chapters"].append(chapter_title)    # 从失败列表中移除(如果存在)    if chapter_title in progress_data["failed_chapters"]:        del progress_data["failed_chapters"][chapter_title]    save_processing_progress(progress_data)
def mark_chapter_as_failed(progress_data, chapter_title, error_msg, retry_count=0):    """    标记章节为失败    """    # 从处理状态中移除    if chapter_title in progress_data["processing_status"]:        del progress_data["processing_status"][chapter_title]    # 添加到失败列表    progress_data["failed_chapters"][chapter_title] = {        "error": error_msg,        "retry_count": retry_count,        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")    }    save_processing_progress(progress_data)
def get_chapters_to_process(all_chapters, progress_data, batch_size=None, retry_failed=True):    """    智能获取需要处理的章节列表    Args:        all_chapters: 所有章节列表        progress_data: 进度数据        batch_size: 批处理大小,None表示处理所有章节        retry_failed: 是否重试失败章节    """    if batch_size is None:        batch_size = len(all_chapters)    chapters_to_process = []    completed_set = set(progress_data["completed_chapters"])    failed_chapters = progress_data["failed_chapters"]  # 直接使用字典,避免创建冗余集合    # 如果选择重试失败章节    if retry_failed and failed_chapters:        print("重试失败章节:")        for chapter_title, fail_info in failed_chapters.items():  # 直接遍历字典items,避免冗余访问            retry_count = fail_info.get("retry_count", 0)            if retry_count < 3:  # 最多重试3次                chapter_info = next((ch for ch in all_chapters if ch['title'] == chapter_title), None)                if chapter_info:                    chapters_to_process.append((chapter_info, retry_count + 1))                    print(f"  - {chapter_title} (第 {retry_count + 1} 次重试)")    # 添加未处理的章节    if len(chapters_to_process) < batch_size:        remaining_slots = batch_size - len(chapters_to_process)        added_count = 0        for chapter in all_chapters:            if added_count >= remaining_slots:                break            if chapter['title'] not in completed_set and chapter['title'] not in failed_chapters:                chapters_to_process.append((chapter, 0))  # retry_count = 0                added_count += 1    return chapters_to_process
# 收到中断信号则进行安全退出def signal_handler(sig, frame):    global shutdown_requested    print('安全退出')    shutdown_requested = True

大模型交互

# 返回用于实体和关系抽取的结构化提示词def create_extraction_prompt(entity_types: List[str], text: str) -> str:    prompt = f"""---目标--给定一个可能与该活动相关的文本文档和一组实体类型,请从文本中识别出这些类型的全部实体以及它们之间的全部关系。---步骤--1. 识别所有实体。对于每个识别出的实体,提取以下信息:- entity_name: 实体名称,使用原文中的中文名称- entity_type: 实体类型,必须是以下之一:[{', '.join(entity_types)}]- entity_description: 对实体属性和行为的详细描述(请用中文描述)每个实体格式为:("entity"{TUPLE_DELIMITER}<entity_name>{TUPLE_DELIMITER}<entity_type>{TUPLE_DELIMITER}<entity_description>)2. 在第1步识别出的实体中,找出所有 *明确相关* 的实体对 (source_entity, target_entity)。对于每对相关实体,提取以下信息:- source_entity: 来源实体名称,使用原文中的中文名称- target_entity: 目标实体名称,使用原文中的中文名称- relationship_description: 说明为什么你认为这两个实体之间存在关联(请用中文描述)- relationship_strength: 表示两者关系强度的数值评分每个关系格式为:("relationship"{TUPLE_DELIMITER}<source_entity>{TUPLE_DELIMITER}<target_entity>{TUPLE_DELIMITER}<relationship_description>{TUPLE_DELIMITER}<relationship_strength>)3. 输出应为英文格式,但描述内容必须为中文,并将所有实体和关系按顺序组成一个列表,使用**{RECORD_DELIMITER}**作为记录分隔符。4. 完成后输出 {COMPLETION_DELIMITER}---重要规则--关系输出格式必须严格按照以下6个部分:("relationship"{TUPLE_DELIMITER}<source_entity>{TUPLE_DELIMITER}<target_entity>{TUPLE_DELIMITER}<relationship_description>{TUPLE_DELIMITER}<relationship_strength>)例如:("relationship"|张小凡|青云门|张小凡加入青云门学习修真|8)---真实数据--实体类型:{', '.join(entity_types)}输入:{text}输出:"""    return prompt
# 与本地大模型进行通信def call_local_llm(prompt: str, max_retries: int = 3) -> str:    payload = {        "model": LLM_MODEL,        "prompt": prompt,        "stream": False    }    for attempt in range(max_retries):        try:            print(f"正在调用LLM (尝试 {attempt + 1}/{max_retries})...")            response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120)            response.raise_for_status()            raw_output = response.json()["response"].strip()            return raw_output        except requests.exceptions.Timeout:            print(f"超时 (尝试 {attempt + 1}/{max_retries})")        except requests.exceptions.RequestException as e:            print(f"网络错误: {e} (尝试 {attempt + 1}/{max_retries})")        except KeyError:            print(f"LLM返回格式错误 (尝试 {attempt + 1}/{max_retries})")        except Exception as e:            print(f"调用失败: {e} (尝试 {attempt + 1}/{max_retries})")        if attempt < max_retries - 1:            import time            time.sleep(2 ** attempt)    print("调用 LLM 达到最大重试次数,返回空字符串。")    return ""
# 解析 LLM 返回的结构化输出,提取实体和关系数据def parse_extraction_output(output: str) -> Dict[str, List[Dict]]:    entities = []    relationships = []    # 移除<think>部分    think_start = output.find("<think>")    think_end = output.find("</think>") + len("</think>") if output.find("</think>") != -1 else -1    if think_start != -1 and think_end > think_start:        think_content = output[think_start:think_end]        output = output.replace(think_content, "").strip()    # 移除完成标记    output = output.replace(COMPLETION_DELIMITER, "").strip()    # 分割记录    records = [r.strip() for r in output.split(RECORD_DELIMITER) if r.strip()]    # 保存有效记录    valid_records = []    for i, record in enumerate(records):        if record.startswith('("entity"') or record.startswith('("relationship"'):            valid_records.append(record)    for record in valid_records:        try:            if record.startswith('("entity"'):                # 移除开头的 ("entity" 和结尾的 )                clean_record = record.replace('("entity"', '', 1).rstrip(')').strip()                parts = [p.strip() for p in clean_record.split(TUPLE_DELIMITER)]                if len(parts) >= 4:                    # 移除引号                    name = parts[1].strip('"')                    entity_type = parts[2].strip('"')                    description = parts[3].strip('"')                    entity = {                        "name": name,                        "type": entity_type,                        "description": description                    }                    entities.append(entity)            elif record.startswith('("relationship"'):                print(f"解析关系记录: {record[:100]}...")                clean_record = record.replace('("relationship"', '', 1).rstrip(')').strip()                parts = [p.strip() for p in clean_record.split(TUPLE_DELIMITER)]                print(f"关系分割部分数量: {len(parts)}")                if len(parts) >= 5:                    # 安全地解析强度                    strength_str = parts[4].strip('"').rstrip(')')                    try:                        strength = float(strength_str)                    except ValueError:                        print(f"无法解析关系强度 '{strength_str}', 默认为 0.0")                        strength = 0.0                    # 移除引号                    source = parts[1].strip('"')                    target = parts[2].strip('"')                    description = parts[3].strip('"')                    relationship = {                        "source": source,                        "target": target,                        "description": description,                        "strength": strength                    }                    relationships.append(relationship)        except Exception as e:            print(f"解析记录时出错: {e}. 跳过记录")                return {"entities": entities, "relationships": relationships}

文本处理

为了提升整体的颗粒度,让提取到的内容更加完善,进行实体提取的基本单位是在章节的基础上,又进行了一次分割。基于行又将一个章节分成了三份

# 每一章的内容太多 再次进出上有进行了一次分片 将一章分成三块同时不破坏其语义def split_chapter_into_three_parts(content: str) -> List[str]:    """    将章节内容按行分成三个大致相等的部分    """    if not content.strip():        return []    lines = content.split('\n')    total_lines = len(lines)    if total_lines <= 3:        return [content]  # 如果行数太少,直接返回整个内容    # 计算每部分的行数    lines_per_part = total_lines // 3    remainder = total_lines % 3    parts = []    start_idx = 0    for i in range(3):        # 为前 remainder 个部分各多分配一行        end_idx = start_idx + lines_per_part + (1 if i < remainder else 0)                if start_idx < total_lines:            part_lines = lines[start_idx:end_idx]            part_content = '\n'.join(part_lines).strip()                        if part_content:  # 只添加非空部分                parts.append(part_content)                        start_idx = end_idx    return parts

大模型的输出经常出现了问题,添加了适当的重试机制。每一份允许重试三次。

# 处理单个文本片段,提取其中信息,包含重试机制处理单个文本片段,包含重试机制def process_text_fragment_with_retry(fragment: str, max_retries: int = 3) -> Dict[str, List[Dict]]:    for attempt in range(max_retries):        try:            # 创建抽取提示            prompt = create_extraction_prompt(ENTITY_TYPES, fragment)               # 调用LLM            output = call_local_llm(prompt)            if not output:                print(f"LLM返回空输出 (尝试 {attempt + 1}/{max_retries})")                continue            # 解析输出            result = parse_extraction_output(output)                        if result and (result.get('entities') or result.get('relationships')):                return result            else:                print(f"解析结果为空 (尝试 {attempt + 1}/{max_retries})")                        except Exception as e:            print(f"处理片段失败 (尝试 {attempt + 1}/{max_retries}): {e}")                    if attempt < max_retries - 1:            print(f"等待 {2 ** attempt} 秒后重试...")            time.sleep(2 ** attempt)        print("❌ 处理片段达到最大重试次数")    return {"entities": [], "relationships": []}

数据缓存

# 将处理结果缓存到内存中,按章节和实体名称组织def cache_chapter_data(chapter_title: str, entities: List[Dict], relationships: List[Dict], fragment_index: int):    """    缓存章节数据到内存中,按实体名称聚合    """    if chapter_title not in chapter_data_cache:        chapter_data_cache[chapter_title] = {            "entities": {},      # 按实体名称聚合            "relationships": []  # 关系列表        }        # 缓存实体(按名称聚合描述)    for entity in entities:        entity_name = entity["name"]        if entity_name not in chapter_data_cache[chapter_title]["entities"]:            chapter_data_cache[chapter_title]["entities"][entity_name] = {                "type": entity["type"],                "descriptions": []            }                # 将当前描述添加到列表中(后续会融合)        chapter_data_cache[chapter_title]["entities"][entity_name]["descriptions"].append(entity["description"])        # 缓存关系(标记来源片段)    for relationship in relationships:        chapter_data_cache[chapter_title]["relationships"].append({            "relationship": relationship,            "fragment_index": fragment_index        })

融合

首先是新增章节节点:故事的发展应该是存在时间关系的,例如:人物性格的变化的,所以新增了具体的章节节点。由于对章节继续进行了分块,对于一个章节,大模型可能生成对于人物的多个描述,两个人物之间可能存在多种短息。因此使用它独立的描述和关系节点对信息进行保存,对同一章节针对相同对象的关系和描述进行了融合。将向数据库插入节点的时间从每一块的结束,修改成每一章的结束统一处理。

# 融合描述ef merge_entity_descriptions(descriptions: List[str], entity_name: str, chapter_title: str) -> str:    """    使用LLM融合实体的多个描述,保持简洁风格,自动去除<think>部分    """    if not descriptions:        return ""    # 先清理所有描述中的<think>部分    cleaned_descriptions = [clean_description(desc) for desc in descriptions]    # 过滤掉空描述    cleaned_descriptions = [desc for desc in cleaned_descriptions if desc.strip()]    if not cleaned_descriptions:        return ""    if len(cleaned_descriptions) == 1:        return cleaned_descriptions[0]    prompt = f"""请将以下关于实体"{entity_name}"在章节"{chapter_title}"中的多个描述压缩融合成一个简洁、完整的描述:描述列表:{chr(10).join([f"{i + 1}. {desc}" for i, desc in enumerate(cleaned_descriptions)])}要求:1. 保持信息完整性2. 去除重复内容3. 语言简洁明了,保持原文风格4. 不要添加额外的分析或解释5. 只返回融合后的描述,不要添加其他说明"""    try:        payload = {            "model": LLM_MODEL,            "prompt": prompt,            "stream": False        }        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120)        response.raise_for_status()        merged_description = response.json()["response"].strip()        # 再次清理可能产生的<think>部分        merged_description = clean_description(merged_description)        return merged_description    except Exception as e:        print(f"融合失败: {e},返回第一个清理后的描述")        return cleaned_descriptions[0] if cleaned_descriptions else ""
# 融合关系描述def merge_relationship_descriptions(relationships: List[Dict], source_entity: str, target_entity: str,                                    chapter_title: str) -> Dict:    """    使用LLM融合同一实体对的多个关系描述,保持顺序信息    """    if not relationships:        return None    # 按片段顺序排序    sorted_relationships = sorted(relationships, key=lambda x: x.get('fragment_index', 0))    # 提取关系描述列表    relationship_descriptions = [rel['relationship']['description'] for rel in sorted_relationships]    # 计算平均强度    avg_strength = sum(rel['relationship']['strength'] for rel in sorted_relationships) / len(sorted_relationships)    # 如果只有一个关系,直接返回    if len(relationship_descriptions) == 1:        return {            "description": relationship_descriptions[0],            "strength": avg_strength        }    # 构建提示词    prompt = f"""请将以下关于实体"{source_entity}"和"{target_entity}"在章节"{chapter_title}"中的多个关系描述压缩融合成一个简洁、完整的关系描述:关系描述列表(按出现顺序):{chr(10).join([f"{i + 1}. {desc}" for i, desc in enumerate(relationship_descriptions)])}要求:1. 保持信息完整性,包含所有重要的关系信息2. 去除重复内容3. 语言简洁明了,保持原文风格4. 按时间或逻辑顺序组织信息5. 只返回融合后的描述,不要添加其他说明"""    try:        payload = {            "model": LLM_MODEL,            "prompt": prompt,            "stream": False        }        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120)        response.raise_for_status()        merged_description = response.json()["response"].strip()        # 清理可能产生的<think>部分        merged_description = clean_description(merged_description)        return {            "description": merged_description,            "strength": avg_strength        }    except Exception as e:        print(f"融合关系描述失败: {e},返回第一个描述和平均强度")        return {            "description": relationship_descriptions[0] if relationship_descriptions else "",            "strength": avg_strength        }

数据库交互

def process_chapter_and_write_to_neo4j(chapter_title: str):    """    处理缓存的章节数据并写入Neo4j,增加关系融合功能    """    if chapter_title not in chapter_data_cache:        print(f"章节 '{chapter_title}' 没有缓存数据")        return    print(f"开始处理章节 '{chapter_title}' 的数据...")    chapter_data = chapter_data_cache[chapter_title]    entities_data = chapter_data["entities"]    relationships_data = chapter_data["relationships"]    try:        with neo4j_driver.session() as session:            # 1. 确保章节节点存在            session.run("MERGE (c:CHAPTER {title: $title})", title=chapter_title)            # 2. 处理实体 - 融合描述并创建节点及描述            for entity_name, entity_info in entities_data.items():                # 融合当前章节的多个片段描述                merged_description = merge_entity_descriptions(                    entity_info["descriptions"], entity_name, chapter_title                )                # 为描述创建唯一ID                content_hash = hashlib.md5(merged_description.encode('utf-8')).hexdigest()                desc_id = f"{entity_name}_{chapter_title}_{content_hash}"                # 创建或更新实体节点                session.run(                    """                    MERGE (e:Entity {name: $name})                    ON CREATE SET                         e.type = $entity_type,                        e.first_seen_chapter = $chapter_title                    ON MATCH SET                        e.type = $entity_type                    """,                    name=entity_name,                    entity_type=entity_info["type"],                    chapter_title=chapter_title                )                # 创建或合并描述节点(不包含extracted_at属性)                session.run(                    """                    MERGE (d:DESCRIPTION {id: $desc_id})                    ON CREATE SET                         d.content = $content,                        d.content_hash = $content_hash                    """,                    desc_id=desc_id,                    content=merged_description,                    content_hash=content_hash                )                # 连接实体和描述                session.run(                    """                    MATCH (e:Entity {name: $entity_name})                    MATCH (d:DESCRIPTION {id: $desc_id})                    MERGE (e)-[:HAS_DESCRIPTION]->(d)                    """,                    entity_name=entity_name,                    desc_id=desc_id                )                # 连接描述和章节                session.run(                    """                    MATCH (d:DESCRIPTION {id: $desc_id})                    MATCH (c:CHAPTER {title: $chapter_title})                    MERGE (d)-[:FROM_CHAPTER]->(c)                    """,                    desc_id=desc_id,                    chapter_title=chapter_title                )                print(f"  实体 '{entity_name}' 的章节描述节点已创建")            # 3. 处理关系 - 先分组再融合            # 按实体对分组关系            relationship_groups = {}            for rel_item in relationships_data:                rel = rel_item['relationship']                source = rel['source']                target = rel['target']                # 使用排序后的实体对作为键,确保 A->B 和 B->A 被视为同一组                entity_pair = tuple(sorted([source, target]))                pair_key = f"{entity_pair[0]}_{entity_pair[1]}"                if pair_key not in relationship_groups:                    relationship_groups[pair_key] = {                        'source': source,                        'target': target,                        'relationships': []                    }                relationship_groups[pair_key]['relationships'].append(rel_item)            # 处理每个实体对的关系组            for group_key, group_data in relationship_groups.items():                source_entity = group_data['source']                target_entity = group_data['target']                group_relationships = group_data['relationships']                # 融合该实体对的所有关系                merged_relationship = merge_relationship_descriptions(                    group_relationships, source_entity, target_entity, chapter_title                )                if merged_relationship and merged_relationship['description'].strip():                    # 为融合后的关系实例创建一个节点来存储上下文信息                    rel_content = f"{source_entity}_{target_entity}_{merged_relationship['description']}"                    rel_hash = hashlib.md5(rel_content.encode('utf-8')).hexdigest()                    rel_instance_id = f"{rel_hash}_{chapter_title}"                    result = session.run(                        """                        MERGE (ri:RELATIONSHIP_INSTANCE {id: $rel_id})                        ON CREATE SET                             ri.description = $description,                             ri.strength = $strength                        RETURN ri.id AS rel_instance_id                        """,                        rel_id=rel_instance_id,                        description=merged_relationship['description'],                        strength=merged_relationship['strength']                    )                    rel_instance_node_id = result.single()["rel_instance_id"]                    # 连接源实体 -> 关系实例                    session.run(                        """                        MATCH (a:Entity {name: $source})                        MATCH (ri:RELATIONSHIP_INSTANCE {id: $rel_instance_id})                        MERGE (a)-[:IS_SOURCE_OF]->(ri)                        """,                        source=source_entity,                        rel_instance_id=rel_instance_node_id                    )                    # 连接关系实例 -> 目标实体                    session.run(                        """                        MATCH (b:Entity {name: $target})                        MATCH (ri:RELATIONSHIP_INSTANCE {id: $rel_instance_id})                        MERGE (ri)-[:POINTS_TO]->(b)                        """,                        target=target_entity,                        rel_instance_id=rel_instance_node_id                    )                    # 连接关系实例 -> 章节                    session.run(                        """                        MATCH (ri:RELATIONSHIP_INSTANCE {id: $rel_instance_id})                        MATCH (c:CHAPTER {title: $chapter_title})                        MERGE (ri)-[:FROM_CHAPTER]->(c)                        """,                        rel_instance_id=rel_instance_node_id,                        chapter_title=chapter_title                    )                    print(f"  关系实例已创建并融合: {source_entity} -> {target_entity}")            print(f"章节 '{chapter_title}' 数据处理完成")            # 清理缓存            del chapter_data_cache[chapter_title]            print(f"章节 '{chapter_title}' 缓存已清理")    except Exception as e:        print(f"处理章节 '{chapter_title}' 数据失败: {e}")        raise
def create_constraints():    """    为实体节点创建唯一性约束,防止重复。    """    with neo4j_driver.session() as session:        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE")        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:CHAPTER) REQUIRE c.title IS UNIQUE")        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (d:DESCRIPTION) REQUIRE d.id IS UNIQUE")        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (ri:RELATIONSHIP_INSTANCE) REQUIRE ri.id IS UNIQUE")    print("唯一性约束创建完成")

7.5 相关统计数据

节点展示

# 展示所有节点MATCH (n)-[r]->(m)RETURN n, r, m

会有数量限制 没法完全展示

统计量获取

def get_database_statistics():    """    获取数据库统计信息    """    try:        # 统计所有节点总数        node_count_result = driver.execute_query(            "MATCH (n) RETURN count(n) AS total_nodes"        )        total_nodes = node_count_result[0][0]['total_nodes'] if node_count_result[0] else 0        # 统计Entity节点总数        entity_count_result = driver.execute_query(            "MATCH (e:Entity) RETURN count(e) AS entity_nodes"        )        entity_nodes = entity_count_result[0][0]['entity_nodes'] if entity_count_result[0] else 0        # 统计人类节点数量(PERSON类型)        person_count_result = driver.execute_query(            "MATCH (e:Entity {type: 'PERSON'}) RETURN count(e) AS person_nodes"        )        person_nodes = person_count_result[0][0]['person_nodes'] if person_count_result[0] else 0        # 统计各类型实体数量        type_count_result = driver.execute_query(            """            MATCH (e:Entity)            RETURN e.type AS entity_type, count(e) AS count            ORDER BY count DESC            """        )        entity_types = {}        if type_count_result[0]:            for record in type_count_result[0]:                entity_types[record['entity_type']] = record['count']        # 统计关系总数        relationship_count_result = driver.execute_query(            "MATCH ()-[r]->() RETURN count(r) AS total_relationships"        )        total_relationships = relationship_count_result[0][0]['total_relationships'] if relationship_count_result[0] else 0        # 统计章节节点数量        chapter_count_result = driver.execute_query(            "MATCH (c:CHAPTER) RETURN count(c) AS chapter_nodes"        )        chapter_nodes = chapter_count_result[0][0]['chapter_nodes'] if chapter_count_result[0] else 0        return {            "total_nodes": total_nodes,            "entity_nodes": entity_nodes,            "person_nodes": person_nodes,            "entity_types": entity_types,            "total_relationships": total_relationships,            "chapter_nodes": chapter_nodes        }    except Exception as e:        print(f"查询数据库时出错: {e}")        return Nonedef get_top_person_entities(limit=10):    """    获取前N个人类实体及其描述数量    """    try:        # 使用传统的session方式来确保参数正确传递        with driver.session() as session:            result = session.run(                """                MATCH (e:Entity {type: $type})                OPTIONAL MATCH (e)-[:HAS_DESCRIPTION]->(d:DESCRIPTION)                RETURN e.name AS name, count(d) AS description_count                ORDER BY description_count DESC                LIMIT $limit                """,                type="PERSON",                limit=limit            )            top_persons = []            for record in result:                top_persons.append({                    "name": record['name'],                    "description_count": record['description_count']                })            return top_persons    except Exception as e:        print(f"查询人类实体时出错: {e}")        return []def print_statistics():    """    打印统计信息    """    stats = get_database_statistics()    if not stats:        print("无法获取统计信息")        return    print("\n各类型实体统计:")    for entity_type, count in stats['entity_types'].items():        print(f"  {entity_type}: {count:,}")    # 获取前10个人类实体    print("\n前10个人类实体:")    top_persons = get_top_person_entities(10)    if top_persons:        for i, person in enumerate(top_persons, 1):            print(f"  {i:2d}. {person['name']:<20} ({person['description_count']} 个描述)")    else:        print("  暂无数据")if __name__ == "__main__":    print_statistics()
# 注 只跑了一个多小时 当前处于小说的中间部分各类型实体统计:  PERSON: 63  ARTIFACT: 57  LOCATION: 42  EVENT: 26  CULTIVATION_SKILL: 25  FACTION: 15  CREATURE: 1210个人类实体:   1. 张小凡                  (33 个描述)   2. 田不易                  (26 个描述)   3. 田灵儿                  (24 个描述)   4. 苏茹                   (17 个描述)   5. 宋大仁                  (17 个描述)   6. 道玄真人                 (16 个描述)   7. 林惊羽                  (15 个描述)   8. 齐昊                   (15 个描述)   9. 杜必书                  (13 个描述)  10. 苍松道人                 (12 个描述)

数据应用

注意:当前的查询语句是我们自己编写的,后续我们将实现让大模型自己编写查询语句。

def query_relationships_between_characters(char1, char2):    """    查询两个角色之间的所有关系    """    try:        with driver.session() as session:            result = session.run(                """                MATCH (a:Entity {name: $char1})-[:IS_SOURCE_OF]->(ri:RELATIONSHIP_INSTANCE)-[:POINTS_TO]->(b:Entity {name: $char2})                MATCH (ri)-[:FROM_CHAPTER]->(c:CHAPTER)                RETURN c.title AS chapter, ri.description AS description, ri.strength AS strength                UNION ALL                MATCH (a:Entity {name: $char2})-[:IS_SOURCE_OF]->(ri:RELATIONSHIP_INSTANCE)-[:POINTS_TO]->(b:Entity {name: $char1})                MATCH (ri)-[:FROM_CHAPTER]->(c:CHAPTER)                RETURN c.title AS chapter, ri.description AS description, ri.strength AS strength                ORDER BY chapter                """,                char1=char1,                char2=char2            )            relationships = []            for record in result:                relationships.append({                    "chapter": record["chapter"],                    "description": record["description"],                    "strength": record["strength"]                })            return relationships    except Exception as e:        return []def format_relationships_display(relationships):    """    格式化显示关系数据    """    if not relationships:        return "未找到关系数据"    output = f"共找到 {len(relationships)} 条关系记录:\n"    output += "=" * 80 + "\n"    for i, rel in enumerate(relationships, 1):        output += f"{i:2d}. 章节: {rel['chapter']}\n"        output += f"    描述: {rel['description']}\n"        output += f"    强度: {rel['strength']}\n"        output += "-" * 60 + "\n"    return outputdef create_analysis_prompt(character1, character2, relationships):    """    创建详细的关系分析提示词    """    if not relationships:        return None    relationship_texts = []    for rel in relationships:        relationship_texts.append(f"章节 {rel['chapter']}: {rel['description']} (关系强度: {rel['strength']})")    prompt = f"""作为一名资深的文学分析师和关系研究专家,请对以下"{character1}"和"{character2}"的人物关系进行深度分析。请按照以下结构进行详细分析:1. **关系性质分析**   - 核心关系类型(友情、师徒、敌对、爱情、亲情等)   - 关系的基本特征和本质属性   - 双方在关系中的角色定位2. **关系发展历程**   - 初始阶段:关系的起点和建立背景   - 发展阶段:关键转折点和重要事件   - 当前状态:关系的现状和特点   - 时间线梳理:按时间顺序的关系演变3. **关键事件影响分析**   - 对关系产生重大影响的具体事件   - 每个事件对双方心理和行为的影响   - 事件间的因果关系和连锁反应4. **深层动机与心理分析**   - 双方维持或改变关系的内在动机   - 个人性格、价值观对关系的影响   - 外部环境和势力对关系的干预5. **文学意义与主题价值**   - 这段关系在整体故事中的作用   - 对主题表达和情节推进的贡献   - 对其他角色和故事线的影响   - 在文学创作上的典型意义6. **关系强度评估**   - 基于提供数据的强度分析   - 关系的稳定性和可持续性   - 可能的发展趋势预测请确保分析具有深度和洞察力,结合具体事例进行论证,避免空泛的表述。关系数据详情:{chr(10).join(relationship_texts)}请提供专业、详细且有深度的分析报告。"""    return promptdef call_local_llm(prompt):    """    调用本地大模型    """    try:        payload = {            "model": LLM_MODEL,            "prompt": prompt,            "stream": False,            "temperature": 0.7,            "top_p": 0.9,            "repeat_penalty": 1.1        }        print("正在调用大模型进行深度分析...")        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=300)        response.raise_for_status()        result = response.json()        return result.get("response", "").strip()    except Exception as e:        print(f"大模型调用失败: {e}")        return Nonedef analyze_character_relationships(character1, character2):    """    分析两个角色之间的关系    """    print(f"正在查询 {character1}{character2} 的关系数据...")    relationships = query_relationships_between_characters(character1, character2)    # 格式化显示检索到的关系    print("\n检索到的关系数据:")    print(format_relationships_display(relationships))    if not relationships:        print("未找到关系数据,无法进行分析")        return    # 创建详细分析提示词    prompt = create_analysis_prompt(character1, character2, relationships)    if not prompt:        print("无法构建分析提示词")        return    # 调用大模型进行分析    analysis_result = call_local_llm(prompt)    if analysis_result:        print("\n大模型深度分析结果:")        print("=" * 100)        print(analysis_result)        print("=" * 100)    else:        print("大模型分析失败")def main():    """    主函数    """    try:        print("连接数据库...")        with driver.session() as session:            session.run("RETURN 1")        print("数据库连接成功\n")        analyze_character_relationships("林惊羽", "张小凡")    except Exception as e:        print(f"程序执行出错: {e}")    finally:        driver.close()if __name__ == "__main__":    main()
/root/anaconda3/bin/conda run -n KGRAG --no-capture-output python /data/KGRAG/test.py 找到 11 条关系记录:1. 第26章 自尊   林惊羽为张小凡旧友,同为青云门弟子 (强度: 7.0)2. 第20章 魔踪   林惊羽与张小凡有短暂对话 (强度: 6.0)3. 第14章 神通   两人因草庙村背景结为好友,林惊羽为张小凡求情 (强度: 8.0)4. 第13章 奇才   林惊羽与张小凡为老友旧识 (强度: 7.0)5. 第15章 私传   张小凡被田灵儿要求以林惊羽为目标,二人存在潜在的竞争关系 (强度: 5.0)6. 第18章 怒兽   张小凡与林惊羽自幼为好友,后成为同门,并共同参加七脉会武,互动频繁且情谊深厚。 (强度: 9.0)7. 第19章 抽签   林惊羽主动与张小凡打招呼后,两人同行并讨论王二叔的下落,随后参与抽签。 (强度: 7.5)8. 第1章 青云   张小凡与林惊羽因意气之争发生肢体冲突,林惊羽试图扼住张小凡的脖子 (强度: 8.0)9. 第24章 意外   林惊羽是张小凡在回忆中提及的同门师兄弟,曾与他共度危险夜晚;在第24章"意外"中,二人展开对话并比试。 (强度: 8.0)10. 第4章 惊变   张小凡与林惊羽自幼为玩伴,曾共同经历草庙村屠杀事件,后一同被带至青云门,此后常有互动和对话。 (强度: 9.0)11. 第5章 入门   张小凡与林惊羽为草庙村同伴,一同被青云门带入 (强度: 9.0)正在分析...分析结果:<think>嗯,用户让我分析《诛仙》里林惊羽和张小凡的关系。首先,我需要仔细看看提供的章节数据。这些数据提到了他们从童年到成为青云门弟子的过程,还有竞争和合作的情况。用户的问题有四个部分:关系性质、发展变化、关键事件和意义。首先,关系性质应该是友情,但可能带有竞争。他们小时候是玩伴,一起经历草庙村的事件,后来成为同门。不过,张小凡被田灵儿要求以林惊羽为目标,这说明有竞争关系。同时,林惊羽多次帮助张小凡,比如第14章求情,所以友情为主,但竞争存在。接下来是关系的发展变化。他们从童年好友开始,但进入青云门后可能因为资源或地位产生竞争。比如第15章提到的竞争关系,强度5.0,可能说明竞争加剧。不过,他们仍然有合作,比如共同参加七脉会武,互动频繁。需要看看数据中是否有时间线的变化,比如早期的友情,后期的冲突和合作。关键事件应该包括草庙村屠杀,这是他们关系的起点。然后是入门青云门,七脉会武,以及张小凡被要求以林惊羽为目标。还有第1章的肢体冲突,可能显示他们之间的紧张关系。另外,林惊羽为张小凡求情,说明互相支持。关系的意义方面,可能反映张小凡的成长,以及青云门内部的竞争。林惊羽作为对照,可能突出张小凡的某些特质,比如坚韧或善良。同时,他们的关系展示了友情在竞争环境中的考验。需要确认是否有其他因素,比如师徒关系?但数据里没有提到,所以应该是同门。敌对关系可能在某些阶段出现,但整体还是友情。要注意用户提供的强度评分,比如第1章冲突强度8.0,而合作章节如第4章强度9.0,说明虽然有冲突,但友情更主导。可能用户希望了解两人关系的复杂性,以及这种关系对故事发展的影响。需要综合各个章节的数据,整理出时间线和发展脉络。还要注意是否有遗漏的关键事件,比如第24章的比试,可能影响他们的关系动态。总结的话,关系性质是友情加竞争,发展从童年到同门,经历合作与竞争,关键事件包括草庙村、入门、七脉会武和比试,意义在于展示成长和人性。</think>### 林惊羽与张小凡的关系分析#### 1. **关系性质**  **核心性质:复杂友情 + 潜在竞争**  - **友情基础**:两人自幼为草庙村邻居(第4章、第5章),共同经历草庙村屠杀事件(第14章),并一同被青云门收入门下(第5章)。早期互动以信任、互助为主,如林惊羽为张小凡求情(第14章)、共同参加七脉会武(第18章)。  - **竞争关系**:青云门内资源分配(如第15章田灵儿要求张小凡以林惊羽为追赶目标)和修炼天赋差异(林惊羽被视作奇才,第13章)导致潜在竞争。但竞争未完全掩盖友情,如第24章比试后仍保持对话。  - **矛盾性**:虽偶有冲突(如第1章肢体冲突、第26章自尊之争),但未发展为彻底敌对,更多是性格和立场的碰撞。---#### 2. **关系发展变化**  **阶段一:童年至入门前(纯真友情)**  - 草庙村时期是两人关系的起点(第4章、第5章),共同经历灾难(第14章)加深羁绊。  - 互动以玩伴式信任为主,如第4章提到“自幼为玩伴”。  **阶段二:青云门同门初期(友情与竞争并存)**  - 入门后因修炼资源分配(第15章)和天赋差异(第13章)产生竞争,但合作仍占主导(如第18章共同参与七脉会武)。  - 林惊羽多次主动维护张小凡(第14章求情、第19章同行),体现友情优先。  **阶段三:成长中的冲突与疏离(矛盾激化)**  - 随着张小凡身世秘密(鬼王宗血脉)和林惊羽修炼受阻(第24章比试失利),两人立场逐渐分化。  - 第1章的肢体冲突和第26章的“自尊”事件,反映竞争对友情的冲击,但未彻底破裂。---#### 3. **影响关系的关键事件**  - **草庙村屠杀(第4章、第14章)**:两人共同创伤的起点,奠定彼此命运关联。  - **青云门入门(第5章)**:将童年友情转化为门派竞争关系,埋下矛盾种子。  - **七脉会武合作(第18章)**:短暂淡化竞争,强化同门情谊。  - **田灵儿的“追赶目标”要求(第15章)**:明确化竞争压力,成为关系转折点。  - **第24章比试与对话**:暴露修炼差距和性格冲突,标志友情开始松动。  - **第1章的意气之争**:早期冲突预示两人性格差异,为后期矛盾埋下伏笔。---#### 4. **这段关系的意义**  - **对张小凡的意义**:    林惊羽是张小凡成长的“对照镜”——既是他努力追赶的目标(第15章),也是揭露其身世秘密的线索(如第24章比试中张小凡的异常表现)。友情中的竞争迫使张小凡直面自身缺陷,加速其成长。  - **对林惊羽的意义**:    张小凡的存在既激发了他的自尊心(第26章),也因自身天赋局限(第13章)暗藏危机感。两人关系折射出青云门“优胜劣汰”体系下同门的复杂生态。  - **对主题的深化**:    两人关系展现了“友情与竞争”的辩证:即使在激烈竞争中,人性中的善意(如林惊羽的多次援手)仍能维系羁绊。同时,关系的裂痕也暗示了青云门“正道”体系对人性的压抑与扭曲。---### 总结  林惊羽与张小凡的关系是《诛仙》中“成长与命运”主题的缩影。他们从纯真友情出发,在门派竞争和命运捉弄中逐渐分化,既互相成就,也彼此伤害。这种复杂关系不仅推动人物成长,更揭示了权力体系下人性的挣扎与选择。

附录 详细代码

图谱生成

import requestsimport refrom typing import List, Dictimport jsonimport osimport signalimport timefrom neo4j import GraphDatabaseimport hashlibOLLAMA_BASE_URL = "http://localhost:11434"LLM_MODEL = "qwen3:32b"# Neo4j 数据库连接配置NEO4J_URI = "bolt://localhost:7687"NEO4J_USER = "neo4j"NEO4J_PASSWORD = "你的密码"# 进度文件路径PROGRESS_FILE = "processing_progress.json"# 初始化 Neo4j 驱动neo4j_driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))# 定义实体类型ENTITY_TYPES = [    "PERSON",    "FACTION",    "LOCATION",    "ARTIFACT",    "CULTIVATION_SKILL",    "EVENT",    "CREATURE"]# 定义分隔符TUPLE_DELIMITER = "|"RECORD_DELIMITER = "##"COMPLETION_DELIMITER = "<<COMPLETE>>"# 全局缓存章节数据chapter_data_cache = {}# 全局变量用于优雅退出shutdown_requested = False# 进度管理模块def load_processing_progress():    """    加载处理进度    """    if os.path.exists(PROGRESS_FILE):        try:            with open(PROGRESS_FILE, 'r', encoding='utf-8') as f:                return json.load(f)        except Exception as e:            print(f"读取进度文件失败: {e},使用默认配置")    # 默认进度配置    return {        "last_processed_index": 0,        "completed_chapters": [],        "failed_chapters": {},        "processing_status": {},        "total_chapters": 0,        "start_time": time.strftime("%Y-%m-%d %H:%M:%S"),        "batch_size": 10    }def save_processing_progress(progress_data):    """    保存处理进度    """    try:        with open(PROGRESS_FILE, 'w', encoding='utf-8') as f:            json.dump(progress_data, f, ensure_ascii=False, indent=2)        print("进度已保存")    except Exception as e:        print(f"保存进度文件失败: {e}")def mark_chapter_as_processing(progress_data, chapter_title):    """    标记章节为处理中    """    progress_data["processing_status"][chapter_title] = {        "status": "processing",        "start_time": time.strftime("%Y-%m-%d %H:%M:%S")    }    save_processing_progress(progress_data)def mark_chapter_as_completed(progress_data, chapter_title):    """    标记章节为已完成    """    # 从处理状态中移除    if chapter_title in progress_data["processing_status"]:        del progress_data["processing_status"][chapter_title]    # 添加到已完成列表    if chapter_title not in progress_data["completed_chapters"]:        progress_data["completed_chapters"].append(chapter_title)    # 从失败列表中移除(如果存在)    if chapter_title in progress_data["failed_chapters"]:        del progress_data["failed_chapters"][chapter_title]    save_processing_progress(progress_data)def mark_chapter_as_failed(progress_data, chapter_title, error_msg, retry_count=0):    """    标记章节为失败    """    # 从处理状态中移除    if chapter_title in progress_data["processing_status"]:        del progress_data["processing_status"][chapter_title]    # 添加到失败列表    progress_data["failed_chapters"][chapter_title] = {        "error": error_msg,        "retry_count": retry_count,        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")    }    save_processing_progress(progress_data)def get_chapters_to_process(all_chapters, progress_data, batch_size=None, retry_failed=True):    """    智能获取需要处理的章节列表    Args:        all_chapters: 所有章节列表        progress_data: 进度数据        batch_size: 批处理大小,None表示处理所有章节        retry_failed: 是否重试失败章节    """    if batch_size is None:        batch_size = len(all_chapters)    chapters_to_process = []    completed_set = set(progress_data["completed_chapters"])    failed_chapters = progress_data["failed_chapters"]  # 直接使用字典,避免创建冗余集合    # 如果选择重试失败章节    if retry_failed and failed_chapters:        print("重试失败章节:")        for chapter_title, fail_info in failed_chapters.items():  # 直接遍历字典items,避免冗余访问            retry_count = fail_info.get("retry_count", 0)            if retry_count < 3:  # 最多重试3次                chapter_info = next((ch for ch in all_chapters if ch['title'] == chapter_title), None)                if chapter_info:                    chapters_to_process.append((chapter_info, retry_count + 1))                    print(f"  - {chapter_title} (第 {retry_count + 1} 次重试)")    # 添加未处理的章节    if len(chapters_to_process) < batch_size:        remaining_slots = batch_size - len(chapters_to_process)        added_count = 0        # 移除未使用的 i 变量,使代码更简洁        for chapter in all_chapters:            if added_count >= remaining_slots:                break            # 直接使用 failed_chapters 进行检查            if chapter['title'] not in completed_set and chapter['title'] not in failed_chapters:                chapters_to_process.append((chapter, 0))  # retry_count = 0                added_count += 1    return chapters_to_process# 信号处理(安全退出)def signal_handler(sig, frame):    global shutdown_requested    print('安全退出')    shutdown_requested = True# 生成GraphRAG抽取提示词def create_extraction_prompt(entity_types: List[str], text: str) -> str:    prompt = f"""---目标--给定一个可能与该活动相关的文本文档和一组实体类型,请从文本中识别出这些类型的全部实体以及它们之间的全部关系。---步骤--1. 识别所有实体。对于每个识别出的实体,提取以下信息:- entity_name: 实体名称,使用原文中的中文名称- entity_type: 实体类型,必须是以下之一:[{', '.join(entity_types)}]- entity_description: 对实体属性和行为的详细描述(请用中文描述)每个实体格式为:("entity"{TUPLE_DELIMITER}<entity_name>{TUPLE_DELIMITER}<entity_type>{TUPLE_DELIMITER}<entity_description>)2. 在第1步识别出的实体中,找出所有 *明确相关* 的实体对 (source_entity, target_entity)。对于每对相关实体,提取以下信息:- source_entity: 来源实体名称,使用原文中的中文名称- target_entity: 目标实体名称,使用原文中的中文名称- relationship_description: 说明为什么你认为这两个实体之间存在关联(请用中文描述)- relationship_strength: 表示两者关系强度的数值评分每个关系格式为:("relationship"{TUPLE_DELIMITER}<source_entity>{TUPLE_DELIMITER}<target_entity>{TUPLE_DELIMITER}<relationship_description>{TUPLE_DELIMITER}<relationship_strength>)3. 输出应为英文格式,但描述内容必须为中文,并将所有实体和关系按顺序组成一个列表,使用**{RECORD_DELIMITER}**作为记录分隔符。4. 完成后输出 {COMPLETION_DELIMITER}---重要规则--关系输出格式必须严格按照以下6个部分:("relationship"{TUPLE_DELIMITER}<source_entity>{TUPLE_DELIMITER}<target_entity>{TUPLE_DELIMITER}<relationship_description>{TUPLE_DELIMITER}<relationship_strength>)例如:("relationship"|张小凡|青云门|张小凡加入青云门学习修真|8)---真实数据--实体类型:{', '.join(entity_types)}输入:{text}输出:"""    return prompt# 调用本地大模型def call_local_llm(prompt: str, max_retries: int = 3) -> str:    payload = {        "model": LLM_MODEL,        "prompt": prompt,        "stream": False    }    for attempt in range(max_retries):        try:            print(f"正在调用LLM (尝试 {attempt + 1}/{max_retries})...")            response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120)            response.raise_for_status()            raw_output = response.json()["response"].strip()            return raw_output                except requests.exceptions.Timeout:            print(f"调用超时 (尝试 {attempt + 1}/{max_retries})")        except requests.exceptions.RequestException as e:            print(f"网络错误: {e} (尝试 {attempt + 1}/{max_retries})")        except KeyError:            print(f"返回格式错误 (尝试 {attempt + 1}/{max_retries})")        except Exception as e:            print(f"调用失败: {e} (尝试 {attempt + 1}/{max_retries})")        if attempt < max_retries - 1:            import time            time.sleep(2 ** attempt)    print("达到最大重试次数,返回空字符串。")    return ""# 解析LLM输出def parse_extraction_output(output: str) -> Dict[str, List[Dict]]:    entities = []    relationships = []    # 移除<think>部分    think_start = output.find("<think>")    think_end = output.find("</think>") + len("</think>") if output.find("</think>") != -1 else -1    if think_start != -1 and think_end > think_start:        think_content = output[think_start:think_end]        output = output.replace(think_content, "").strip()            # 移除完成标记    output = output.replace(COMPLETION_DELIMITER, "").strip()    # 分割记录    records = [r.strip() for r in output.split(RECORD_DELIMITER) if r.strip()]    # 添加有效记录    valid_records = []    for i, record in enumerate(records):        if record.startswith('("entity"') or record.startswith('("relationship"'):            valid_records.append(record)    for record in valid_records:        try:            if record.startswith('("entity"'):                # 移除开头的 ("entity" 和结尾的 )                clean_record = record.replace('("entity"', '', 1).rstrip(')').strip()                parts = [p.strip() for p in clean_record.split(TUPLE_DELIMITER)]                if len(parts) >= 4:                    # 移除引号                    name = parts[1].strip('"')                    entity_type = parts[2].strip('"')                    description = parts[3].strip('"')                    entity = {                        "name": name,                        "type": entity_type,                        "description": description                    }                    entities.append(entity)            elif record.startswith('("relationship"'):                # 移除开头的 ("relationship" 和结尾的 )                clean_record = record.replace('("relationship"', '', 1).rstrip(')').strip()                parts = [p.strip() for p in clean_record.split(TUPLE_DELIMITER)]                if len(parts) >= 5:                    # 安全地解析强度                    strength_str = parts[4].strip('"').rstrip(')')                    try:                        strength = float(strength_str)                    except ValueError:                        print(f"解析失败,默认为 0.0")                        strength = 0.0                    # 移除引号                    source = parts[1].strip('"')                    target = parts[2].strip('"')                    description = parts[3].strip('"')                    relationship = {                        "source": source,                        "target": target,                        "description": description,                        "strength": strength                    }                    relationships.append(relationship)        except Exception as e:            print(f"解析记录时出错: {e}. 跳过")    return {"entities": entities, "relationships": relationships}def split_chapter_into_three_parts(content: str) -> List[str]:    """    将章节内容按行分割,并尽量平均分成三份。    """    print(f"开始拆分章节,总字数: {len(content)}")    # 按行分割    lines = content.splitlines()    print(f"共 {len(lines)} 行")    # 计算每份应包含的行数    total_lines = len(lines)    part_size = total_lines // 3    remainder = total_lines % 3    parts = []    start_idx = 0    for i in range(3):        current_part_size = part_size + (1 if i < remainder else 0)        end_idx = start_idx + current_part_size        part_lines = lines[start_idx:end_idx]        part_content = "\n".join(part_lines).strip()        parts.append(part_content)        print(f"第 {i + 1} 部分: {current_part_size} 行, 字符数: {len(part_content)}")        start_idx = end_idx    return parts# 处理单个文本片段(带重试机制)def process_text_fragment_with_retry(fragment: str, max_retries: int = 3) -> Dict[str, List[Dict]]:    """    处理单个文本片段,带重试机制    """    for attempt in range(max_retries):        try:            if not fragment.strip():                return {"entities": [], "relationships": []}            # 构建提示词并调用LLM            prompt = create_extraction_prompt(ENTITY_TYPES, fragment)            llm_output = call_local_llm(prompt, max_retries=1)            if not llm_output:                raise Exception("LLM 返回空结果")            # 解析结果            result = parse_extraction_output(llm_output)            return result        except Exception as e:            print(f"第 {attempt + 1} 次处理失败: {e}")            if attempt < max_retries - 1:                import time                time.sleep(2)            else:                print("已达到最大重试次数,跳过")                return {"entities": [], "relationships": []}# 缓存章节数据def cache_chapter_data(chapter_title: str, entities: List[Dict], relationships: List[Dict], fragment_index: int):    """    缓存章节的数据,用于后续统一处理    新增 fragment_index 参数来记录关系出现的片段顺序    """    if chapter_title not in chapter_data_cache:        chapter_data_cache[chapter_title] = {            "entities": {},  # {entity_name: [description1, description2, ...]}            "relationships": []  # [{relationship_data, fragment_index}]        }    # 缓存实体描述    for entity in entities:        entity_name = entity["name"]        if entity_name not in chapter_data_cache[chapter_title]["entities"]:            chapter_data_cache[chapter_title]["entities"][entity_name] = {                "type": entity["type"],                "descriptions": []            }        chapter_data_cache[chapter_title]["entities"][entity_name]["descriptions"].append(entity["description"])    # 缓存关系    for relationship in relationships:        relationship_with_fragment = {            "relationship": relationship,            "fragment_index": fragment_index        }        chapter_data_cache[chapter_title]["relationships"].append(relationship_with_fragment)# 清理描述中的<think>部分def clean_description(description: str) -> str:    """    清理描述中的<think>部分    """    think_start = description.find("<think>")    think_end = description.find("</think>") + len("</think>") if description.find("</think>") != -1 else -1    if think_start != -1 and think_end > think_start:        think_content = description[think_start:think_end]        description = description.replace(think_content, "").strip()    return description# 融合实体描述def merge_entity_descriptions(descriptions: List[str], entity_name: str, chapter_title: str) -> str:    """    使用LLM融合实体的多个描述,保持简洁风格,自动去除<think>部分    """    if not descriptions:        return ""    # 先清理所有描述中的<think>部分    cleaned_descriptions = [clean_description(desc) for desc in descriptions]    # 过滤掉空描述    cleaned_descriptions = [desc for desc in cleaned_descriptions if desc.strip()]    if not cleaned_descriptions:        return ""    if len(cleaned_descriptions) == 1:        return cleaned_descriptions[0]    prompt = f"""请将以下关于实体"{entity_name}"在章节"{chapter_title}"中的多个描述压缩融合成一个简洁、完整的描述:描述列表:{chr(10).join([f"{i + 1}. {desc}" for i, desc in enumerate(cleaned_descriptions)])}要求:1. 保持信息完整性2. 去除重复内容3. 语言简洁明了,保持原文风格4. 不要添加额外的分析或解释5. 只返回融合后的描述,不要添加其他说明"""    try:        payload = {            "model": LLM_MODEL,            "prompt": prompt,            "stream": False        }        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120)        response.raise_for_status()        merged_description = response.json()["response"].strip()        # 再次清理可能产生的<think>部分        merged_description = clean_description(merged_description)        return merged_description    except Exception as e:        print(f"融合描述失败: {e},返回第一个清理后的描述")        return cleaned_descriptions[0] if cleaned_descriptions else ""# 融合关系def merge_relationship_descriptions(relationships: List[Dict], source_entity: str, target_entity: str,                                    chapter_title: str) -> Dict:    """    使用LLM融合同一实体对的多个关系描述,保持顺序信息    """    if not relationships:        return None    # 按片段顺序排序    sorted_relationships = sorted(relationships, key=lambda x: x.get('fragment_index', 0))    # 提取关系描述列表    relationship_descriptions = [rel['relationship']['description'] for rel in sorted_relationships]    # 计算平均强度    avg_strength = sum(rel['relationship']['strength'] for rel in sorted_relationships) / len(sorted_relationships)    # 如果只有一个关系,直接返回    if len(relationship_descriptions) == 1:        return {            "description": relationship_descriptions[0],            "strength": avg_strength        }    # 构建提示词    prompt = f"""请将以下关于实体"{source_entity}"和"{target_entity}"在章节"{chapter_title}"中的多个关系描述压缩融合成一个简洁、完整的关系描述:关系描述列表(按出现顺序):{chr(10).join([f"{i + 1}. {desc}" for i, desc in enumerate(relationship_descriptions)])}要求:1. 保持信息完整性,包含所有重要的关系信息2. 去除重复内容3. 语言简洁明了,保持原文风格4. 按时间或逻辑顺序组织信息5. 只返回融合后的描述,不要添加其他说明"""    try:        payload = {            "model": LLM_MODEL,            "prompt": prompt,            "stream": False        }        response = requests.post(f"{OLLAMA_BASE_URL}/api/generate", json=payload, timeout=120)        response.raise_for_status()        merged_description = response.json()["response"].strip()        # 清理可能产生的<think>部分        merged_description = clean_description(merged_description)        return {            "description": merged_description,            "strength": avg_strength        }    except Exception as e:        print(f"融合关系描述失败: {e},返回第一个描述和平均强度")        return {            "description": relationship_descriptions[0] if relationship_descriptions else "",            "strength": avg_strength        }# 处理完整章节数据并写入Neo4jdef process_chapter_and_write_to_neo4j(chapter_title: str):    """    处理缓存的章节数据并写入Neo4j,增加关系融合功能    """    if chapter_title not in chapter_data_cache:        print(f"章节 '{chapter_title}' 没有缓存数据")        return    print(f"开始处理章节 '{chapter_title}' 的数据...")    chapter_data = chapter_data_cache[chapter_title]    entities_data = chapter_data["entities"]    relationships_data = chapter_data["relationships"]    try:        with neo4j_driver.session() as session:            # 1. 确保章节节点存在            session.run("MERGE (c:CHAPTER {title: $title})", title=chapter_title)            # 2. 处理实体 - 融合描述并创建节点及描述            for entity_name, entity_info in entities_data.items():                # 融合当前章节的多个片段描述                merged_description = merge_entity_descriptions(                    entity_info["descriptions"], entity_name, chapter_title                )                # 为描述创建唯一ID                content_hash = hashlib.md5(merged_description.encode('utf-8')).hexdigest()                desc_id = f"{entity_name}_{chapter_title}_{content_hash}"                # 创建或更新实体节点                session.run(                    """                    MERGE (e:Entity {name: $name})                    ON CREATE SET                         e.type = $entity_type,                        e.first_seen_chapter = $chapter_title                    ON MATCH SET                        e.type = $entity_type                    """,                    name=entity_name,                    entity_type=entity_info["type"],                    chapter_title=chapter_title                )                # 创建或合并描述节点                session.run(                    """                    MERGE (d:DESCRIPTION {id: $desc_id})                    ON CREATE SET                         d.content = $content,                        d.content_hash = $content_hash                    """,                    desc_id=desc_id,                    content=merged_description,                    content_hash=content_hash                )                # 连接实体和描述                session.run(                    """                    MATCH (e:Entity {name: $entity_name})                    MATCH (d:DESCRIPTION {id: $desc_id})                    MERGE (e)-[:HAS_DESCRIPTION]->(d)                    """,                    entity_name=entity_name,                    desc_id=desc_id                )                # 连接描述和章节                session.run(                    """                    MATCH (d:DESCRIPTION {id: $desc_id})                    MATCH (c:CHAPTER {title: $chapter_title})                    MERGE (d)-[:FROM_CHAPTER]->(c)                    """,                    desc_id=desc_id,                    chapter_title=chapter_title                )            # 3. 处理关系 - 先分组再融合            # 按实体对分组关系            relationship_groups = {}            for rel_item in relationships_data:                rel = rel_item['relationship']                source = rel['source']                target = rel['target']                # 使用排序后的实体对作为键,确保 A->B 和 B->A 被视为同一组                entity_pair = tuple(sorted([source, target]))                pair_key = f"{entity_pair[0]}_{entity_pair[1]}"                if pair_key not in relationship_groups:                    relationship_groups[pair_key] = {                        'source': source,                        'target': target,                        'relationships': []                    }                relationship_groups[pair_key]['relationships'].append(rel_item)            # 处理每个实体对的关系组            for group_key, group_data in relationship_groups.items():                source_entity = group_data['source']                target_entity = group_data['target']                group_relationships = group_data['relationships']                # 融合该实体对的所有关系                merged_relationship = merge_relationship_descriptions(                    group_relationships, source_entity, target_entity, chapter_title                )                if merged_relationship and merged_relationship['description'].strip():                    # 为融合后的关系实例创建一个节点来存储上下文信息                    rel_content = f"{source_entity}_{target_entity}_{merged_relationship['description']}"                    rel_hash = hashlib.md5(rel_content.encode('utf-8')).hexdigest()                    rel_instance_id = f"{rel_hash}_{chapter_title}"                    result = session.run(                        """                        MERGE (ri:RELATIONSHIP_INSTANCE {id: $rel_id})                        ON CREATE SET                             ri.description = $description,                             ri.strength = $strength                        RETURN ri.id AS rel_instance_id                        """,                        rel_id=rel_instance_id,                        description=merged_relationship['description'],                        strength=merged_relationship['strength']                    )                    rel_instance_node_id = result.single()["rel_instance_id"]                    # 连接源实体 -> 关系实例                    session.run(                        """                        MATCH (a:Entity {name: $source})                        MATCH (ri:RELATIONSHIP_INSTANCE {id: $rel_instance_id})                        MERGE (a)-[:IS_SOURCE_OF]->(ri)                        """,                        source=source_entity,                        rel_instance_id=rel_instance_node_id                    )                    # 连接关系实例 -> 目标实体                    session.run(                        """                        MATCH (b:Entity {name: $target})                        MATCH (ri:RELATIONSHIP_INSTANCE {id: $rel_instance_id})                        MERGE (ri)-[:POINTS_TO]->(b)                        """,                        target=target_entity,                        rel_instance_id=rel_instance_node_id                    )                    # 连接关系实例 -> 章节                    session.run(                        """                        MATCH (ri:RELATIONSHIP_INSTANCE {id: $rel_instance_id})                        MATCH (c:CHAPTER {title: $chapter_title})                        MERGE (ri)-[:FROM_CHAPTER]->(c)                        """,                        rel_instance_id=rel_instance_node_id,                        chapter_title=chapter_title                    )            # 清理缓存            del chapter_data_cache[chapter_title]    except Exception as e:        print(f"处理章节 '{chapter_title}' 数据失败: {e}")        raise# 创建数据库约束def create_constraints():    """    为实体节点创建唯一性约束,防止重复。    """    with neo4j_driver.session() as session:        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE")        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (c:CHAPTER) REQUIRE c.title IS UNIQUE")        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (d:DESCRIPTION) REQUIRE d.id IS UNIQUE")        session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (ri:RELATIONSHIP_INSTANCE) REQUIRE ri.id IS UNIQUE")# 主函数def main():    global shutdown_requested    # 注册信号处理器    signal.signal(signal.SIGINT, signal_handler)    signal.signal(signal.SIGTERM, signal_handler)    # 创建数据库约束    create_constraints()    file_path = "/data/KGRAG/诛仙.txt"    try:        with open(file_path, 'r', encoding='utf-8') as f:            content = f.read()    except Exception as e:        print(f"读取文件失败: {e}")        return    # 按章节分割    pattern = r'(第[0-9一二三四五六七八九十百千]+章\s*[^\n]*)'    parts = re.split(pattern, content)    chapters = []    if len(parts) > 0 and parts[0].strip():        chapters.append({'title': '引子', 'content': parts[0]})    for i in range(1, len(parts), 2):        if i + 1 < len(parts):            chapters.append({                'title': parts[i].strip(),                'content': parts[i + 1].strip()            })    if len(chapters) < 2:        print("文本中没有足够的章节。")        return    # 加载处理进度    progress_data = load_processing_progress()    progress_data["total_chapters"] = len(chapters) - 1  # 减去引子    # 获取需要处理的章节    chapters_to_process = get_chapters_to_process(chapters[1:], progress_data, batch_size=None , retry_failed=True)    if not chapters_to_process:        print("文本为空")        return    print(f"\n处理 {len(chapters_to_process)} 个章节")    # 处理章节    for chapter_index, (chapter, retry_count) in enumerate(chapters_to_process):        if shutdown_requested:            print("\n收到退出信号,停止处理")            break        print(f"\n正在处理章节: {chapter['title']} (重试次数: {retry_count})")        # 标记章节为处理中        mark_chapter_as_processing(progress_data, chapter['title'])        try:            # 将章节内容按行拆分成三份            text_fragments = split_chapter_into_three_parts(chapter['content'])            # 处理每个片段            for i, fragment in enumerate(text_fragments):                if shutdown_requested:                    print("\n收到退出信号,中断处理...")                    break                print(f"\n处理第 {i + 1} 个片段 (字数: {len(fragment)})")                if len(fragment.strip()) > 0:                    result = process_text_fragment_with_retry(fragment)                    if result and (result['entities'] or result['relationships']):                        print(f"  -> 缓存片段 {i + 1} 的数据...")                        # 缓存数据时传入片段索引                        cache_chapter_data(                            chapter_title=chapter['title'],                            entities=result['entities'],                            relationships=result['relationships'],                            fragment_index=i  # 传入片段索引                        )                    else:                        print("片段未抽取到任何实体或关系,跳过缓存。")                else:                    print("  跳过空片段")            # 如果收到退出信号,不处理当前章节的写入            if shutdown_requested:                print(f"章节 {chapter['title']} 处理被中断,数据未写入数据库")                mark_chapter_as_failed(progress_data, chapter['title'], "处理被用户中断", retry_count)                continue            # 章节结束时统一处理并写入数据库            process_chapter_and_write_to_neo4j(chapter['title'])            # 标记章节为完成            mark_chapter_as_completed(progress_data, chapter['title'])        except Exception as e:            print(f"章节 {chapter['title']} 处理失败: {e}")            mark_chapter_as_failed(progress_data, chapter['title'], str(e), retry_count)            continue    # 关闭数据库连接    neo4j_driver.close()    print("执行完毕,Neo4j连接已关闭。")if __name__ == "__main__":    main()

内容太多,欲知后事如何,请听下回分解。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

GraphRAG 知识图谱 自然语言处理 《诛仙》 大语言模型
相关文章