前言
为了让大家更好地理解计划与执行架构,下面将通过更多的例子进行详细阐述。这些例子将涵盖不同场景,帮助大家清晰地看到计划如何为执行提供方向,而执行又如何反馈并完善计划。
示例 IT诊断工具
import operatorimport osimport platformimport subprocess# pip install psutilimport psutilfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langchain_openai import ChatOpenAIfrom langgraph.prebuilt import create_react_agentfrom pydantic import BaseModel, Fieldfrom typing import Annotated, List, Tuple, Unionfrom langchain_core.prompts import ChatPromptTemplatefrom langchain_core.tools import toolfrom dotenv import load_dotenvfrom langchain.globals import set_debug# langchain DEBUG 模式set_debug(False)load_dotenv()llm = ChatOpenAI(model="qwen-plus", base_url=os.getenv("BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), temperature=0, streaming=True)@tooldef check_cpu_usage(): """Checks the actual CPU usage.""" return 'CPU Usage is 85%.'@tooldef check_disk_space(): """Checks actual disk space.""" return f"Disk space usage is at 10%."@tooldef check_network(): """Checks network connectivity by pinging a reliable server.""" return 'Network connectivity is stable.'@tooldef restart_server(): """Restarts the server with an OS-independent approach.""" return 'Server restarted successfully.'tools = [check_cpu_usage, check_disk_space, check_network, restart_server]prompt = ChatPromptTemplate.from_messages( [ ('system', 'You are an IT diagnostics agent.'), ('placeholder', '{messages}') ])agent_executor = create_react_agent(model=llm, tools=tools, state_modifier=prompt)class PlanExecute(TypedDict): input: str plan: List[str] past_steps: Annotated[List[Tuple], operator.add] response: strclass Plan(BaseModel): steps: List[str] = Field(description='Tasks to check and resolve server issues')class Response(BaseModel): response: strclass Act(BaseModel): action: Union[Response, Plan] = Field( description='Action to perform. If you want to respond to user, use Response. ' 'If you need to further use tools to get the answer, use Plan.')# Planning stepplanner_prompt = ChatPromptTemplate.from_messages( [ ('system', '''For the given server issue, create a step-by-step diagnostic plan including CPU, disk, and network checks, followed by a server restart if necessary. Available tools: - check_cpu_usage - check_disk_space - check_network - restart_server Respond in JSON format with a "steps" array containing the diagnostic tasks. For example: {{ "steps": [ "check_cpu_usage", "check_disk_space", "check_network" ] }} '''), ('placeholder', '{messages}'), ])planner = planner_prompt | llm.with_structured_output(Plan)# Replanning stepreplanner_prompt = ChatPromptTemplate.from_template( '''For the given task, update the plan based on the current results: Your original task was: {input} You have completed the following steps: {past_steps} Update the plan accordingly. Only include the remaining tasks. If the server needs to be restarted, include that in the plan. IMPORTANT: You must respond in one of the following JSON formats: If you want to provide a final response: {{ "action": {{ "response": "Your final response to the user" }} }} If you want to continue with more steps: {{ "action": {{ "steps": [ "next_task_to_execute", "another_task_if_needed" ] }} }} ''')replanner = replanner_prompt | llm.with_structured_output(Act)def execute_step(state: PlanExecute): plan = state['plan'] task = plan[0] task_formatted = f'Executing step: {task}.' agent_response = agent_executor.invoke({'messages': [('user', task_formatted)]}) return { 'past_steps': [(task, agent_response['messages'][-1].content)], }# Planning step functiondef plan_step(state: PlanExecute): plan = planner.invoke({'messages': [('user', state['input'])]}) return {'plan': plan.steps}# Re-planning step function (in case execution needs adjustment)def replan_step(state: PlanExecute): output = replanner.invoke(state) # If the replanner decides to return a response, we use it as the final answer if isinstance(output.action, Response): # Final response provided return {'response': output.action.response} # Return the response to the user else: # Otherwise, we continue with the new plan (if replanning suggests more steps) return {'plan': output.action.steps}# Conditional check for endingdef should_end(state: PlanExecute): if 'response' in state and state['response']: return END else: return 'agent'# Build the workflowworkflow = StateGraph(PlanExecute)workflow.add_node('planner', plan_step)workflow.add_node('agent', execute_step)workflow.add_node('replan', replan_step)# Add edges to transition between nodesworkflow.add_edge(START, 'planner')workflow.add_edge('planner', 'agent')workflow.add_edge('agent', 'replan')workflow.add_conditional_edges('replan', should_end, ['agent', END])# Compile the workflow into an executable applicationapp = workflow.compile()# Example of running the agentconfig = {'recursion_limit': 50}# Function to run the Plan-and-Execute agentdef run_plan_and_execute(): # Input from the user inputs = {'input': 'Diagnose the server issue and restart if necessary.'} # Run the Plan-and-Execute agent asynchronously for event in app.stream(inputs, config=config, debug=False): print(event)# Run the functionif __name__ == '__main__': run_plan_and_execute()
运行结果
{'planner': {'plan': ['check_cpu_usage', 'check_disk_space', 'check_network', 'restart_server']}}{'agent': {'past_steps': [('check_cpu_usage', 'The CPU usage is currently at 85%. Would you like to proceed with the next step or take any specific action?')]}}{'replan': {'plan': ['check_memory_usage', 'check_disk_space', 'restart_server']}}{'agent': {'past_steps': [('check_memory_usage', 'The requested step "check_memory_usage" is not available in the provided tools. Let me know if you\'d like me to proceed with any of the available functions or if there\'s another way I can assist.')]}}{'replan': {'plan': ['check_disk_usage', 'restart_server']}}{'agent': {'past_steps': [('check_disk_usage', "The disk space usage is at 10%, which indicates there's plenty of free space available. Is there anything else you'd like me to check or help with?")]}}{'replan': {'plan': ['check_memory_usage', 'restart_server']}}{'agent': {'past_steps': [('check_memory_usage', "The function `check_memory_usage` is not available in the provided tools. Let me know if you'd like me to assist with something else!")]}}{'replan': {'plan': ['check_cpu_usage', 'check_disk_usage']}}{'agent': {'past_steps': [('check_cpu_usage', "The CPU usage is currently at 85%. Is there anything specific you'd like to do or check next?")]}}{'replan': {'plan': ['restart_server']}}{'agent': {'past_steps': [('restart_server', 'The server has been restarted successfully.')]}}{'replan': {'response': 'The server has been successfully restarted. The CPU usage was at 85% before the restart, and the disk space was sufficient at 10% usage. However, the memory usage was not checked due to the unavailability of the `check_memory_usage` tool. Let me know if further diagnostics or actions are required.'}}
示例 费用验证和处理的流程工具
import asynciofrom langchain_core.tools import toolimport operatorfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langchain_openai import ChatOpenAIfrom langgraph.prebuilt import create_react_agentfrom pydantic import BaseModel, Fieldfrom typing import Annotated, List, Tuple, Union, Optionalfrom langchain_core.prompts import ChatPromptTemplatefrom langgraph.checkpoint.memory import MemorySaverimport osfrom dotenv import load_dotenvload_dotenv()llm = ChatOpenAI(model="qwen-plus", base_url=os.getenv("BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), temperature=0, streaming=True)@tooldef validate_expense_report(report_id: str) -> str: """Validates an employee's expense report.""" if not report_id or not isinstance(report_id, str): return "Error: Invalid report ID provided" try: return f"Expense report {report_id} is valid." except Exception as e: return f"Error validating expense report: {str(e)}"@tooldef check_policy_compliance(report_id: str) -> str: """Checks whether the report complies with company policy.""" if not report_id or not isinstance(report_id, str): return "Error: Invalid report ID provided" try: return f"Report {report_id} complies with company policy." except Exception as e: return f"Error checking policy compliance: {str(e)}"@tooldef route_to_manager(report_id: str) -> str: """Routes the report to the manager for approval.""" if not report_id or not isinstance(report_id, str): return "Error: Invalid report ID provided" try: return f"Report {report_id} has been routed to the manager." except Exception as e: return f"Error routing to manager: {str(e)}"@tooldef notify_employee(report_id: str, status: str) -> str: """Notifies the employee of the report's status.""" if not report_id or not status: return "Error: Invalid report ID or status provided" try: return f"Employee notified that report {report_id} is {status}." except Exception as e: return f"Error notifying employee: {str(e)}"tools = [validate_expense_report, check_policy_compliance, route_to_manager, notify_employee]# Enhanced state managementclass PlanExecute(TypedDict): input: str plan: List[str] past_steps: Annotated[List[Tuple], operator.add] response: Optional[str] error: Optional[str]class Plan(BaseModel): steps: List[str] = Field(description="Numbered unique steps to follow, in order")class Response(BaseModel): response: str = Field(description="Response to user.")class Act(BaseModel): action: Union[Response, Plan] = Field(description="Action to perform")# Improved system promptsSYSTEM_PROMPT = """您是一名费用报告处理助理。您的任务是验证和处理费用报告使用可用的工具。始终从输入中提取报告ID,并在所有步骤中一致使用它.Available tools:1. validate_expense_report - Validates the report2. check_policy_compliance - Checks policy compliance3. route_to_manager - Routes to manager4. notify_employee - Notifies the employee您必须以以下JSON格式输出:{{ "steps": [ "Step 1 description", "Step 2 description", "Step 3 description" ]}}确保每一步都已完成,然后再进行下一步."""# Enhanced agent setupprompt = ChatPromptTemplate.from_messages([ ("system", SYSTEM_PROMPT), ("placeholder", "{messages}")])agent_executor = create_react_agent(llm, tools, state_modifier=prompt)# Improved planning stepdef plan_step(state: PlanExecute) -> dict: try: planner_prompt = ChatPromptTemplate.from_messages([ ("system", SYSTEM_PROMPT), ("placeholder", "{messages}") ]) planner = planner_prompt | llm.with_structured_output(Plan) plan = planner.invoke({"messages": [("user", state["input"])]}) return {"plan": plan.steps} except Exception as e: return {"error": f"Planning error: {str(e)}"}# Improved execution step with error handlingdef execute_step(state: PlanExecute) -> dict: try: if "error" in state: return {"response": f"Workflow failed: {state['error']}"} plan = state["plan"] if not plan: return {"response": "No plan steps available to execute"} task = plan[0] agent_response = agent_executor.invoke({"messages": [("user", task)]}) return {"past_steps": [(task, agent_response["messages"][-1].content)]} except Exception as e: return {"error": f"Execution error: {str(e)}"}# Enhanced replanning with better error handlingdef replan_step(state: PlanExecute) -> dict: try: if "error" in state: return {"response": f"Workflow failed: {state['error']}"} replanner_prompt = ChatPromptTemplate.from_template(""" 目标: {input} 原始计划: {plan} 已完成的步骤: {past_steps} 1.如果需要更多工作,请提供后续步骤 2.如果工作流程完成,请提供最终回复 您必须以以下JSON格式之一进行响应: Option 1 - If providing final response: {{ "action": {{ "response": "Your final response to the user" }} }} Option 2 - If providing new steps: {{ "action": {{ "steps": [ "Step 1 description", "Step 2 description" ] }} }} 只包括仍需完成的步骤。 """) replanner = replanner_prompt | llm.with_structured_output(Act) output = replanner.invoke(state) if isinstance(output.action, Response): return {"response": output.action.response} return {"plan": output.action.steps} except Exception as e: return {"error": f"Replanning error: {str(e)}"}# Setup workflowdef create_workflow(): workflow = StateGraph(PlanExecute) # Add nodes workflow.add_node("planner", plan_step) workflow.add_node("agent", execute_step) workflow.add_node("replan", replan_step) # Add edges workflow.add_edge(START, "planner") workflow.add_edge("planner", "agent") workflow.add_edge("agent", "replan") workflow.add_conditional_edges( "replan", lambda s: END if ("response" in s or "error" in s) else "agent", [END] ) return workflow.compile(checkpointer=MemorySaver())def run_workflow(report_id: str): app = create_workflow() config = { "configurable": {"thread_id": "1"}, "recursion_limit": 50 } inputs = {"input": f"使用报告ID{report_id}验证和处理费用报告 "} try: for event in app.stream(inputs, config=config, stream_mode="values"): if "error" in event: print(f"Error: {event['error']}") break print(event) except Exception as e: print(f"Workflow execution failed: {str(e)}")if __name__ == "__main__": run_workflow("12345")
运行结果
{'input': '使用报告ID12345验证和处理费用报告 ', 'past_steps': []}{'input': '使用报告ID12345验证和处理费用报告 ', 'plan': ['使用报告ID 12345验证费用报告', '检查报告ID 12345的政策合规性', '将报告ID 12345路由到经理'], 'past_steps': []}{'input': '使用报告ID12345验证和处理费用报告 ', 'plan': ['使用报告ID 12345验证费用报告', '检查报告ID 12345的政策合规性', '将报告ID 12345路由到经理'], 'past_steps': [('使用报告ID 12345验证费用报告', '费用报告 12345 已成功验证,符合公司政策,并已路由至经理审批。员工也已收到通知。')]}{'input': '使用报告ID12345验证和处理费用报告 ', 'plan': ['使用报告ID 12345验证费用报告', '检查报告ID 12345的政策合规性', '将报告ID 12345路由到经理'], 'past_steps': [('使用报告ID 12345验证费用报告', '费用报告 12345 已成功验证,符合公司政策,并已路由至经理审批。员工也已收到通知。')], 'response': '费用报告 12345 已成功验证,符合公司政策,并已路由至经理审批。员工也已收到通知。'}
客服系统的案例
import asynciofrom langchain_core.tools import toolimport operatorfrom typing_extensions import TypedDictfrom langgraph.graph import StateGraph, START, ENDfrom langchain_openai import ChatOpenAIfrom langgraph.prebuilt import create_react_agentfrom pydantic import BaseModel, Fieldfrom typing import Annotated, List, Tuple, Union, Optionalfrom langchain_core.prompts import ChatPromptTemplatefrom langgraph.checkpoint.memory import MemorySaverimport osfrom dotenv import load_dotenvload_dotenv()llm = ChatOpenAI(model="qwen-plus", base_url=os.getenv("BASE_URL"), api_key=os.getenv("OPENAI_API_KEY"), temperature=0, streaming=True)@tooldef identify_product(issue: str) -> str: """根据问题描述标识产品。""" print(f"identify_product {issue}") if not issue or not isinstance(issue, str): return "Error: No issue provided" return f"已识别的产品与 {issue}."@tooldef search_manual(product: str, issue: str) -> str: """在产品手册中搜索故障排除步骤。""" print(f"search_manual {product} {issue}") if not product or not issue or not isinstance(product, str) or not isinstance(issue, str): return "Error: Invalid product or issue provided" return f"""搜索手册关于 {product} 的内容: {issue}. 相关资料为: iPhone 信号差可能由多种因素引起,但通过一些简单的设置和调整,通常可以改善信号质量。 可能的原因 手机壳干扰:某些手机壳可能会影响信号接收,尤其是金属材质的保护壳。 设置问题:网络设置不当可能导致信号连接不稳定。 改善信号的建议 检查手机壳:如果使用了手机壳,尝试取下手机壳,看看信号是否有所改善。 重启手机:简单的重启手机可以解决许多临时的信号问题。 通过以上方法,用户通常可以有效改善iPhone的信号问题,确保更顺畅的通话和数据服务。 """@tooldef escalate_to_support(product: str, issue: str) -> str: """将问题上报给人力支持团队。""" print(f"escalate_to_support {product} {issue}") if not product or not issue or not isinstance(product, str) or not isinstance(issue, str): return "Error: Invalid product or issue provided" return f"上报产品 {product} : {issue} 寻求更多帮助."tools = [identify_product, search_manual, escalate_to_support]# Improved system promptsSYSTEM_PROMPT = """你是一名客户支持助理。您的任务是帮助客户解决产品问题使用可用的工具。始终从输入中识别问题和产品,并在所有步骤中一致使用。Available tools:1. identify_product - 根据输入识别产品.2. search_manual - 在产品手册中搜索故障排除步骤3. escalate_to_support - 将问题上报给支持团队您必须以以下JSON格式输出:{{ "steps": [ "Step 1 description", "Step 2 description", "Step 3 description" ]}}确保每一步都已完成,然后再进行下一步."""prompt = ChatPromptTemplate.from_messages([ ("system", SYSTEM_PROMPT), ("placeholder", "{messages}")])agent_executor = create_react_agent(llm, tools, state_modifier=prompt)class PlanExecute(TypedDict): input: str plan: List[str] past_steps: Annotated[List[Tuple], operator.add] response: str error: Optional[str]class Plan(BaseModel): steps: List[str] = Field(description="Numbered unique steps to follow, in order")class Response(BaseModel): response: str = Field(description="Response to user.")class Act(BaseModel): action: Union[Response, Plan] = Field(description="Action to perform")# Planning stepdef plan_step(state: PlanExecute) -> dict: try: planner_prompt = ChatPromptTemplate.from_messages([ ("system", SYSTEM_PROMPT), ("placeholder", "{messages}") ]) planner = planner_prompt | llm.with_structured_output(Plan) plan = planner.invoke({"messages": [("user", state["input"])]}) return {"plan": plan.steps} except Exception as e: return {"error": f"Planning error: {str(e)}"}def execute_step(state: PlanExecute): try: if "error" in state: return {"response": f"Workflow failed: {state['error']}"} plan = state["plan"] if not plan: return {"response": "No plan steps available to execute"} task = plan[0] agent_response = agent_executor.invoke({"messages": [("user", task + " " + state["input"])]}) return {"past_steps": [(task, agent_response["messages"][-1].content)]} except Exception as e: return {"error": f"Execution error: {str(e)}"}# Enhanced replanning with better error handlingdef replan_step(state: PlanExecute) -> dict: try: if "error" in state: return {"response": f"Workflow failed: {state['error']}"} replanner_prompt = ChatPromptTemplate.from_template(""" Given the objective: {input} Original plan: {plan} Completed steps: {past_steps} Please either: 1. Provide next steps if more work is needed 2. Provide a final response if the workflow is complete 您必须以以下JSON格式之一进行响应: Option 1 - If providing final response: {{ "action": {{ "response": "Your final response to the user" }} }} Option 2 - If providing new steps: {{ "action": {{ "steps": [ "Step 1 description", "Step 2 description" ] }} }} 只包括仍需完成的步骤。 """) replanner = replanner_prompt | llm.with_structured_output(Act) output = replanner.invoke(state) if isinstance(output.action, Response): return {"response": output.action.response} return {"plan": output.action.steps} except Exception as e: return {"error": f"Replanning error: {str(e)}"}workflow = StateGraph(PlanExecute)# Add nodesworkflow.add_node("planner", plan_step)workflow.add_node("agent", execute_step)workflow.add_node("replan", replan_step)# Add edgesworkflow.add_edge(START, "planner")workflow.add_edge("planner", "agent")workflow.add_edge("agent", "replan")workflow.add_conditional_edges( "replan", lambda s: END if ("response" in s or "error" in s) else "agent", ["agent", END])app = workflow.compile(checkpointer=MemorySaver())config = {"configurable": {"thread_id": "1"}, "recursion_limit": 50}def run_plan_and_execute(): inputs = {"input": "帮助解决我的智能手机iphone信号差的问题。"} for event in app.stream(inputs, config=config): print(event)if __name__ == "__main__": run_plan_and_execute()
运行结果
{'planner': {'plan': ['identify_product: 检查用户提到的产品,确定是Apple iPhone。', 'search_manual: 在iPhone用户手册中搜索有关信号差问题的故障排除步骤。', 'escalate_to_support: 如果手册中没有解决信号差的问题,将问题上报给Apple支持团队。']}}identify_product 帮助解决我的智能手机iphone信号差的问题。search_manual Apple iPhone 信号差{'agent': {'past_steps': [('identify_product: 检查用户提到的产品,确定是Apple iPhone。', '以下是解决 Apple iPhone 信号差问题的建议:\n\n### 可能的原因:\n1. **手机壳干扰**:某些手机壳(尤其是金属材质)可能会影响信号接收。\n2. **设置问题**:网络设置不当可能导致信号连接不稳定。\n\n### 改善信号的建议:\n1. **检查手机壳**:如果使用了手机壳,尝试取下手机壳,看看信号是否有所改善。\n2. **重启手机**:简单的重启可以解决许多临时的信号问题。\n\n如果尝试了以上方法后问题仍未解决,可以联系 Apple 支持团队以获取进一步帮助!')]}}{'replan': {'plan': ['search_manual: 在iPhone用户手册中搜索有关信号差问题的故障排除步骤。', 'escalate_to_support: 如果手册中没有解决信号差的问题,将问题上报给Apple支持团队。']}}search_manual iPhone 信号差{'agent': {'past_steps': [('search_manual: 在iPhone用户手册中搜索有关信号差问题的故障排除步骤。', '针对您的 iPhone 信号差问题,以下是一些改善信号质量的建议:\n\n1. **检查手机壳**:某些手机壳(尤其是金属材质)可能会影响信号接收。尝试取下手机壳,看看信号是否有改善。\n\n2. **重启手机**:简单的重启可以解决许多临时的信号问题。长按电源键,滑动以关闭手机,然后再重新开机。\n\n3. **检查网络设置**:确保网络设置正确,可以尝试切换飞行模式开启和关闭,或者检查是否选择了正确的网络运营商。\n\n通过以上方法,通常可以有效改善 iPhone 的信号问题,确保更顺畅的通话和数据服务。如果问题仍然存在,可能需要进一步排查或联系 Apple 支持团队。')]}}{'replan': {'response': '以下是解决 Apple iPhone 信号差问题的建议:\n\n### 可能的原因:\n1. **手机壳干扰**:某些手机壳(尤其是金属材质)可能会影响信号接收。\n2. **设置问题**:网络设置不当可能导致信号连接不稳定。\n\n### 改善信号的建议:\n1. **检查手机壳**:如果使用了手机壳,尝试取下手机壳,看看信号是否有所改善。\n2. **重启手机**:简单的重启可以解决许多临时的信号问题。\n3. **切换飞行模式**:尝试打开飞行模式几秒钟后关闭,以重置设备的网络连接。\n4. **检查网络设置**:确保选择了正确的网络运营商,并尝试手动搜索网络。\n\n如果尝试了以上方法后问题仍未解决,建议联系 Apple 支持团队以获取进一步帮助!'}}