dbaplus社群 2024年11月28日
一套万能的异步处理方案(典藏版)
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文介绍了一套通用的异步处理SDK,遵循开闭原则,可轻松实现异步处理,保证方法有效执行且不影响主流程,具有多种优点及详细的设计原理、组件、数据库脚本等内容。

🎯设计通用异步处理SDK,实现各种异步处理

💪保证方法有效执行且不影响主流程,数据不丢失

📋包含无侵入设计、独立组件等优点及原理

📄涉及多种组件、数据库脚本、配置信息

三火哥 2024-11-28 07:16 上海

设计了一套通用的异步处理SDK,可以很轻松地实现各种异步处理。


前言


良好的系统设计必须要做到开闭原则,随着业务的不断迭代更新,核心代码也会被不断改动,出错的概率也会大大增加。但是大部分增加的功能都是在扩展原有的功能,既要保证性能又要保证质量,我们往往都会使用异步线程池来处理,然而却增加了很多不确定性因素。由此我设计了一套通用的异步处理SDK,可以很轻松的实现各种异步处理


目的


通过异步处理不仅能够保证方法能够得到有效的执行而且不影响主流程


更重要的是各种兜底方法保证数据不丢失,从而达到最终一致性\color{red}最终一致性


优点


无侵入设计,独立数据库,独立定时任务,独立消息队列,独立人工执行界面(统一登录认证)


使用spring事务事件机制,即使异步策略解析失败也不会影响业务


如果你的方法正在运行事务,会等事务提交后或回滚后再处理事件


就算事务提交了,异步策略解析失败了,我们还有兜底方案执行(除非数据库有问题,消息队列有问题,方法有bug)


原理


容器初始化bean完成后遍历所有方法,把有@AsyncExec注解的方法缓存起来


方法运行时通过AOP切面发布事件


事务事件监听处理异步执行策略


@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMPLETION)

fallbackExecution=true 没有事务正在运行,依然处理事件 

TransactionPhase.AFTER_COMPLETION 事务提交后和事务回滚后都处理事件


组件


设计模式



流程图



数据库脚本


sql 代码解读复制代码CREATE TABLE `async_scene` (  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',  `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT '应用名称',  `method_sign` varchar(50) NOT NULL DEFAULT '' COMMENT '方法签名',  `scene_name` varchar(200) NOT NULL DEFAULT '' COMMENT '业务场景描述',  `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT '异步策略类型',  `queue_name` varchar(200) NOT NULL DEFAULT '' COMMENT '队列名称',  `theme_value` varchar(100) NOT NULL DEFAULT '' COMMENT '消费主题',  `exec_count` int NOT NULL DEFAULT '0' COMMENT '失败重试次数',  `exec_deleted` int NOT NULL DEFAULT '0' COMMENT '执行后是否删除',  `async_version` varchar(50) NOT NULL DEFAULT '' COMMENT '组件版本号',  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',  `cdc_crt_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '记录新增时间',  `cdc_upd_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录修改时间',  PRIMARY KEY (`id`) USING BTREE,  UNIQUE KEY `uk_application_sign` (`application_name`,`method_sign`) USING BTREE,  KEY `idx_cdc_upd_time` (`cdc_upd_time`)) ENGINE=InnoDB COMMENT='异步场景表';
CREATE TABLE `async_req` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID', `application_name` varchar(100) NOT NULL DEFAULT '' COMMENT '应用名称', `sign` varchar(50) NOT NULL DEFAULT '' COMMENT '方法签名', `class_name` varchar(200) NOT NULL DEFAULT '' COMMENT '全路径类名称', `method_name` varchar(100) NOT NULL DEFAULT '' COMMENT '方法名称', `async_type` varchar(50) NOT NULL DEFAULT '' COMMENT '异步策略类型', `exec_status` tinyint NOT NULL DEFAULT '0' COMMENT '执行状态 0:初始化 1:执行失败 2:执行成功', `exec_count` int NOT NULL DEFAULT '0' COMMENT '执行次数', `param_json` longtext COMMENT '请求参数', `remark` varchar(200) NOT NULL DEFAULT '' COMMENT '业务描述', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (`id`) USING BTREE, KEY `idx_applocation_name` (`application_name`) USING BTREE, KEY `idx_exec_status` (`exec_status`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='异步处理请求';
CREATE TABLE `async_log` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID', `async_id` bigint NOT NULL DEFAULT '0' COMMENT '异步请求ID', `error_data` longtext COMMENT '执行错误信息', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (`id`) USING BTREE, KEY `idx_async_id` (`async_id`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='异步处理日志';

异步策略



安全级别



执行状态



流程图



apollo 配置


xml 代码解读复制代码# 开关:默认关闭async.enabled=true
# 应用名称spring.application.name=xxx
# 数据源 druid spring.datasource.driver-class-name=com.mysql.jdbc.Driverspring.datasource.url=jdbc:mysql://127.0.0.1:3306/fc_async?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&useSSL=false&allowMultiQueries=true&rewriteBatchedStatements=truespring.datasource.username=userspring.datasource.password=xxxxspring.datasource.filters=configspring.datasource.connectionProperties=config.decrypt=true;config.decrypt.key=yyy#静态地址spring.resources.add-mappings=truespring.resources.static-locations=classpath:/static/

# 以下配置都有默认值# 核心线程数async.executor.thread.corePoolSize=10# 最大线程数async.executor.thread.maxPoolSize=50# 队列容量async.executor.thread.queueCapacity=10000# 活跃时间async.executor.thread.keepAliveSeconds=600# 执行成功是否删除记录:默认删除async.exec.deleted=true# 自定义队列名称前缀:默认应用名称async.topic=${spring.application.name}# 重试执行次数:默认5次async.exec.count=5
# 重试最大查询数量async.retry.limit=100
# 补偿最大查询数量async.comp.limit=100
# 登录拦截:默认falseasync.login=false


用法


txt 代码解读复制代码1,异步开关scm.async.enabled=true
2,在需要异步执行的方法加注解 (必须是spring代理方法)@AsyncExec(type = AsyncExecEnum.SAVE_ASYNC, remark = "数据字典")
3,人工处理地址http://localhost:8004/async/index.html

注意



txt 代码解读复制代码1,应用名称spring.application.name
2,队列名称${async.topic:${spring.application.name}}_async_queue
3,自己业务要做幂等
4,一个应用公用一个队列自产自消
5,定时任务异步重试定时任务(2分钟重试一次,可配置重试次数) 异步补偿定时任务(一小时补偿一次,创建时间在一小时之前的)


效果展示


github地址:

https://github.com/xiongyanokok/fc-async.git




作者丨三火哥

来源丨网址:https://juejin.cn/post/7266087843239084090

dbaplus社群欢迎广大技术人员投稿,投稿邮箱:editor@dbaplus.cn

跳转微信打开

Fish AI Reader

Fish AI Reader

AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

FishAI

FishAI

鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

联系邮箱 441953276@qq.com

相关标签

异步处理SDK 系统性能 设计原理 组件
相关文章