掘金 人工智能 9小时前
学习 Coze Studio 的智能体会话接口
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入解析了Coze Studio智能体会话接口的后端处理流程。从接口层到应用层,再到核心的领域层,文章详细阐述了如何处理用户请求、构建运行参数、执行智能体以及实现流式输出。文中特别提到了X-Accel-Buffering响应头在流式传输中的作用,以及Coze Studio如何利用GORM和gorm.io/gen进行数据库操作。此外,文章还强调了DDD分层原则在架构设计中的应用,以及跨领域防腐层在整合不同领域服务时的重要性,为理解智能体内部执行逻辑奠定了基础。

💡 接口层实现:Coze Studio的接口通过IDL定义,例如`agentrun_service.thrift`文件,并使用`hz`工具自动生成API处理器。接口层主要负责绑定和校验入参,并调用应用层服务。其中,`X-Accel-Buffering: no`响应头被用于控制反向代理的缓冲行为,实现流式传输。

🚀 应用层实现:应用层整合了多个领域服务,包括消息、智能体运行、单智能体、会话和快捷指令等。它处理用户重新生成请求,构造智能体运行参数,并最终调用领域层的`AgentRun`方法实现流式输出。每个领域服务基本对应一个数据库表,并使用GORM进行数据库交互。

⚙️ 领域层实现:核心的`AgentRun`方法通过创建双向管道,并使用`safego.Go`启动一个包含错误恢复机制的goroutine来执行实际的智能体运行逻辑。此过程涉及一系列数据库操作,如查询智能体信息、获取历史对话、创建运行记录和用户消息等。

🔗 跨领域防腐层:文章指出,Coze Studio在领域层操作中引入了跨领域防腐层,以防止领域之间的直接依赖。最终的执行逻辑从`agent_run`领域传递到`single_agent`领域,正式开始智能体的内部执行。

🏛️ DDD分层原则:整个对话接口的处理流程严格遵循DDD(领域驱动设计)的分层原则,包括接口层、应用层、领域层,并通过跨领域防腐层连接不同领域,展现了清晰且模块化的后端架构。

前面我们已经学习了 Coze Studio 的代码架构,对项目整体有了一个大致的了解。今天,我们将深入智能体执行的核心,研究下用户在和智能体对话时,后端服务是如何处理会话请求的。

接口层实现

我们首先来看下智能体会话的接口层实现。正如上一篇所述,Coze Studio 中的接口均通过 IDL 定义,位于 idl/conversation/agentrun_service.thrift 文件:

service AgentRunService {  run.AgentRunResponse AgentRun(1: run.AgentRunRequest request) (    api.post='/api/conversation/chat',     api.category="conversation",     api.gen_path= "agent_run"  )  run.ChatV3Response ChatV3(1: run.ChatV3Request request)(    api.post = "/v3/chat",    api.category="chat",    api.tag="openapi",    api.gen_path="chat"  )}

这里还有一个 /v3/chat 接口,它是正式发布的智能体对外的 API 接口,其实现和 /api/conversation/chat 基本一致,这里不作过多介绍。

然后使用 hz 工具将 IDL 自动生成 API 处理器,位于 backend/api/handler/coze/agent_run_service.go 文件:

// @router /api/conversation/chat [POST]func AgentRun(ctx context.Context, c *app.RequestContext) {  // 绑定并校验入参  var req run.AgentRunRequest  c.BindAndValidate(&req)    // 新建 SSE 发送器  sseSender := sseImpl.NewSSESender(sse.NewStream(c))  c.SetStatusCode(http.StatusOK)  c.Response.Header.Set("X-Accel-Buffering", "no")  // 调用 conversation 应用服务  conversation.ConversationSVC.Run(ctx, sseSender, &req)}

API 接口层没有什么复杂的逻辑,主要是绑定并校验入参,然后调用应用层。

这里值得一提是 X-Accel-Buffering 响应头的使用,这是一个特殊的 HTTP 响应头,主要用于控制反向代理(如 Nginx)的缓冲行为,当它的值被设置为 no 时,表示告诉代理服务器不要对当前响应进行缓冲。如果没有这个设置,Nginx 等代理可能会缓冲响应内容直到缓冲区填满或响应完成,这会导致客户端无法实时获取数据,产生延迟感。这个设置通常用于需要 流式传输 的场景,比如:实时日志输出、大型文件下载、SSE 服务器推送等。

应用层实现

接着再来看下应用层的实现,位于 backend/application/conversation/agent_run.go 文件:

func (c *ConversationApplicationService) Run(ctx context.Context, sseSender *sseImpl.SSenderImpl, ar *run.AgentRunRequest) error {  // 从当前会话中获取用户 ID  userID := ctxutil.MustGetUIDFromCtx(ctx)  // 如果 RegenMessageID > 0 说明是重新生成,将对应的运行记录和消息删除  if ar.RegenMessageID != nil && ptr.From(ar.RegenMessageID) > 0 {    msgMeta, err := c.MessageDomainSVC.GetByID(ctx, ptr.From(ar.RegenMessageID))    if msgMeta != nil {      err = c.AgentRunDomainSVC.Delete(ctx, []int64{msgMeta.RunID})      delErr := c.MessageDomainSVC.Delete(ctx, &msgEntity.DeleteMeta{        RunIDs: []int64{msgMeta.RunID},      })    }  }  // 查询 Agent 信息  agentInfo, caErr := c.checkAgent(ctx, ar)  // 获取当前会话,如果不存在,则创建新会话  conversationData, ccErr := c.checkConversation(ctx, ar, userID)  // 获取快捷指令  var shortcutCmd *cmdEntity.ShortcutCmd  if ar.GetShortcutCmdID() > 0 {    cmdID := ar.GetShortcutCmdID()    cmdMeta, err := c.ShortcutDomainSVC.GetByCmdID(ctx, cmdID, 0)    shortcutCmd = cmdMeta  }  // 构造智能体运行参数  arr, err := c.buildAgentRunRequest(ctx, ar, userID, agentInfo.SpaceID, conversationData, shortcutCmd)    // 调用 agent_run 领域服务  streamer, err := c.AgentRunDomainSVC.AgentRun(ctx, arr)    // 从 streamer 拉取消息,根据消息类型构建对应的响应体,通过 sseSender 发送出去,实现流式输出  c.pullStream(ctx, sseSender, streamer, ar)  return nil}

为了方便表述,我对原代码的顺序做了一些调整,主要包括三大块逻辑:

    处理用户的重新生成请求:一般来说只会在最后一轮对话上出现重新生成按钮,实现逻辑就是将对应的运行记录和消息删除,然后继续正常的会话即可;构造智能体运行参数:查询必要的信息,包括 Agent 信息、会话信息、快捷指令等,构造智能体运行参数;运行智能体:调用 agent_run 领域服务,并通过 SSE 实现流式输出;

从代码可以看出,应用层整合了多个领域层服务,这里的每一个领域基本上都对应一个数据库表:

Coze Studio 使用 GORM 访问数据库,这是一个功能强大的 Go 语言 ORM(对象关系映射)库,它简化了 Go 程序与数据库的交互,提供了优雅的 API 和丰富的功能:

感兴趣的同学可以在每个领域的 internal/dal 下找到 modelquery 两个目录,这些都是通过 gorm.io/gen 自动生成的,包含对数据库表的增删改查代码:

backend/domain/agent/singleagent/internal/dal├── model│   ├── single_agent_draft.gen.go│   ├── single_agent_publish.gen.go│   └── single_agent_version.gen.go├── query│   ├── gen.go│   ├── single_agent_draft.gen.go│   ├── single_agent_publish.gen.go│   └── single_agent_version.gen.go

领域层实现

上面的 Run() 方法中,最核心的一句是调用领域层的 AgentRun() 方法,其实现位于 backend/domain/conversation/agentrun/service/agent_run_impl.go 文件中,如下:

func (c *runImpl) AgentRun(ctx context.Context, arm *entity.AgentRunMeta) (*schema.StreamReader[*entity.AgentRunResponse], error) {  // 新建一个容量 20 的双向管道  sr, sw := schema.Pipe[*entity.AgentRunResponse](20)  // 将 StreamWriter 传入 c.run 方法  safego.Go(ctx, func() {    defer sw.Close()    _ = c.run(ctx, sw, rtDependence)  })  // 将 StreamReader 返回上层,供应用层读取  return sr, nil}

这里的逻辑比较简单,主要是通过 schema.Pipe 创建一个双向管道,用于上层读取消息和下层写入消息,然后使用 safego.Go 调用 c.run() 方法。safego.Go 是对原生的 go 的一层包装,它的主要作用是创建一个 goroutine 来执行传入的函数,并在 goroutine 中添加了错误恢复机制,确保在 goroutine 中发生的 panic 会被捕获和处理,而不会导致整个程序崩溃:

func Go(ctx context.Context, fn func()) {  go func() {    defer goutil.Recovery(ctx)    fn()  }()}

调用的 c.run() 方法如下:

func (c *runImpl) run(ctx context.Context, sw *schema.StreamWriter[*entity.AgentRunResponse], rtDependence *runtimeDependence) (err error) {  // 获取智能体信息  agentInfo, err := c.handlerAgent(ctx, rtDependence)  rtDependence.agentInfo = agentInfo    // 获取最近 N 轮历史对话  history, err := c.handlerHistory(ctx, rtDependence)    // 创建一条新的运行记录 `run_record`  runRecord, err := c.createRunRecord(ctx, sw, rtDependence)  rtDependence.runID = runRecord.ID  // 创建一条新的用户消息 `message`  input, err := c.handlerInput(ctx, sw, rtDependence)  rtDependence.questionMsgID = input.ID    // 流式执行智能体  err = c.handlerStreamExecute(ctx, sw, history, input, rtDependence)  return}

这里仍然是一系列数据库的操作,包括查询智能体信息、获取历史对话、创建运行记录、创建用户消息等,不过需要注意的是,这些基本上都是对其他领域的操作,Coze Studio 在这里引入了跨领域防腐层,防止领域之间的直接依赖。至此,整个对话接口的流程图如下所示:

到这里,终于走到了智能体会话接口的最末端,通过调用 crossagent 跨领域服务,执行逻辑从 agent_run 领域进入 single_agent 领域,正式开始执行智能体。

小结

今天,我们学习了 Coze Studio 智能体会话接口的完整后端处理流程,其代码严格遵循 DDD 的分层原则,从接口层、到应用层、到领域层、最后通过跨领域防腐层,将最终的执行任务交给了 single_agent 领域。至此,准备工作已经就绪,接下来,我们将进入 single_agent 领域,揭开智能体内部执行逻辑的神秘面纱。

欢迎关注

如果这篇文章对您有所帮助,欢迎关注我的同名公众号:日习一技,每天学一点新技术

我会每天花一个小时,记录下我学习的点点滴滴。内容包括但不限于:

目标是让大家用5分钟读完就能有所收获,不需要太费劲,但却可以轻松获取一些干货。不管你是技术新手还是老鸟,欢迎给我提建议,如果有想学习的技术,也欢迎交流!

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

Coze Studio 智能体 后端架构 DDD 流式传输
相关文章