掘金 人工智能 08月01日 11:14
LangChain 深入分析第二篇:Runnable 和 LCEL 是如何实现的
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

LangChain 引入了 LCEL(LangChain Expression Language),将传统的命令式编程思维转变为声明式的表达式组合。核心概念 Runnable 作为最小执行单元,通过链式语法(如 `|`)组合,实现了模块化、可组合、可观测的 AI 应用构建。LCEL 提供了 `invoke`、`stream`、`batch` 等多种执行方式,并支持 `with_retry`、`with_fallbacks`、`RunnableParallel`、`RunnableBranch` 等扩展功能,极大地增强了 AI 应用的鲁棒性、灵活性和可维护性。通过继承 Runnable,自定义模块也能无缝集成到 LCEL 生态,为开发复杂的 AI 应用提供了强大的执行架构。

✨ **核心概念 Runnable**:LCEL 的基石是 Runnable,它是 LangChain 中任何可执行单元的抽象。每个 Runnable 封装了一个工作单元,并提供 `invoke`(同步执行)、`stream`(流式输出)、`batch`(批量处理)等方法,使得不同模块能够以统一的方式进行交互和组合。

⚙️ **LCEL 表达式组合**:LangChain Expression Language (LCEL) 允许开发者通过简单的链式语法(使用 `|` 操作符)将多个 Runnable 模块连接起来,构建出复杂的数据处理流程。这种声明式的方式将执行路径抽象为数据流,从“命令式控制”转向“表达式组合”,极大提升了代码的可读性和可维护性。

🚀 **增强的执行能力**:LCEL 不仅支持基础的执行,还提供了丰富的扩展功能来增强应用鲁棒性和灵活性。例如,`with_retry` 可实现自动重试,`with_fallbacks` 提供备用执行路径,`RunnableParallel` 支持并行处理,`RunnableBranch` 实现条件分支逻辑,这些功能使得 AI 应用能够更好地应对各种异常情况和复杂场景。

🛠️ **模块化与可观测性**:LCEL 的设计理念是将 AI 应用的执行逻辑分解为独立的、可组合的模块(Runnable)。每个模块都可以被单独测试、替换和扩展。通过 `with_config` 等方法,还可以为执行过程添加元信息(如 tags、run_name),结合 LangSmith 等工具,实现对整个执行链路的精细化追踪和调试,提高了应用的可观测性。

🎨 **自定义与集成**:开发者可以通过继承 `Runnable` 基类来创建自定义的执行模块。通过重写 `_invoke` 方法,任何 Python 函数或逻辑都可以被纳入 LCEL 体系,并与其他 LangChain 模块(如 LLM、PromptTemplate、Parser 等)无缝集成,构建出结构清晰、功能强大的 AI 应用流程。

一、前言:从 "路由" 转向 "表达式"

在传统编程范式中,我们往往通过命令式结构(如 if/elsefortry/except)明确编排程序的执行路径,就像手动设计一张流程图。

LangChain 通过 LCEL(LangChain 表达式语言)将这类流程控制抽象为“可组合的执行表达式”。每个模块都被封装为 Runnable,多个模块之间通过链式语法自然组合,系统根据数据流自动推导执行顺序

这不仅仅是语法层面的变化,更代表着一种编程思维的演进:从“命令式控制”转向“表达式组合”,从“显式逻辑”转向“模块化构建”。


二、核心概念:Runnable 是什么?

Runnable 是 LangChain 架构中的核心抽象单元,是 LCEL 表达式语言的最小执行模块和构建基石。

任何可执行的单元都是 Runnable

核心基类

class Runnable(ABC, Generic[Input, Output]):    """A unit of work that can be invoked, batched, streamed, transformed and composed.    ...    """        @abstractmethod    def invoke(        self,        input: Input,        config: Optional[RunnableConfig] = None,        **kwargs: Any,    ) -> Output:        """Transform a single input into an output.        ...        """

.invoke(input) 是 LangChain 中 Runnable 接口的核心同步调用方法,用于将输入传入模块,执行并返回结果。适用于单次执行、测试链路或组合多个模块后的最终触发。

特点:

LangChain 将你所知道的:

通通都抽象为 Runnable,不同类型只是指定了不同的输入/输出数据类型

其他方法

方法作用说明典型应用场景
.stream()实时流式输出,边生成边返回聊天机器人、文本生成
.batch()批量处理输入,提高吞吐量批量分类、批量生成
.transform()对输出结果进行后续转换处理格式化、过滤、标注
.with_fallbacks()多路径备选执行,容错降级多模型容灾、工具降级
.with_retry()失败自动重试,保证稳定性网络异常、API 失败重试

三、LCEL 组合式表达式:从几个 Runnable 到一条链

LCEL 是 "LangChain Expression Language" 的缩写,目标是用最少的输入以最光明的方式构造最处理的连接流程。

示例:

from langchain_core.runnables import RunnableMap, RunnableLambdachain = RunnableMap({"name": lambda _: "LangChain"}) \    | RunnableLambda(lambda d: f"Hello, {d['name']}!")print(chain.invoke({}))# Hello, LangChain!

相当于:

    先生成一个字典 {"name": "LangChain"}传递给下一个 lambda,输出 "Hello, LangChain"

后续可以这样扩展:

| some_chat_model \| some_output_parser \| final_formatter

四、实现解析:执行链条与数据流通道

LangChain 的 Runnable 原理根基于一套“输入 - 运行 - 输出”的基本通道模型:

通过 invoke() 启动

def invoke(self, input: Input) -> Output:    return self._call_with_config(input, config)

配合 Configurable 和 RunnableBinding

它们会为 chain 中的每个 Runnable 自动分配唯一的 ID,并附加配置(config),以便于后续的日志记录、异常追踪以及 fallback 分支的排查与切换。

支持 RunnableSequence 系列化

总结核心意图

LangChain 在执行层的设计并不仅仅是为了“串起来能用”,而是希望通过 Runnable + LCEL 表达式系统,构建一个可观测、可控制、易扩展、可组合、可替换的 AI 应用执行基座。

下图是典型执行链条的简化视图,展示了 LangChain 中各个模块的基本作用和周边能力增强点:

[Prompt] -> [LLM] -> [Parser] -> [PostProcess]   \         |         |            |  Config  Retry     OutputMap    LangSmith Trace

这种结构让 LangChain 能像搭积木一样构建复杂任务,又能像监控系统一样追踪每个模块的执行情况。


五、扩展方案:如何实现自定义 Runnable

所有 Runnable 只需要继承 Runnable,重写 _invoke

示例:实现一个简单 Tool

from langchain_core.runnables import Runnableclass AddOne(Runnable[int, int]):    def invoke(self, input: int, config=None) -> int:        return input + 1

这个简单的 AddOne 类展示了 LangChain 架构的核心理念:将所有可执行逻辑抽象为统一接口 Runnable,使其天然具备组合性、控制性和可观测性。

通过继承 Runnable,即便是最简单的 Python 函数,也能被纳入 LCEL 表达式系统,与 LLM、Prompt、Parser 等模块无缝集成,构建出结构清晰、逻辑强大、易于维护的 AI 应用流程。

与其他 Runnable 组合

from langchain_core.runnables import RunnableLambdaadd = AddOne()square = RunnableLambda(lambda x: x * x)workflow = add | squareworkflow.invoke(2)  # 输出:9

六、相关扩展功能

功能说明和代码示例

from langchain_core.runnables.retry import RetryHandlerretryable = llm.with_retry(RetryHandler(max_attempts=3))response = retryable.invoke("你好")
robust_chain = llm.with_fallbacks([openai_llm, local_llm])
configured = chain.with_config(tags=["query", "qa"], run_name="qa_main_chain")
from langchain_core.runnables import RunnableParallelparallel_chain = RunnableParallel({    "qa": qa_chain,    "summary": summary_chain,})result = parallel_chain.invoke({"input": "今天的新闻..."})
from langchain_core.runnables import RunnableBranchbranch = RunnableBranch(    (lambda x: "help" in x.lower(), help_chain),    (lambda x: "buy" in x.lower(), shopping_chain),    default_chain)

扩展功能与 .stream() / .batch() 联合使用


LangChain 中的 Runnable 拥有统一的三种执行入口:

下面我们按扩展能力逐一说明如何结合使用:


✅ 1. with_retry() + .batch() / .stream()
retry_llm = llm.with_retry()results = retry_llm.batch(["你好", "请翻译", "帮我写一封邮件"])
for chunk in llm.with_retry().stream("请写一段诗"):    print(chunk, end="")

✅ 2. with_fallbacks() + .batch() / .stream()
robust = llm.with_fallbacks([backup_llm])results = robust.batch(["你是谁", "请总结这段文字"])
for chunk in robust.stream("用 fallback 模式生成一段回答"):    print(chunk, end="")

✅ 3. with_config() + .batch() / .stream()
configured = chain.with_config(tags=["批处理任务"], run_name="qa_batch")results = configured.batch(list_of_questions)
for token in chain.with_config(run_name="stream_chat").stream("你好啊"):    print(token, end="")

✅ 4. RunnableParallel + .batch() / .stream()
parallel = RunnableParallel({    "translate": translator,    "summary": summarizer})results = parallel.batch([{"text": "A"}, {"text": "B"}])

输出结果结构:

[  {"translate": "...", "summary": "..."},  {"translate": "...", "summary": "..."}]

目前 stream + 并行使用受限,不建议用于多路 stream 合并,需自定义包装


✅ 5. RunnableBranch + .batch() / .stream()
branch = RunnableBranch(    (lambda x: "翻译" in x, translator),    (lambda x: "摘要" in x, summarizer),    default_chain)results = branch.batch(["请翻译", "请摘要", "默认处理"])

每条输入匹配一个路径,独立处理


七、总结思维

Runnable 和 LCEL 不是一套 API,而是一套 执行逻辑架构模型

它为 LLM 应用提供了一套通用型执行单元,并能够被分层、并行、切换、监控和应急处理,是 LangChain 搭建模块化调度器的基础。

接下来我们将手工实现一个完整的 Runnable 模块,并构建属于自己的 LangChain 流式链条。

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

LangChain LCEL Runnable AI应用 编程范式
相关文章