掘金 人工智能 13小时前
LangChain 设计原理分析³ | 从零实现一个自定义 Runnable 模块
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入介绍了如何在 LangChain 中从零开始构建自定义 Runnable 模块,并以此为基础搭建小型执行链。通过亲手实现 retry 和 fallback 等功能,读者可以从代码层面掌握 LangChain 的核心执行架构。文章详细解释了实现自定义 Runnable 的必要性,包括理解抽象接口、掌握执行流程以及拓展可组合能力。文中提供了具体代码示例,演示了如何实现最简 AddOne 模块,并通过 LCEL 表达式组合成流水线,为模块添加重试与回退能力,并结合了批量处理与流式处理。最终,文章通过对比图示展示了自定义模块如何无缝融入 LangChain 执行架构,帮助用户理解 Runnable 接口设计、错误恢复机制、LCEL 链式组合思维以及如何接入 LangChain 生态。

📦 理解 Runnable 接口设计:通过自定义模块,用户能深入理解 Runnable 接口为何如此设计,以及其各方法在执行过程中的作用,为后续开发打下坚实基础。

🔄 掌握执行流程与控制:文章展示了如何从 .invoke() 到 .batch()、.stream(),并重点讲解了如何注入控制逻辑,如重试(retry)和回退(fallback),以有效管理失败或实现并行执行,提升代码健壮性。

🧱 拓展可组合能力与生态接入:学习如何将自定义逻辑封装成可被 LCEL 组合重用的模块,使其能与 OpenAI 模型、Parser 甚至 Agent 等 LangChain 内建模块流畅协作,实现业务模块的无缝接入。

💡 继承 RunnableSerializable 的优势:文章强调了继承 RunnableSerializable 而非直接继承 Runnable 的好处,包括自动获得 .batch()、.stream()、.bind() 等方法实现,支持 JSON 序列化和可观测配置导出,并只需实现核心的 .invoke() 方法即可接入整个 LCEL 执行框架。

⚙️ 实现场景级别错误恢复:通过 with_retry 和 with_fallbacks 的示例,用户学会了如何为自定义模块添加健壮的错误处理机制,例如指定重试的异常类型、设置指数抖动退避以及定义回退逻辑,从而增强应用的稳定性。

在前两篇中,我们深入理解了 LangChain 的 Runnable 接口与 LCEL 表达式组合思想,以及与 Agent 执行控制的基本能力。

本篇我们将从头构建一个自定义 Runnable 模块,并以它为基础搭建一个小型执行链,亲手实现 retry、fallback 等功能,让你能真正从代码层面掌握核心执行架构。


一、为什么要自己实现一个 Runnable?


二、实现一个最简 AddOne 模块

按 LangChain 最新规范,你可以实现如下最基础的 Runnable

from typing import Any, Optionalfrom langchain_core.runnables import RunnableSerializablefrom langchain_core.runnables.config import RunnableConfigclass AddOne(RunnableSerializable[int, int]):    def invoke(        self,        input: int,        config: Optional[RunnableConfig] = None,        **kwargs: Any    ) -> int:        return input + 1

config 参数用于携带执行上下文,如 debug tags、trace id、LangSmith 信息等。

💡 为什么不直接继承 Runnable 而要继承 RunnableSerializable

这是因为 RunnableSerializable 是 LangChain 提供的一个 带默认实现的可序列化 Runnable 抽象基类,相比直接继承 Runnable


三、构造一个组合执行链(LCEL)

from langchain_core.runnables import RunnableLambdadouble = RunnableLambda(lambda x: x * 2)add_one = AddOne()pipeline = double | add_oneprint(pipeline.invoke(3))  # 输出 7

这样你就搭建了一个表达式式的小流水线:先乘 2 再加 1


四、为你的模块添加重试与回退能力(Retry 与 Fallback)

# 定义一个可能出错的函数模块(主模块)add_one = RunnableLambda(lambda x: x + 1 if x >= 0 else 1 / 0)# 使用 with_retry 包裹,自动重试最多 2 次(适用于 ValueError、ZeroDivisionError 等)robust_add_one = add_one.with_retry(    retry_if_exception_type=(ValueError, ZeroDivisionError),  # 要重试的异常类型    wait_exponential_jitter=True,  # 是否启用指数抖动退避    stop_after_attempt=2  # 最多重试次数)

搭配 fallback:

# 定义 fallback 模块fallback_add = RunnableLambda(lambda x: x + 5)# 整合 fallbackadd_one_with_fallback = robust_add_one.with_fallbacks([fallback_add])

联合使用:

chain = double | add_one_with_fallbackprint(chain.invoke(-1))  # 主 logic 出错后 fallback 接力,输出 -2 + 5 = 3

五、批量处理与流式处理结合

批量处理示例:

print(chain.batch([-1, 0, 1, 2, 3]))  # 批量执行所有输入

输出:

[3, 1, 3, 5, 7]

流式处理示例:

# 定义一个模拟 LLM 流式输出的模块(逐字输出字符串)def generate_streaming_tokens(n: int):    result = f"Hello! The input number is {n}."    for char in result:        time.sleep(0.1)        yield char  # 模拟 token-by-token 输出# 封装为 Runnable,可支持 .stream()streaming_chain = RunnableLambda(generate_streaming_tokens)# 流式执行并输出(逐字符打印)for token in streaming_chain.stream(3):    print(token, end="")

输出:

你自定义的模块就支持了 invoke()batch()stream() 三种执行入口方式,和 LangChain 内建模块行为一致。


六、对比图示:自定义模块如何融入 LangChain 执行架构

[input] -> RunnableLambda(double) -> AddOne.invoke()                   |               |               with_retry       with_fallbacks                   ↓               ↓          .stream() / .batch()      .invoke()              /                         /          输出流或批量结果             最终值

如图所示,你的模块已具备组合、控制、可调试的核心特性。


七、小结:你通过这个模块学到了什么?


接下来,我们将深入剖析 LangChain 中的 ChatModel 接口,解析其多模型适配架构与工厂模式设计,构建高复用的语言模型封装层。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

LangChain Runnable LCEL Python AI开发
相关文章