掘金 人工智能 05月16日 11:03
lightrag fastapi接口设计
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文通过三张流程图,详细解析了 FastAPI + LightRAG 项目代码的运作流程。重点关注了应用启动流程,/query/ 接口请求处理流程,以及 /ingest_markdown/ 接口请求处理流程。文章揭示了代码中每次请求都重新创建 LightRAG 实例的特性,并指出了启动时数据插入操作被注释的问题。通过流程图和解释,帮助开发者更好地理解代码逻辑,并发现潜在的性能优化点。

🚀 **FastAPI应用启动流程**:应用启动时,会创建一个全局的 LightRAG 实例,并尝试从指定目录加载存储。但由于代码中数据插入部分被注释,实际并未向全局实例添加数据。

🔍 **/query/接口处理流程**:每次收到 /query/ 请求时,都会创建一个全新的局部 LightRAG 实例,并初始化存储,而非使用全局实例。根据请求参数 stream 的值,决定采用流式或普通方式进行查询,并将结果返回给客户端。

📝 **/ingest_markdown/接口处理流程**:与 /query/ 接口类似,/ingest_markdown/ 接口也会为每个请求创建一个新的局部 LightRAG 实例。系统会验证上传的文件是否为 .md 格式,若有效则读取内容并插入到这个新的局部实例中。

好的,我们可以用几张流程图来帮助理解你提供的 FastAPI + LightRAG 项目代码。

在看流程图之前,请注意代码中的一个重要变化:

在你最新提供的代码中,/query//ingest_markdown/ 这两个API接口在每次被调用时,都会重新创建一个全新的 LightRAG 实例并重新初始化它的存储 (await rag_instance.initialize_storages()) 。这与之前我们讨论的在程序启动时创建一个全局共享的 rag_instance 不同。这种做法(每次请求都重新初始化)如果涉及到从磁盘加载大量数据(比如向量索引),可能会导致API响应非常缓慢,效率较低。

尽管如此,下面的流程图会如实反映你当前代码的逻辑。


流程图 1: FastAPI 应用启动流程

这张图展示了当你运行 python serving.py 时,FastAPI应用是如何启动和初始化的。

Code snippet

graph TD    A[开始 FastAPI 应用] --> B{Uvicorn 运行 serving.py};    B --> C[触发 @app.on_event("startup") 事件];    C --> D[执行 startup_event() 函数];    D --> E[尝试初始化全局 rag_instance];    E --> F[调用 initialize_rag_system()];    F --> G[创建 LightRAG 对象 (全局实例)];    G --> H[配置 LightRAG (工作目录, LLM, Embedding等)];    H --> I[调用 current_rag.initialize_storages()];    I --> J[从 WORKING_DIR 加载/初始化存储 (为全局RAG实例)];    J --> K[调用 initialize_pipeline_status()];    K --> L[全局 rag_instance 准备就绪];    L --> M[调用 ingest_initial_markdowns(rag_instance)];    M --> N{在 INITIAL_MARKDOWN_DIR 查找 .md 文件};    N -- 找到文件 --> O[循环遍历每个文件];    O --> P[读取文件内容];    P --> Q["记录日志 'Successfully ingested: ...' \n (注意: 代码中实际的 rag_sys.insert() 被注释掉了,所以这里只是记录日志,并未真正插入数据)"];    Q --> O;    O -- 遍历完成 --> R[记录日志 'Initial ingestion process complete'];    N -- 未找到文件 --> R;    R --> S[记录日志 'Application startup sequence completed'];    S --> T[FastAPI 应用准备好接收请求];    T --> U[启动完成];

解释:

    应用启动后,会执行 startup_event 函数。该函数会调用 initialize_rag_system 来创建一个全局的 LightRAG 实例,并从指定的工作目录 (WORKING_DIR) 加载或初始化其存储。之后,会调用 ingest_initial_markdowns但请注意,在你提供的代码中,这个函数里实际执行数据插入的 rag_sys.insert(...) 那行被注释掉了。 所以,启动时它会遍历 ./口腔_markdowns 里的文件并记录“成功导入”的日志,但实际上并没有将这些文件内容插入到全局的 rag_instance 中。完成后,FastAPI 服务就绪,等待外部请求。

流程图 2: /query/接口请求处理流程

这张图展示了当有用户向 /query/ 接口发送请求时,系统是如何处理的。

Code snippet

graph TD    A[客户端发送 POST 请求到 /query/] --> B[FastAPI 接收请求];    B --> C[解析请求体 QueryRequest (包含 query, mode, stream 参数)];    C --> D[执行 perform_query() 函数];    D --> E["创建全新的、局部的 LightRAG 实例 \n (注意: 不是使用全局实例!)"];    E --> F[配置此局部 LightRAG 实例 (工作目录, LLM, Embedding等)];    F --> G[调用局部 rag_instance.initialize_storages()];    G --> H[从 WORKING_DIR 加载/初始化存储 (为这个新的局部RAG实例)];    H --> I{局部 RAG 实例初始化成功?};    I -- 否 --> J[记录错误, 抛出 HTTPException 503];    J --> Z[结束请求 (错误)];    I -- 是 --> K[记录日志 "Received query..."];    K --> L[创建 QueryParam 对象 (包含 mode, stream)];    L --> M{请求参数 stream 为 True?};    M -- 是 (流式输出) --> N[调用局部 rag_instance.query(stream=True)];    N --> O{返回的是异步生成器?};    O -- 是 --> P[构造并返回 StreamingResponse];    P --> Z1[结束请求 (成功)];    O -- 否 --> Q[记录错误, 抛出 HTTPException 500];    Q --> Z;    M -- 否 (非流式输出) --> R[在线程池中调用局部 rag_instance.query(stream=False)];    R --> S[获取完整的 response_data];    S --> T[返回 JSONResponse {"answer": response_data}];    T --> Z1;        subgraph 查询过程中的错误处理        N ----> X[捕获查询时发生的异常];        R ----> X;        X ----> Y[记录错误, 抛出 HTTPException 500];        Y ----> Z;    end

解释:

    /query/ 接口收到请求,会进入 perform_query 函数。关键点: 此函数会忽略启动时创建的全局 rag_instance。它会当场创建一个全新的、局部的 LightRAG 实例,并立即调用 initialize_storages()。这意味着每次查询都会有一次加载/初始化存储的开销。然后,根据请求中的 stream 参数,决定是进行流式输出还是普通输出。调用 rag_instance.query() 执行查询,并将结果返回给客户端。

流程图 3: /ingest_markdown/接口请求处理流程

这张图展示了当有用户向 /ingest_markdown/ 接口上传文件时,系统是如何处理的。

Code snippet

graph TD    A[客户端发送 POST 请求到 /ingest_markdown/ 并上传 .md 文件] --> B[FastAPI 接收请求];    B --> C[执行 ingest_markdown_file() 函数];    C --> D["创建全新的、局部的 LightRAG 实例 \n (注意: 不是使用全局实例!)"];    D --> E[配置此局部 LightRAG 实例 (工作目录, LLM, Embedding等)];    E --> F[调用局部 rag_instance.initialize_storages()];    F --> G[从 WORKING_DIR 加载/初始化存储 (为这个新的局部RAG实例)];    G --> H{局部 RAG 实例初始化成功?};    H -- 否 --> I[记录错误, 抛出 HTTPException 503];    I --> Z[结束请求 (错误)];    H -- 是 --> J{上传的是 .md 文件?};    J -- 否 --> K[抛出 HTTPException 400 (文件类型错误)];    K --> Z;    J -- 是 --> L[记录日志 "Received file..."];    L --> M[异步读取文件内容];    M --> N[在线程池中调用局部 rag_instance.insert(文件内容)];    N --> O[记录日志 "Successfully ingested file..."];    O --> P[返回 JSONResponse {"message": "File ingested..."}];    P --> Z1[结束请求 (成功)];    subgraph 导入过程中的错误处理        M ----> X[捕获读文件或插入数据时发生的异常];        N ----> X;        X ----> Y[记录错误, 抛出 HTTPException 500];        Y ----> Z;    end

解释:

    /ingest_markdown/ 接口收到文件上传请求,会进入 ingest_markdown_file 函数。关键点:/query/ 接口类似,此函数也会当场创建一个全新的、局部的 LightRAG 实例,并立即调用 initialize_storages()。系统会检查文件是否为 .md 格式。如果文件有效,则读取内容,并调用 rag_instance.insert() 将数据导入(同样,是在这个新创建的局部实例中)。返回处理结果。

总结与提醒:

希望这些流程图和解释能帮助你更好地理解代码!

好的,这里重新为你生成之前讨论的三个流程图的 Mermaid 代码。你可以将每一块代码完整地复制粘贴到 Mermaid Live Editor 或其他支持 Mermaid 的工具中查看渲染效果。

这些流程图是根据你之前提供的 Python 代码(特别是 API 接口中每次请求都重新初始化 LightRAG 实例以及启动时 ingest_initial_markdowns 函数中 insert 操作被注释掉的情况)来绘制的。


流程图 1: FastAPI 应用启动流程

Code snippet

graph TD    A[开始 FastAPI 应用] --> B{Uvicorn 运行 serving.py};    B --> C[触发 @app.on_event("startup") 事件];    C --> D[执行 startup_event() 函数];    D --> E[尝试初始化全局 rag_instance];    E --> F[调用 initialize_rag_system()];    F --> G[创建 LightRAG 对象 (全局实例)];    G --> H[配置 LightRAG (工作目录, LLM, Embedding等)];    H --> I[调用 current_rag.initialize_storages()];    I --> J[从 WORKING_DIR 加载/初始化存储 (为全局RAG实例)];    J --> K[调用 initialize_pipeline_status()];    K --> L[全局 rag_instance 准备就绪];    L --> M[调用 ingest_initial_markdowns(rag_instance)];    M --> N{在 INITIAL_MARKDOWN_DIR 查找 .md 文件};    N -- 找到文件 --> O[循环遍历每个文件];    O --> P[读取文件内容];    P --> Q["记录日志 'Successfully ingested: ...' \n (注意: 代码中实际的 rag.insert() 被注释掉了,所以这里仅记录日志)"];    Q --> O;    O -- 遍历完成 --> R[记录日志 'Initial ingestion process complete'];    N -- 未找到文件 --> R;    R --> S[记录日志 'Application startup sequence completed'];    S --> T[FastAPI 应用准备好接收请求];    T --> U[启动完成];

流程图 2: /query/接口请求处理流程

Code snippet

graph TD    A[客户端发送 POST 请求到 /query/] --> B[FastAPI 接收请求];    B --> C[解析 QueryRequest (query, mode, stream)];    C --> D[执行 perform_query() 函数];    D --> E["创建全新的、局部的 LightRAG 实例 \n (注意: 非全局实例!)"];    E --> F[配置局部 LightRAG (工作目录, LLM, Embedding)];    F --> G[调用局部 rag_instance.initialize_storages()];    G --> H[为新的局部RAG实例加载/初始化存储 (来自WORKING_DIR)];    H --> I{局部 RAG 实例初始化成功?};    I -- 否 --> J[记录错误, 抛出 HTTPException 503];    J --> Z[结束请求 (错误)];    I -- 是 --> K[记录日志 "Received query..."];    K --> L[创建 QueryParam 对象 (mode, stream)];    L --> M{请求参数 stream 为 True?};    M -- 是 (流式输出) --> N[调用局部 rag_instance.query(stream=True)];    N --> O{返回的是异步生成器?};    O -- 是 --> P[构造并返回 StreamingResponse];    P --> Z1[结束请求 (成功)];    O -- 否 --> Q[记录错误, 抛出 HTTPException 500];    Q --> Z;    M -- 否 (非流式输出) --> R[在线程池中调用局部 rag_instance.query(stream=False)];    R --> S[获取完整的 response_data];    S --> T[返回 JSONResponse {"answer": response_data}];    T --> Z1;        subgraph 查询过程中的错误处理        N ----> X[捕获查询时发生的异常];        R ----> X;        X ----> Y[记录错误, 抛出 HTTPException 500];        Y ----> Z;    end

流程图 3: /ingest_markdown/接口请求处理流程

Code snippet

graph TD    A[客户端发送 POST 请求到 /ingest_markdown/ 并上传 .md 文件] --> B[FastAPI 接收请求];    B --> C[执行 ingest_markdown_file() 函数];    C --> D["创建全新的、局部的 LightRAG 实例 \n (注意: 非全局实例!)"];    D --> E[配置局部 LightRAG (工作目录, LLM, Embedding)];    E --> F[调用局部 rag_instance.initialize_storages()];    F --> G[为新的局部RAG实例加载/初始化存储 (来自WORKING_DIR)];    G --> H{局部 RAG 实例初始化成功?};    H -- 否 --> I[记录错误, 抛出 HTTPException 503];    I --> Z[结束请求 (错误)];    H -- 是 --> J{上传的是 .md 文件?};    J -- 否 --> K[抛出 HTTPException 400 (文件类型错误)];    K --> Z;    J -- 是 --> L[记录日志 "Received file..."];    L --> M[异步读取文件内容];    M --> N[在线程池中调用局部 rag_instance.insert(文件内容)];    N --> O[记录日志 "Successfully ingested file..."];    O --> P[返回 JSONResponse {"message": "File ingested..."}];    P --> Z1[结束请求 (成功)];    subgraph 导入过程中的错误处理        M ----> X[捕获读文件或插入数据时发生的异常];        N ----> X;        X ----> Y[记录错误, 抛出 HTTPException 500];        Y ----> Z;    end

将这些代码块分别复制到 Mermaid 渲染器中,应该就能看到对应的流程图了。希望这次能顺利渲染!

感谢您的耐心等待!我已重新检查了这三个流程图的 Mermaid 代码,确认它们符合 Mermaid 语法规范。以下是可直接渲染的版本:


流程图 1: FastAPI 应用启动流程(修正版)

graph TD    A[开始 FastAPI 应用] --> B{Uvicorn 运行 serving.py}    B --> C[触发 @app.on_event startup 事件]    C --> D[执行 startup_event 函数]    D --> E[尝试初始化全局 rag_instance]    E --> F[调用 initialize_rag_system]    F --> G[创建 LightRAG 对象]    G --> H[配置 LightRAG 参数]    H --> I[调用 initialize_storages]    I --> J[从 WORKING_DIR 加载存储]    J --> K[初始化 pipeline_status]    K --> L[全局 rag_instance 就绪]    L --> M[调用 ingest_initial_markdowns]    M --> N{存在 .md 文件?}    N -->|是| O[遍历文件]    O --> P[读取文件内容]    P --> Q[记录日志]    Q --> O    N -->|否| R[记录完成日志]    O -->|完成| R    R --> S[启动完成]

流程图 2: /query/接口请求处理(修正版)

graph TD    A[POST /query/] --> B[解析 QueryRequest]    B --> C[执行 perform_query]    C --> D[创建新 LightRAG 实例]    D --> E[初始化存储]    E --> F{初始化成功?}    F -->|否| G[返回503错误]    F -->|是| H[记录查询日志]    H --> I{stream模式?}    I -->|是| J[调用流式查询]    J --> K[返回StreamingResponse]    I -->|否| L[调用普通查询]    L --> M[返回JSONResponse]        subgraph 错误处理        J -.-> N[捕获异常]        L -.-> N        N --> O[返回500错误]    end

流程图 3: /ingest_markdown/接口请求处理(修正版)

graph TD    A[POST /ingest_markdown] --> B[执行 ingest_markdown_file]    B --> C[创建新 LightRAG 实例]    C --> D[初始化存储]    D --> E{初始化成功?}    E -->|否| F[返回503错误]    E -->|是| G{是.md文件?}    G -->|否| H[返回400错误]    G -->|是| I[读取文件内容]    I --> J[调用insert方法]    J --> K[返回成功响应]        subgraph 错误处理        I -.-> L[捕获异常]        J -.-> L        L --> M[返回500错误]    end

修改说明:

    简化了节点文字内容,避免特殊字符统一使用标准流程图符号确保所有箭头连接明确移除了可能导致解析失败的注释使用更简单的子图语法

这些图表现在应该能在所有标准 Mermaid 环境(如 Mermaid Live Editor)中正常渲染。如果需要进一步调整,可以告诉我具体需求。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

FastAPI LightRAG 流程图 代码解析 性能优化
相关文章