I. Overall Approach
The text of a long web page often exceeds the number of tokens an LLM can process in a single call, so we need a map-reduce pipeline that splits the text, summarizes the pieces locally, and merges the results:
- Load the web page content
- Split it into chunks of a manageable size
- Produce an initial summary for each chunk (map)
- Merge all the initial summaries (reduce)
- If needed, reduce recursively until the result fits within the token limit
- Emit the final summary
Now let's implement it in code!
II. Setup
1. Initialize the LLM
First, we load the LLM via init_chat_model:
```python
# llm_env.py
from langchain.chat_models import init_chat_model

llm = init_chat_model("gpt-4o-mini", model_provider="openai")
```
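As an optional sanity check (assuming your OPENAI_API_KEY is set in the environment), you can invoke the model directly before wiring up the pipeline:

```python
# Optional smoke test: confirm the model responds before building the graph.
from llm_set import llm_env

print(llm_env.llm.invoke("Say hello in one word.").content)
```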
III. The Main Program: main.py
1. Imports & Initialization
```python
import os
import sys

sys.path.append(os.getcwd())

import operator
from typing import Annotated, List, Literal, TypedDict

from langchain_community.document_loaders import WebBaseLoader
from langchain.chains.combine_documents.reduce import collapse_docs, split_list_of_docs
from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_text_splitters import CharacterTextSplitter
from langgraph.constants import Send
from langgraph.graph import END, START, StateGraph

from llm_set import llm_env

llm = llm_env.llm
```
2. Load the Web Page
```python
loader = WebBaseLoader("https://en.wikipedia.org/wiki/Artificial_intelligence")
docs = loader.load()
```
WebBaseLoader makes it easy to load the page text into the docs list.
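To get a feel for the input size (the exact numbers depend on the live page), you can inspect what the loader returned:

```python
# docs is a list of Document objects; a single URL yields one element.
print(len(docs))                   # 1
print(docs[0].metadata["source"])  # the URL we loaded
print(len(docs[0].page_content))   # character count of the extracted text
```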
3. Define the Prompt Templates
- Map-stage prompt
```python
map_prompt = ChatPromptTemplate.from_messages(
    [("system", "Write a concise summary of the following: \n\n{context}")]
)
```
- Reduce-stage prompt
```python
reduce_template = """The following is a set of summaries:
{docs}
Take these and distill it into a final, consolidated summary
of the main themes."""

reduce_prompt = ChatPromptTemplate([("human", reduce_template)])
```
4. Split the Document into Chunks
```python
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
split_docs = text_splitter.split_documents(docs)
print(f"Split into {len(split_docs)} chunks")
```
We split the page content into chunks of roughly 1,000 characters each (CharacterTextSplitter measures chunk_size in characters by default, not tokens), so each chunk can be handled in a single LLM call.
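If you would rather bound chunks by actual tokens, here is a minimal sketch using the tiktoken-backed constructor (this assumes the tiktoken package is installed; the encoding name below is a common choice for OpenAI models, not something the original post specifies):

```python
# Alternative: measure chunk_size in tokens instead of characters.
token_splitter = CharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="cl100k_base", chunk_size=1000, chunk_overlap=0
)
split_docs = token_splitter.split_documents(docs)
```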
5. Define the Token-Length Function
```python
token_max = 1000


def length_function(documents: List[Document]) -> int:
    return sum(llm.get_num_tokens(d.page_content) for d in documents)
```
This computes the total token count of the input documents, which is used to decide whether further collapsing is needed.
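For example, you can check how many tokens the raw chunks hold in total; if the summaries still exceed token_max after the map stage, the graph will keep collapsing:

```python
# Total tokens across all chunks (the figure varies with the page content).
print(f"Total tokens: {length_function(split_docs)}")
```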
6. Define the State
The overall state:
```python
class OverallState(TypedDict):
    contents: List[str]  # raw chunk texts fed into the map stage
    summaries: Annotated[list, operator.add]  # per-chunk summaries, merged by concatenation
    collapsed_summaries: List[Document]  # intermediate summaries during reduce
    final_summary: str  # the consolidated result
```
The map-stage state:
```python
class SummaryState(TypedDict):
    content: str
```
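The Annotated[list, operator.add] reducer on summaries is what lets the parallel map branches accumulate results rather than overwrite each other; conceptually, LangGraph merges concurrent updates like this:

```python
import operator

# Two parallel generate_summary branches each return {"summaries": [...]};
# LangGraph combines the values with operator.add, i.e. list concatenation.
update_a = ["summary of chunk 1"]
update_b = ["summary of chunk 2"]
print(operator.add(update_a, update_b))  # ['summary of chunk 1', 'summary of chunk 2']
```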
7. Generate Initial Summaries (Map Stage)
```python
def generate_summary(state: SummaryState):
    prompt = map_prompt.invoke(state["content"])
    response = llm.invoke(prompt)
    return {"summaries": [response.content]}
```
8. Map Dispatch Logic
```python
def map_summaries(state: OverallState):
    # Fan out: dispatch one generate_summary task per chunk via Send.
    return [
        Send("generate_summary", {"content": content})
        for content in state["contents"]
    ]
```
9. Collect the Summaries
```python
def collect_summaries(state: OverallState):
    return {
        "collapsed_summaries": [Document(summary) for summary in state["summaries"]]
    }
```
10. Reduce Logic
- Internal reduce function
```python
def _reduce(input: List[Document]) -> str:
    # reduce_prompt has a single input variable ({docs}), so invoke() wraps
    # the document list into {"docs": input} automatically.
    prompt = reduce_prompt.invoke(input)
    response = llm.invoke(prompt)
    return response.content
```
- Collapse the summaries
```python
def collapse_summaries(state: OverallState):
    # Partition the summaries into groups that each fit under token_max,
    # then merge each group into a single Document via the LLM.
    doc_lists = split_list_of_docs(
        state["collapsed_summaries"],
        length_function,
        token_max,
    )
    results = []
    for doc_list in doc_lists:
        combined = collapse_docs(doc_list, _reduce)
        results.append(combined)
    return {"collapsed_summaries": results}
```
11. Decide Whether to Keep Collapsing
```python
def should_collapse(
    state: OverallState,
) -> Literal["collapse_summaries", "generate_final_summary"]:
    num_tokens = length_function(state["collapsed_summaries"])
    if num_tokens > token_max:
        return "collapse_summaries"
    else:
        return "generate_final_summary"
```
12. Generate the Final Summary
```python
def generate_final_summary(state: OverallState):
    response = _reduce(state["collapsed_summaries"])
    return {"final_summary": response}
```
IV. Build the Flow Graph (StateGraph)
```python
graph = StateGraph(OverallState)
graph.add_node("generate_summary", generate_summary)
graph.add_node("collect_summaries", collect_summaries)
graph.add_node("collapse_summaries", collapse_summaries)
graph.add_node("generate_final_summary", generate_final_summary)

graph.add_conditional_edges(START, map_summaries, ["generate_summary"])
graph.add_edge("generate_summary", "collect_summaries")
graph.add_conditional_edges("collect_summaries", should_collapse)
graph.add_conditional_edges("collapse_summaries", should_collapse)
graph.add_edge("generate_final_summary", END)

app = graph.compile()
```
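To double-check the wiring, LangGraph can render the compiled graph as a Mermaid diagram:

```python
# Print a Mermaid diagram of the graph topology.
print(app.get_graph().draw_mermaid())
```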
V. Run the Summarization Pipeline
```python
for step in app.stream(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
):
    print(list(step.keys()))
```
Calling .stream() kicks off the whole pipeline with the chunked contents, streaming each step's output until the final summary is complete.
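If you only need the end result rather than the per-step stream, invoking the graph returns the final state directly:

```python
# Run the whole pipeline in one call and read the final state.
result = app.invoke(
    {"contents": [doc.page_content for doc in split_docs]},
    {"recursion_limit": 10},
)
print(result["final_summary"])
```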
VI. Wrap-Up
Through this example, you can see how to:
✅ Implement web page summarization easily with LangChain + an LLM
✅ Build an automatic map-reduce workflow that supports splitting long text and recursive reduction
✅ Orchestrate the whole flow flexibly with StateGraph
Original post: https://www.cnblogs.com/chenyishi/p/18945790