在前两篇中,我们深入理解了 LangChain 的
Runnable
接口与 LCEL 表达式组合思想,以及与 Agent 执行控制的基本能力。本篇我们将从头构建一个自定义
Runnable
模块,并以它为基础搭建一个小型执行链,亲手实现 retry、fallback 等功能,让你能真正从代码层面掌握核心执行架构。
一、为什么要自己实现一个 Runnable?
- 📦 理解抽象接口:你能知道
Runnable
为什么要设计成这样,以及每个方法在执行过程中扮演的角色。🔄 掌握执行流程:从 .invoke()
到 .batch()
、.stream()
,并掌握如何注入控制逻辑来管理失败或并行执行。🧱 拓展可组合能力:让你写出的自定义逻辑,能被 LCEL 组合重用,和 OpenAI 模型、Parser 甚至 Agent 流畅协作。二、实现一个最简 AddOne
模块
按 LangChain 最新规范,你可以实现如下最基础的 Runnable
:
from typing import Any, Optionalfrom langchain_core.runnables import RunnableSerializablefrom langchain_core.runnables.config import RunnableConfigclass AddOne(RunnableSerializable[int, int]): def invoke( self, input: int, config: Optional[RunnableConfig] = None, **kwargs: Any ) -> int: return input + 1
config
参数用于携带执行上下文,如 debug tags、trace id、LangSmith 信息等。
💡 为什么不直接继承 Runnable
而要继承 RunnableSerializable
?
这是因为 RunnableSerializable
是 LangChain 提供的一个 带默认实现的可序列化 Runnable
抽象基类,相比直接继承 Runnable
:
- 它自动提供了
.batch()
、.stream()
、.bind()
等方法的实现;它支持 JSON 序列化和可观测配置导出(用于 LangSmith 等工具);它要求你 只需实现核心方法 如 .invoke()
,就能立刻接入整个 LCEL 执行框架。三、构造一个组合执行链(LCEL)
from langchain_core.runnables import RunnableLambdadouble = RunnableLambda(lambda x: x * 2)add_one = AddOne()pipeline = double | add_oneprint(pipeline.invoke(3)) # 输出 7
这样你就搭建了一个表达式式的小流水线:先乘 2 再加 1。
四、为你的模块添加重试与回退能力(Retry 与 Fallback)
# 定义一个可能出错的函数模块(主模块)add_one = RunnableLambda(lambda x: x + 1 if x >= 0 else 1 / 0)# 使用 with_retry 包裹,自动重试最多 2 次(适用于 ValueError、ZeroDivisionError 等)robust_add_one = add_one.with_retry( retry_if_exception_type=(ValueError, ZeroDivisionError), # 要重试的异常类型 wait_exponential_jitter=True, # 是否启用指数抖动退避 stop_after_attempt=2 # 最多重试次数)
搭配 fallback:
# 定义 fallback 模块fallback_add = RunnableLambda(lambda x: x + 5)# 整合 fallbackadd_one_with_fallback = robust_add_one.with_fallbacks([fallback_add])
联合使用:
chain = double | add_one_with_fallbackprint(chain.invoke(-1)) # 主 logic 出错后 fallback 接力,输出 -2 + 5 = 3
五、批量处理与流式处理结合
批量处理示例:
print(chain.batch([-1, 0, 1, 2, 3])) # 批量执行所有输入
输出:
[3, 1, 3, 5, 7]
流式处理示例:
# 定义一个模拟 LLM 流式输出的模块(逐字输出字符串)def generate_streaming_tokens(n: int): result = f"Hello! The input number is {n}." for char in result: time.sleep(0.1) yield char # 模拟 token-by-token 输出# 封装为 Runnable,可支持 .stream()streaming_chain = RunnableLambda(generate_streaming_tokens)# 流式执行并输出(逐字符打印)for token in streaming_chain.stream(3): print(token, end="")
输出:
你自定义的模块就支持了 invoke()
、batch()
、stream()
三种执行入口方式,和 LangChain 内建模块行为一致。
六、对比图示:自定义模块如何融入 LangChain 执行架构
[input] -> RunnableLambda(double) -> AddOne.invoke() | | with_retry with_fallbacks ↓ ↓ .stream() / .batch() .invoke() / / 输出流或批量结果 最终值
如图所示,你的模块已具备组合、控制、可调试的核心特性。
七、小结:你通过这个模块学到了什么?
- ✅ 理解
Runnable
接口的设计;✅ 实现场景级别的错误恢复机制;✅ 掌握 LCEL 链式组合思维;✅ 学会如何让自己的业务模块无缝接入 LangChain 生态。接下来,我们将深入剖析 LangChain 中的 ChatModel 接口,解析其多模型适配架构与工厂模式设计,构建高复用的语言模型封装层。