# scaffold 项目之动态线程池及异步任务提交
# 简介
在介绍这个功能之前,先介绍为什么要做这个封装这个组件 scaffold-spring-boot-starter-biz-dynamic-pool
在项目中,不可避免的会使用到线程,那么就涉及到频繁的创建销毁线程,这个时候可以用动态池,但是我们直接配置线程池的话,核心数该如何给?最大给多少?拒绝策略如何选择?队列最大多少? 等一系列的问题,能想到的解决方案就是 可以动态调整这些参数
在项目中,我们的异步执行的逻辑也很多,不可能都放在一个线程池中,所以我们可以给异步任务分类型, 每个任务类型放在同类型线程池中,比如:
- 支付通知异步任务 (支付通知异步任务线程池)
- 上传文件异步任务 (上传文件异步线程池)
- ...
# 开始使用
# 引入模块
<dependency> | |
<groupId>com.tz.boot</groupId> | |
<artifactId>scaffold-spring-boot-starter-biz-dynamic-pool</artifactId> | |
</dependency> |
# 配置
--- #################### 动态线程池相关配置 #################### | |
spring: | |
dynamic: | |
tp: | |
enabled: true # 是否启用 dynamictp,默认 true | |
enabledCollect: true # 是否开启监控指标采集,默认 true | |
collectorTypes: micrometer,logging,es # 监控数据采集器类型(logging | micrometer | internal_logging | JMX),默认 micrometer | |
monitorInterval: 10 # 监控时间间隔(报警检测、指标采集),默认 5s |
# 目录结构

# 表结构
SET NAMES utf8mb4; | |
SET FOREIGN_KEY_CHECKS = 0; | |
-- ---------------------------- | |
-- Table structure for infra_dynamic_pool_async_message | |
-- ---------------------------- | |
DROP TABLE IF EXISTS infra_dynamic_pool_async_message; | |
CREATE TABLE infra_dynamic_pool_async_message( | |
id BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键' , | |
keyword VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '关键值;关键值' , | |
username VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '发送任务的用户名;发送任务的用户名' , | |
task_type VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '任务类型;任务类型' , | |
task_name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '任务名称;任务名称' , | |
execute_id BIGINT(20) COMMENT '执行的线程池的主键;执行的线程池的主键' , | |
context_info text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '消息内容;执行任务的响应消息' , | |
error_detail_info text CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '消息内容;执行任务的响应消息' , | |
execute_status INT COMMENT '执行情况;执行情况; 0、执行中。1、执行成功。-1、执行失败。-2、执行错误' , | |
cpu_usage_rate VARCHAR(255) COMMENT 'cpu使用率;单位:%' , | |
cpu_usage_time INT COMMENT 'cpu使用时间;cpu使用时间(毫秒)' , | |
execute_time INT COMMENT '任务执行时间;执行花费的时间(毫秒)' , | |
thread_name VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '线程名称;线程名称' , | |
creator VARCHAR(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '创建者' , | |
create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , | |
updater VARCHAR(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci COMMENT '更新者' , | |
update_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间' , | |
deleted BIT(1) DEFAULT b'0' COMMENT '是否删除' , | |
tenant_id BIGINT(20) NOT NULL DEFAULT 0 COMMENT '租户编号' , | |
PRIMARY KEY (id) | |
) COMMENT = '动态线程池异步任务消息表'; | |
-- ---------------------------- | |
-- Table structure for infra_dynamic_pool_notice_platform | |
-- ---------------------------- | |
DROP TABLE IF EXISTS `infra_dynamic_pool_notice_platform`; | |
CREATE TABLE `infra_dynamic_pool_notice_platform` ( | |
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增编号', | |
`platform` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '通知平台;通知平台,如:钉钉、邮箱等', | |
`platform_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '平台id', | |
`url_key` varchar(648) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'url的令牌', | |
`secret` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '密钥', | |
`webhook` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT 'web钩子', | |
`timeout` int(11) DEFAULT NULL COMMENT '请求超时时间;http请求超时时间,单位(毫秒)', | |
`receivers` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '接收人;可以多个,用”,“分割多个, 默认是全部all', | |
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '创建者', | |
`create_time` datetime(0) DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', | |
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '更新者', | |
`update_time` datetime(0) DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', | |
`deleted` bit(1) DEFAULT b'0' COMMENT '是否删除', | |
`tenant_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '租户编号', | |
PRIMARY KEY (`id`) USING BTREE | |
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = latin1 COLLATE = latin1_swedish_ci COMMENT = '动态池通知平台' ROW_FORMAT = Dynamic; | |
-- ---------------------------- | |
-- Records of infra_dynamic_pool_notice_platform | |
-- ---------------------------- | |
INSERT INTO `infra_dynamic_pool_notice_platform` VALUES (1, 'ding', '1', 'b406c96fe0c3f5161dc4a6013cf0609ac8f6e1fd25179b0d0fc6be0c915b8c8c', 'SEC8a790ab7731d09f260e7d601051ec47fee5e84decf1a4296bd6473df7f7b5f17', 'https://oapi.dingtalk.com/robot/send?access_token=b406c96fe0c3f5161dc4a6013cf0609ac8f6e1fd25179b0d0fc6be0c915b8c8c', 3000, '13232730191', NULL, '2024-11-28 16:56:35', NULL, '2024-11-28 16:56:35', b'0', 0); | |
-- ---------------------------- | |
-- Table structure for infra_dynamic_pool_notice_type | |
-- ---------------------------- | |
DROP TABLE IF EXISTS `infra_dynamic_pool_notice_type`; | |
CREATE TABLE `infra_dynamic_pool_notice_type` ( | |
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增编号', | |
`type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '通知报警类型;配置更改通知、容量阈值通知等', | |
`enabled` bit(1) NOT NULL DEFAULT b'1' COMMENT '是否启用;是否启用通知', | |
`platform_ids` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '通知平台id;通知平台id,多个以’,‘分割', | |
`threshold` int(11) DEFAULT 70 COMMENT '报警阈值;报警阈值', | |
`interval` int(11) DEFAULT 120 COMMENT '报警间隔;报警间隔,时间单位(s)', | |
`receivers` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '接收者;接收者,这里配置会覆盖通知平台的配置', | |
`executor_id` bigint(20) DEFAULT NULL COMMENT '线程池的id;线程池的id,指定绑定的通知报警类型', | |
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '创建者', | |
`create_time` datetime(0) DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', | |
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '更新者', | |
`update_time` datetime(0) DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', | |
`deleted` bit(1) DEFAULT b'0' COMMENT '是否删除', | |
`tenant_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '租户编号', | |
PRIMARY KEY (`id`) USING BTREE | |
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = latin1 COLLATE = latin1_swedish_ci COMMENT = '动态池通知报警类型' ROW_FORMAT = Dynamic; | |
-- ---------------------------- | |
-- Records of infra_dynamic_pool_notice_type | |
-- ---------------------------- | |
INSERT INTO `infra_dynamic_pool_notice_type` VALUES (1, 'change', b'1', '[1]', 70, 120, NULL, 1, 'Tz', '2024-11-28 17:11:37', NULL, '2024-11-28 17:11:37', b'0', 0); | |
INSERT INTO `infra_dynamic_pool_notice_type` VALUES (2, 'capacity', b'1', '[1]', 80, 120, NULL, 1, 'Tz', '2024-11-28 17:12:12', NULL, '2024-11-28 17:12:12', b'0', 0); | |
-- ---------------------------- | |
-- Table structure for infra_dynamic_pool_thread_executor | |
-- ---------------------------- | |
DROP TABLE IF EXISTS `infra_dynamic_pool_thread_executor`; | |
CREATE TABLE `infra_dynamic_pool_thread_executor` ( | |
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '自增编号', | |
`thread_pool_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '线程池名称;线程池名称', | |
`thread_pool_alias_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL COMMENT '线程池别名;线程池别名', | |
`executor_type` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '线程池类型;线程池类型 common、eager、ordered、scheduled、priority,默认 common', | |
`core_pool_size` int(11) DEFAULT 1 COMMENT '核心线程数;核心线程数,默认1', | |
`maximum_pool_size` int(11) DEFAULT NULL COMMENT '最大线程数;最大线程数,默认cpu核数', | |
`queue_capacity` int(11) DEFAULT 1024 COMMENT '队列容量;队列容量,默认1024', | |
`queue_type` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT 'VariableLinkedBlockingQueue' COMMENT '任务队列;任务队列,默认VariableLinkedBlockingQueue', | |
`rejected_handler_type` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT 'CallerRunsPolicy' COMMENT '拒绝策略;拒绝策略,默认AbortPolicy', | |
`keep_alive_time` int(11) DEFAULT 60 COMMENT '空闲线程等待超时时间;空闲线程等待超时时间,默认60', | |
`thread_name_prefix` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '线程名前缀;线程名前缀,默认dtp', | |
`allow_core_thread_time_out` bit(1) DEFAULT b'0' COMMENT '是否允许核心线程池超时;是否允许核心线程池超时', | |
`wait_for_tasks_to_complete_on_shutdown` bit(1) DEFAULT b'1' COMMENT '是否优雅关闭线程池;是否优雅关闭线程池,即等待线程池中的线程执行完毕', | |
`await_termination_seconds` int(11) DEFAULT 3 COMMENT '优雅关闭线程池时,阻塞等待线程池中任务执行时间;优雅关闭线程池时,阻塞等待线程池中任务执行时间,默认3,单位(s)', | |
`pre_start_all_core_threads` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT '\0' COMMENT '是否预热所有核心线程;是否预热所有核心线程,默认false', | |
`run_timeout` int(11) DEFAULT 0 COMMENT '任务执行超时阈值;任务执行超时阈值,单位(ms),默认0(不统计)', | |
`queue_timeout` int(11) DEFAULT NULL COMMENT '任务在队列等待超时阈值;任务在队列等待超时阈值,单位(ms),默认0(不统计)', | |
`task_wrapper_names` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '任务包装器名称;任务包装器名称', | |
`notify_enabled` bit(1) DEFAULT b'1' COMMENT '是否开启报警;是否开启报警', | |
`platform_ids` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '报警平台id;报警平台id', | |
`creator` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '创建者', | |
`create_time` datetime(0) DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', | |
`updater` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '更新者', | |
`update_time` datetime(0) DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间', | |
`deleted` bit(1) DEFAULT b'0' COMMENT '是否删除', | |
`tenant_id` bigint(20) NOT NULL DEFAULT 0 COMMENT '租户编号', | |
`large_classification` int(11) DEFAULT NULL COMMENT '大分类', | |
`litter_classification` int(11) DEFAULT NULL COMMENT '小分类', | |
PRIMARY KEY (`id`) USING BTREE | |
) ENGINE = InnoDB AUTO_INCREMENT = 9 CHARACTER SET = sjis COLLATE = sjis_japanese_ci COMMENT = '动态池线程池配置' ROW_FORMAT = Dynamic; | |
-- ---------------------------- | |
-- Records of infra_dynamic_pool_thread_executor | |
-- ---------------------------- | |
INSERT INTO `infra_dynamic_pool_thread_executor` VALUES (1, 'pay_module', '支付模块线程池', 'ThreadPoolTaskExecutor', 8, 30, 1024, 'VariableLinkedBlockingQueue', 'CallerRunsPolicy', 60, 'pay-notify-task-', b'0', b'1', 3, '\0', 0, NULL, '[\"ttl\", \"mdc\"]', b'1', '[1,2,3,4]', NULL, '2024-11-26 20:23:46', NULL, '2024-11-26 20:23:46', b'0', 0, 2000, 2002); | |
INSERT INTO `infra_dynamic_pool_thread_executor` VALUES (2, 'tomcatTp', 'tomcat线程池', 'common', 10, 200, 1024, 'VariableLinkedBlockingQueue', 'CallerRunsPolicy', 60, 'system-tomcat-', b'0', b'1', 3, '\0', 0, NULL, '[\"ttl\", \"mdc\", \"custom\"]', b'1', '[1,2,3,4]', NULL, '2024-12-12 10:40:26', NULL, '2024-12-12 10:40:26', b'0', 0, 1000, 1001); | |
INSERT INTO `infra_dynamic_pool_thread_executor` VALUES (3, 'system_scheduled_task_pool', '系统定时任务线程池', 'ScheduledThreadPoolExecutor', 8, 100, 1024, 'DelayQueue', 'CallerRunsPolicy', 60, 'scheduled-module-', b'0', b'1', 3, '\0', 0, NULL, '[\"ttl\", \"mdc\"]', b'1', '[1,2,3,4]', NULL, '2024-12-18 14:39:10', NULL, '2024-12-18 14:39:10', b'0', 0, 3000, 3001); | |
INSERT INTO `infra_dynamic_pool_thread_executor` VALUES (4, 'common_business_task_pool', '通用业务任务线程池', 'common', 8, 100, 1024, 'VariableLinkedBlockingQueue', 'CallerRunsPolicy', 60, 'common-business-', b'0', b'1', 3, '', 0, NULL, '[\"ttl\", \"mdc\"]', b'1', '[1,2,3,4]', NULL, '2024-12-12 10:40:26', NULL, '2024-12-12 10:40:26', b'0', 0, 2000, 2001); | |
INSERT INTO `infra_dynamic_pool_thread_executor` VALUES (5, 'common_scheduled_task_pool', '通用定时任务线程池', 'scheduled', 8, 50, 1024, 'VariableLinkedBlockingQueue', 'CallerRunsPolicy', 60, 'common-scheduled-', b'0', b'1', 3, '', 0, NULL, '[\"ttl\", \"mdc\"]', b'1', '[1,2,3,4]', NULL, '2024-12-12 10:40:26', NULL, '2024-12-12 10:40:26', b'0', 0, 3000, 3002); | |
SET FOREIGN_KEY_CHECKS = 1; |
infra_dynamic_pool_async_message: 动态线程池异步任务消息表,记录提交的异步任务的执行情况
infra_dynamic_pool_notice_platform: 动态池通知平台, 动态线程池预警情况,比如线程池中的任务到达一定的阈值就会通知
infra_dynamic_pool_notice_type: 动态池通知报警类型,预警类型配置
infra_dynamic_pool_thread_executor: 动态池线程池配置,初始化线程池的相关配置,或者后续新增新的线程池
# 代码实现
# 配置类(扩展原本的 org.dromara.dynamictp 配置)
package com.tz.scaffold.framework.dynamicpool.config; | |
/** | |
* <p> Project: scaffold - DynamicPoolAutoConfiguration </p> | |
* | |
* 动态线程池自动装配 | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2024/11/19 17:58 | |
* @since 1.0.0 | |
*/ | |
@AutoConfiguration | |
@EnableScheduling | |
@EnableDynamicTp | |
@EnableAsync | |
public class DynamicPoolAutoConfiguration { | |
@Bean | |
public DynamicThreadPoolFrameworkUtils dynamicThreadPoolFrameworkUtils(DynamicThreadPoolApi dynamicThreadPoolApi | |
, AsyncTaskExecutorService asyncTaskExecutorService | |
, LockTemplate lockTemplate | |
, StringRedisTemplate stringRedisTemplate | |
, WebSocketSenderApi webSocketSenderApi) { | |
DynamicThreadPoolFrameworkUtils.init(dynamicThreadPoolApi, asyncTaskExecutorService, lockTemplate, stringRedisTemplate, webSocketSenderApi); | |
return new DynamicThreadPoolFrameworkUtils(); | |
} | |
@Bean | |
public AsyncTaskExecutorService asyncTaskExecutorService() { | |
return new AsyncTaskExecutorServiceImpl(); | |
} | |
} |
# 核心处理类
package com.tz.scaffold.framework.dynamicpool.core.util; | |
/** | |
* <p> Project: scaffold - DynamicThreadPoolFrameworkUtils </p> | |
* | |
* 动态线程池管理工具 参考 {@link org.dromara.dynamictp.core.spring.DtpBeanDefinitionRegistrar} | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2024/11/29 20:14 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
public class DynamicThreadPoolFrameworkUtils { | |
private static DynamicThreadPoolApi DYNAMIC_THREAD_POOL_API; | |
private static AsyncTaskExecutorService ASYNC_TASK_EXECUTOR_SERVICE; | |
public static LockTemplate LOCK_TEMPLATE; | |
private static final String REGISTER_SOURCE = "beanPostProcessor"; | |
private static DtpProperties DTP_PROPERTIES = DtpProperties.getInstance(); | |
public static StringRedisTemplate STRING_REDIS_TEMPLATE; | |
private static WebSocketSenderApi WEB_SOCKET_SENDER_API; | |
/** | |
* 初始化动态线程池 | |
* @param dynamicThreadPoolApi 动态线程池配置查询接口 | |
* @param asyncTaskExecutorService 异步任务执行器 | |
* @param lockTemplate 分布式锁操作 | |
* @param stringRedisTemplate 缓存操作 | |
*/ | |
public static void init(DynamicThreadPoolApi dynamicThreadPoolApi | |
, AsyncTaskExecutorService asyncTaskExecutorService | |
, LockTemplate lockTemplate | |
, StringRedisTemplate stringRedisTemplate | |
, WebSocketSenderApi webSocketSenderApi) { | |
DynamicThreadPoolFrameworkUtils.DYNAMIC_THREAD_POOL_API = dynamicThreadPoolApi; | |
DynamicThreadPoolFrameworkUtils.ASYNC_TASK_EXECUTOR_SERVICE = asyncTaskExecutorService; | |
DynamicThreadPoolFrameworkUtils.LOCK_TEMPLATE = lockTemplate; | |
DynamicThreadPoolFrameworkUtils.STRING_REDIS_TEMPLATE = stringRedisTemplate; | |
DynamicThreadPoolFrameworkUtils.WEB_SOCKET_SENDER_API = webSocketSenderApi; | |
DtpProperties dtpProperties = getDtpProperties(); | |
registerBeanDefinitions(dtpProperties); | |
} | |
/** | |
* 刷新线程池情况 | |
*/ | |
public static void refresh() { | |
DtpProperties dtpProperties = getDtpProperties(); | |
DtpRegistry.refresh(dtpProperties); | |
} | |
/** | |
* 获取线程初始化配置 | |
* @return 线程初始化配置 | |
*/ | |
private static DtpProperties getDtpProperties() { | |
DynamicThreadPoolProperties initConfig = DYNAMIC_THREAD_POOL_API.getInitConfig(); | |
DtpProperties dtpProperties = DtpProperties.getInstance(); | |
List<DtpExecutorProps> executors = dtpProperties.getExecutors(); | |
// 处理线程池配置 | |
if (CollectionUtils.isAnyEmpty(executors)) { | |
executors = new ArrayList<>(); | |
} | |
for (ThreadExecutorRespDTO executor : initConfig.getExecutors()) { | |
DtpExecutorProps dtpExecutorProps = new DtpExecutorProps(); | |
// 判断大分类是不是系统线程池 | |
if (DynamicThreadPoolCategorizeTypeEnum.valueOf(executor.getLargeClassification()) | |
.equals(DynamicThreadPoolCategorizeTypeEnum.SYSTEM_POOL)) { | |
// 判断小分类是不是第三方框架额外集成的情况 | |
if (DynamicThreadPoolCategorizeTypeEnum.isExtFrameworkPool(executor.getLitterClassification())) { | |
TpExecutorProps tpExecutorProps = new TpExecutorProps(); | |
BeanUtil.copyProperties(executor, tpExecutorProps); | |
switch (DynamicThreadPoolCategorizeTypeEnum.statusOf(executor.getLitterClassification())) { | |
case TOMCAT_POOL: { | |
dtpProperties.setTomcatTp(tpExecutorProps); | |
break; | |
} | |
case JETTY_POOL: { | |
dtpProperties.setJettyTp(tpExecutorProps); | |
break; | |
} | |
case DUBBO_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getDubboTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setDubboTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getDubboTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case ROCKETMQ_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getRocketMqTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setRocketMqTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getRocketMqTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case HYSTRIX_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getHystrixTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setHystrixTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getHystrixTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case OKHTTP3_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getOkhttp3Tp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setOkhttp3Tp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getOkhttp3Tp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case GRPC_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getGrpcTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setGrpcTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getGrpcTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case BRPC_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getBrpcTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setBrpcTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getBrpcTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case MOTAN_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getMotanTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setMotanTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getMotanTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case TARS_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getTarsTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setTarsTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getTarsTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
case SOFARPC_POOL: { | |
if (CollectionUtils.isAnyEmpty(dtpProperties.getSofaTp())) { | |
List<TpExecutorProps> tpExecutorPropsList = new ArrayList<>(1); | |
tpExecutorPropsList.add(tpExecutorProps); | |
dtpProperties.setSofaTp(tpExecutorPropsList); | |
} else { | |
dtpProperties.getSofaTp().add(tpExecutorProps); | |
} | |
break; | |
} | |
default: break; | |
} | |
} | |
continue; | |
} | |
BeanUtil.copyProperties(executor, dtpExecutorProps); | |
executors.add(dtpExecutorProps); | |
} | |
// 处理通知平台 | |
List<NotifyPlatform> platforms = dtpProperties.getPlatforms(); | |
if (CollectionUtils.isAnyEmpty(platforms)) { | |
platforms = new ArrayList<>(); | |
} | |
for (NoticePlatformRespDTO platform : initConfig.getPlatforms()) { | |
NotifyPlatform notifyPlatform = new NotifyPlatform(); | |
BeanUtil.copyProperties(platform, notifyPlatform); | |
platforms.add(notifyPlatform); | |
} | |
dtpProperties.setExecutors(executors); | |
dtpProperties.setPlatforms(platforms); | |
DTP_PROPERTIES = dtpProperties; | |
return dtpProperties; | |
} | |
/** | |
* 根据配置注册动态线程池 | |
* @param dtpProperties 动态线程池配置参数 | |
*/ | |
private static void registerBeanDefinitions(DtpProperties dtpProperties) { | |
BeanDefinitionRegistry beanDefinitionRegistry = (BeanDefinitionRegistry) SpringUtil.getApplicationContext().getAutowireCapableBeanFactory(); | |
val executors = dtpProperties.getExecutors(); | |
if (CollectionUtils.isAnyEmpty(executors)) { | |
log.warn("DynamicTp registrar, no executors are configured."); | |
return; | |
} | |
executors.forEach(e -> { | |
// 如果是特殊线程池 特殊处理 | |
if (DynamicThreadPoolSpecialTypeEnum.isSpecial(e.getExecutorType())) { | |
if (Objects.equals(DynamicThreadPoolSpecialTypeEnum.getClass(e.getExecutorType()), ThreadPoolTaskExecutor.class)) { | |
//ThreadPoolTaskExecutor 类型线程池特殊处理 | |
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); | |
threadPoolTaskExecutor.setCorePoolSize(e.getCorePoolSize()); | |
threadPoolTaskExecutor.setMaxPoolSize(e.getMaximumPoolSize()); | |
threadPoolTaskExecutor.setQueueCapacity(e.getQueueCapacity()); | |
threadPoolTaskExecutor.setThreadNamePrefix(e.getThreadNamePrefix()); | |
threadPoolTaskExecutor.setAwaitTerminationSeconds(e.getAwaitTerminationSeconds()); | |
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(e.isWaitForTasksToCompleteOnShutdown()); | |
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(e.isAllowCoreThreadTimeOut()); | |
threadPoolTaskExecutor.setPrestartAllCoreThreads(e.isPreStartAllCoreThreads()); | |
threadPoolTaskExecutor.setRejectedExecutionHandler(RejectHandlerGetter.buildRejectedHandler(e.getRejectedHandlerType())); | |
threadPoolTaskExecutor.initialize(); | |
// 进行代理 | |
val proxy = newProxy(e.getExecutorType(), threadPoolTaskExecutor.getThreadPoolExecutor()); | |
DtpRegistry.registerExecutor(new ExecutorWrapper(e.getThreadPoolName(), proxy), REGISTER_SOURCE); | |
} else if (Objects.equals(DynamicThreadPoolSpecialTypeEnum.getClass(e.getExecutorType()), ScheduledThreadPoolExecutor.class)) { | |
//ScheduledThreadPoolExecutor 类型线程池特殊处理 | |
ScheduledThreadPoolExecutor scheduledExecutorService = (ScheduledThreadPoolExecutor) ThreadPoolBuilder.newBuilder() | |
.threadPoolName(e.getThreadPoolName()) | |
.corePoolSize(e.getCorePoolSize()) | |
.maximumPoolSize(e.getMaximumPoolSize()) | |
.workQueue(e.getQueueType(), e.getQueueCapacity()) | |
.allowCoreThreadTimeOut(e.isAllowCoreThreadTimeOut()) | |
.keepAliveTime(e.getKeepAliveTime()) | |
.awaitTerminationSeconds(e.getAwaitTerminationSeconds()) | |
.waitForTasksToCompleteOnShutdown(e.isWaitForTasksToCompleteOnShutdown()) | |
.scheduled() | |
.buildCommon(); | |
// 进行代理 | |
Executor proxy = newScheduledTpProxy(e.getThreadPoolName(), scheduledExecutorService); | |
DtpRegistry.registerExecutor(new ExecutorWrapper(e.getThreadPoolName(), proxy), REGISTER_SOURCE); | |
} | |
} else { | |
Class<?> executorTypeClass = null; | |
if (Objects.equals(BusinessThreadPoolExecutorExtend.class, DynamicThreadPoolSpecialTypeEnum.getClass(e.getExecutorType()))) { | |
executorTypeClass = DynamicThreadPoolSpecialTypeEnum.getClass(e.getExecutorType()); | |
} else { | |
executorTypeClass = ExecutorType.getClass(e.getExecutorType()); | |
} | |
Map<String, Object> propertyValues = buildPropertyValues(e); | |
Object[] args = buildConstructorArgs(executorTypeClass, e); | |
SpringBeanHelper.register(beanDefinitionRegistry, e.getThreadPoolName(), executorTypeClass, propertyValues, args); | |
SpringUtil.getBean(e.getThreadPoolName()); | |
} | |
}); | |
} | |
/** | |
* 构建参数 | |
* @param props 线程池配置属性 | |
* @return 构建好的 map 集合 | |
*/ | |
private static Map<String, Object> buildPropertyValues(DtpExecutorProps props) { | |
Map<String, Object> propertyValues = Maps.newHashMap(); | |
propertyValues.put(THREAD_POOL_NAME, props.getThreadPoolName()); | |
propertyValues.put(THREAD_POOL_ALIAS_NAME, props.getThreadPoolAliasName()); | |
propertyValues.put(ALLOW_CORE_THREAD_TIMEOUT, props.isAllowCoreThreadTimeOut()); | |
propertyValues.put(WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN, props.isWaitForTasksToCompleteOnShutdown()); | |
propertyValues.put(AWAIT_TERMINATION_SECONDS, props.getAwaitTerminationSeconds()); | |
propertyValues.put(PRE_START_ALL_CORE_THREADS, props.isPreStartAllCoreThreads()); | |
propertyValues.put(REJECT_HANDLER_TYPE, props.getRejectedHandlerType()); | |
propertyValues.put(REJECT_ENHANCED, props.isRejectEnhanced()); | |
propertyValues.put(RUN_TIMEOUT, props.getRunTimeout()); | |
propertyValues.put(TRY_INTERRUPT_WHEN_TIMEOUT, props.isTryInterrupt()); | |
propertyValues.put(QUEUE_TIMEOUT, props.getQueueTimeout()); | |
val notifyItems = mergeAllNotifyItems(props.getNotifyItems()); | |
propertyValues.put(NOTIFY_ITEMS, notifyItems); | |
propertyValues.put(PLATFORM_IDS, props.getPlatformIds()); | |
propertyValues.put(NOTIFY_ENABLED, props.isNotifyEnabled()); | |
val taskWrappers = TaskWrappers.getInstance().getByNames(props.getTaskWrapperNames()); | |
propertyValues.put(TASK_WRAPPERS, taskWrappers); | |
propertyValues.put(PLUGIN_NAMES, props.getPluginNames()); | |
propertyValues.put(AWARE_NAMES, props.getAwareNames()); | |
return propertyValues; | |
} | |
/** | |
* 构建构造器需要的参数 | |
* @param clazz 构建的类 | |
* @param props 构建参数 | |
* @return 构建后的对象 | |
*/ | |
private static Object[] buildConstructorArgs(Class<?> clazz, DtpExecutorProps props) { | |
BlockingQueue<Runnable> taskQueue; | |
if (clazz.equals(EagerDtpExecutor.class)) { | |
taskQueue = new TaskQueue(props.getQueueCapacity()); | |
} else if (clazz.equals(PriorityDtpExecutor.class)) { | |
taskQueue = new PriorityBlockingQueue<>(props.getQueueCapacity(), PriorityDtpExecutor.getRunnableComparator()); | |
} else { | |
taskQueue = buildLbq(props.getQueueType(), | |
props.getQueueCapacity(), | |
props.isFair(), | |
props.getMaxFreeMemory()); | |
} | |
return new Object[]{ | |
props.getCorePoolSize(), | |
props.getMaximumPoolSize(), | |
props.getKeepAliveTime(), | |
props.getUnit(), | |
taskQueue, | |
new NamedThreadFactory(props.getThreadNamePrefix()), | |
RejectHandlerGetter.buildRejectedHandler(props.getRejectedHandlerType()) | |
}; | |
} | |
/** | |
* 代理 {@link ThreadPoolExecutor} 为 DynamicTP 框架对应的对象 | |
* @param name 线程池名称 | |
* @param originExecutor 被代理的线程池 | |
* @return 代理之后的 DynamicTP 对应的 ThreadPoolExecutor 对象 | |
*/ | |
private static ThreadPoolExecutorProxy newProxy(String name, ThreadPoolExecutor originExecutor) { | |
val proxy = new ThreadPoolExecutorProxy(originExecutor); | |
shutdownGracefulAsync(originExecutor, name, 0); | |
return proxy; | |
} | |
/** | |
* 代理 {@link ScheduledThreadPoolExecutor} 为 DynamicTP 框架对应的对象 | |
* @param name 线程池名称 | |
* @param originExecutor 被代理的线程池 | |
* @return 代理之后的 DynamicTP 对应的 ScheduledThreadPoolExecutor 对象 | |
*/ | |
private static ScheduledThreadPoolExecutorProxy newScheduledTpProxy(String name, ScheduledThreadPoolExecutor originExecutor) { | |
val proxy = new ScheduledThreadPoolExecutorProxy(originExecutor); | |
shutdownGracefulAsync(originExecutor, name, 0); | |
return proxy; | |
} | |
/** | |
* 根据线程池名称获取线程池实例 | |
* @param executorName 线程池名称 | |
* @return 线程池实例 | |
*/ | |
public static Executor getExecutor(String executorName) { | |
return getExecutorWrapper(executorName).getExecutor(); | |
} | |
/** | |
* 根据线程池名称获取线程池包装器 | |
* @param executorName 线程池名称 | |
* @return 线程池包装器 | |
*/ | |
public static ExecutorWrapper getExecutorWrapper(String executorName) { | |
return DtpRegistry.getExecutorWrapper(executorName); | |
} | |
/** | |
* 创建 异步任务 执行记录 | |
* | |
* @param asyncMessage 创建信息 | |
* @return 返回异步任务的 id | |
*/ | |
public static Long createAsyncMessage(AsyncTaskInfo asyncMessage) { | |
AsyncMessageCreateReqDTO reqDTO = BeanUtil.copyProperties(asyncMessage, AsyncMessageCreateReqDTO.class); | |
Long id = DYNAMIC_THREAD_POOL_API.createAsyncMessage(reqDTO); | |
return id; | |
} | |
/** | |
* 更新动态线程池异步任务消息 | |
* | |
* @param asyncMessage 更新信息 | |
*/ | |
public static void updateAsyncMessage(AsyncTaskInfo asyncMessage) { | |
AsyncMessageCreateReqDTO reqDTO = BeanUtil.copyProperties(asyncMessage, AsyncMessageCreateReqDTO.class); | |
DYNAMIC_THREAD_POOL_API.updateAsyncMessage(reqDTO); | |
} | |
/** | |
* 通用的提交执行异步任务 | |
* <p> | |
* 会提交实现 {@link AsyncTaskExecutor#baseExec ()} 方法中的任务内容, | |
* 实际执行的是子类的 {@link AsyncTaskExecutor#taskContent ()} | |
* @param asyncTaskExecutor 需要执行的异步任务内容 | |
* @return AsyncTaskSubmitDTO 异步任务提交信息 | |
*/ | |
public static AsyncTaskSubmitDTO execAsyncTask(AsyncTaskExecutor asyncTaskExecutor) { | |
return ASYNC_TASK_EXECUTOR_SERVICE.execAsyncTask(asyncTaskExecutor); | |
} | |
/** | |
* 发送消息给指定用户 | |
* | |
* @param userType 用户类型 | |
* @param userId 用户编号 | |
* @param messageType 消息类型 | |
* @param messageContent 消息内容,JSON 格式 | |
*/ | |
public static void sendAsyncTaskExecutorResult(Integer userType, Long userId, String messageType, String messageContent) { | |
WEB_SOCKET_SENDER_API.send(userType, userId, messageType, messageContent); | |
} | |
} |
# DynamicThreadPoolApi 接口实现
package com.tz.scaffold.module.infra.api.dynamicpool; | |
import javax.annotation.Resource; | |
import javax.validation.Valid; | |
import java.util.List; | |
/** | |
* <p> Project: scaffold - DynamicThreadPoolApiImpl </p> | |
* | |
* 动态池线程池配置 API 接口实现 | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2024/11/25 10:22 | |
* @since 1.0.0 | |
*/ | |
@Service | |
public class DynamicThreadPoolApiImpl implements DynamicThreadPoolApi { | |
@Resource | |
private ThreadExecutorService threadExecutorService; | |
@Resource | |
private NoticePlatformService noticePlatformService; | |
@Resource | |
private NoticeTypeService noticeTypeService; | |
@Resource | |
private AsyncMessageService asyncMessageService; | |
/** | |
* 获取动态线程池初始化配置的 json 格式数据 | |
* | |
* @return 动态线程池初始化配置的 json 格式数据 | |
*/ | |
@Override | |
public DynamicThreadPoolProperties getInitConfig() { | |
//①查询通知列表 | |
NoticePlatformPageReqVO noticePlatformPageReqVO = new NoticePlatformPageReqVO(); | |
noticePlatformPageReqVO.setPageNo(-1); | |
PageResult<NoticePlatformDO> noticePlatformPage = noticePlatformService.getNoticePlatformPage(noticePlatformPageReqVO); | |
List<NoticePlatformRespDTO> noticePlatformRespDTOS = NoticePlatformConvert.INSTANCE.convertList4(noticePlatformPage.getList()); | |
//②查询线程池配置 | |
ThreadExecutorPageReqVO threadExecutorPageReqVO = new ThreadExecutorPageReqVO(); | |
threadExecutorPageReqVO.setPageNo(-1); | |
PageResult<ThreadExecutorDO> threadExecutorPage = threadExecutorService.getThreadExecutorPage(threadExecutorPageReqVO); | |
List<ThreadExecutorRespDTO> threadExecutorRespDTOS = ThreadExecutorConvert.INSTANCE.convertList4(threadExecutorPage.getList()); | |
if (!CollectionUtils.isAnyEmpty(threadExecutorRespDTOS)) { | |
//③查询通知类型列表 | |
for (ThreadExecutorRespDTO threadExecutorRespDTO : threadExecutorRespDTOS) { | |
NoticeTypePageReqVO noticeTypePageReqVO = new NoticeTypePageReqVO(); | |
// 根据线程池配置查询对应的通知类型列表 | |
noticeTypePageReqVO.setPageNo(-1); | |
noticeTypePageReqVO.setExecutorId(threadExecutorRespDTO.getId()); | |
PageResult<NoticeTypeDO> noticeTypePage = noticeTypeService.getNoticeTypePage(noticeTypePageReqVO); | |
threadExecutorRespDTO.setNotifyItems(ThreadExecutorConvert.INSTANCE.convertList(noticeTypePage.getList())); | |
} | |
} | |
DynamicThreadPoolProperties dynamicThreadPoolProperties = new DynamicThreadPoolProperties(); | |
dynamicThreadPoolProperties.setExecutors(threadExecutorRespDTOS); | |
dynamicThreadPoolProperties.setPlatforms(noticePlatformRespDTOS); | |
return dynamicThreadPoolProperties; | |
} | |
/** | |
* 创建 异步任务 执行记录 | |
* | |
* @param createDTO 创建信息 | |
* @return 返回异步任务的 id | |
*/ | |
@Override | |
public Long createAsyncMessage(@Valid AsyncMessageCreateReqDTO createDTO) { | |
Long asyncMessageId = asyncMessageService.createAsyncMessage(createDTO); | |
return asyncMessageId; | |
} | |
/** | |
* 更新动态线程池异步任务消息 | |
* | |
* @param updateReqDTO 更新信息 | |
*/ | |
@Override | |
public void updateAsyncMessage(@Valid AsyncMessageCreateReqDTO updateReqDTO) { | |
asyncMessageService.updateAsyncMessage(updateReqDTO); | |
} | |
/** | |
* 通过异步任务 keyword(唯一值) 查询异步任务 | |
* | |
* @param keyword 异步任务 keyword(唯一值) | |
* @return 异步任务对象信息 | |
*/ | |
@Override | |
public AsyncMessageRespDTO getAsyncMessage(String keyword) { | |
return null; | |
} | |
} |
# 任务提交信息
package com.tz.scaffold.framework.dynamicpool.core.service; | |
import lombok.Data; | |
/** | |
* <p> Project: scaffold - AsyncTaskMessage </p> | |
* | |
* 异步任务消息的记录 | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2024/12/11 16:09 | |
* @since 1.0.0 | |
*/ | |
@Data | |
public class AsyncTaskInfo { | |
/** | |
* 主键 | |
*/ | |
private Long id; | |
/** | |
* 关键值 | |
*/ | |
private String keyword; | |
/** | |
* 发送任务的用户名 | |
*/ | |
private String username; | |
/** | |
* 任务类型 | |
*/ | |
private String taskType; | |
/** | |
* 任务名称 | |
*/ | |
private String taskName; | |
/** | |
* 执行的线程池的主键 | |
*/ | |
private Long executeId; | |
/** | |
* 执行结果信息 | |
*/ | |
private String contextInfo; | |
/** | |
* 错误明细信息 | |
*/ | |
private String errorDetailInfo; | |
/** | |
* 执行情况;0、执行中。1、执行成功。-1、执行失败。-2、执行错误 | |
*/ | |
private Integer executeStatus; | |
/** | |
* cpu 使用率;单位:% | |
*/ | |
private String cpuUsageRate; | |
/** | |
* cpu 使用时间;cpu 使用时间(毫秒) | |
*/ | |
private Long cpuUsageTime; | |
/** | |
* 执行时间;执行花费的时间(毫秒) | |
*/ | |
private Long executeTime; | |
/** | |
* 线程名称;线程名称 | |
*/ | |
private String threadName; | |
} |
# 任务执行类
package com.tz.scaffold.framework.dynamicpool.core.service; | |
import cn.hutool.core.util.StrUtil; | |
import com.tz.scaffold.framework.dynamicpool.core.dto.AsyncTaskSubmitDTO; | |
import com.tz.scaffold.framework.dynamicpool.core.task.timely.AsyncTaskExecutor; | |
import com.tz.scaffold.framework.dynamicpool.core.util.DynamicThreadPoolFrameworkUtils; | |
import lombok.extern.slf4j.Slf4j; | |
import org.dromara.dynamictp.common.timer.HashedWheelTimer; | |
import org.dromara.dynamictp.common.timer.Timeout; | |
import org.dromara.dynamictp.core.executor.NamedThreadFactory; | |
import org.dromara.dynamictp.core.support.ExecutorAdapter; | |
import org.dromara.dynamictp.core.support.ExecutorWrapper; | |
import java.lang.ref.WeakReference; | |
import java.util.Optional; | |
import java.util.concurrent.CancellationException; | |
import java.util.concurrent.CompletableFuture; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicReference; | |
/** | |
* <p> Project: scaffold - AsyncTaskExecutorServiceImpl </p> | |
* | |
* 异步任务消息接口 实现 | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2024/12/11 16:09 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
public class AsyncTaskExecutorServiceImpl implements AsyncTaskExecutorService { | |
/** | |
* 保存的使用时间轮算法的定时器 | |
*/ | |
private static final ConcurrentHashMap<String, HashedWheelTimer> WHEEL_TIMER_CONCURRENT_HASH_MAP = new ConcurrentHashMap<>(); | |
// RateLimiter limiter = RateLimiter.of("taskLimiter", RateLimiterConfig.custom() | |
// .limitForPeriod(100) | |
// .limitRefreshPeriod(Duration.ofSeconds(2)) | |
// // 超时时间(0 表示立即返回结果) | |
// .timeoutDuration(Duration.ZERO) | |
// .build()); | |
@Override | |
public AsyncTaskSubmitDTO execAsyncTask(AsyncTaskExecutor asyncTaskExecutor) { | |
AsyncTaskSubmitDTO asyncTaskSubmitDTO = new AsyncTaskSubmitDTO(); | |
asyncTaskSubmitDTO.setKeyword(asyncTaskExecutor.getKeyword()); | |
asyncTaskSubmitDTO.setIsSubmitTask(true); | |
// 需要校验 | |
// 返回任务的提交情况 | |
String checkAsyncTaskExecutor = asyncTaskExecutor.checkAsyncTaskExecutor(); | |
if (!StrUtil.isBlank(checkAsyncTaskExecutor)) { | |
asyncTaskSubmitDTO.setIsSubmitTask(false); | |
asyncTaskSubmitDTO.setNoSubmitIllustrate(checkAsyncTaskExecutor); | |
return asyncTaskSubmitDTO; | |
} | |
// boolean permitted = limiter.acquirePermission(); | |
// if (!permitted) { | |
// asyncTaskSubmitDTO.setIsSubmitTask(false); | |
// asyncTaskSubmitDTO.setNoSubmitIllustrate ("服务繁忙!请勿频繁提交,请稍后再试!"); | |
// return asyncTaskSubmitDTO; | |
// } | |
// 状态标记 | |
// 任务开始的标记 | |
AtomicBoolean taskStarted = new AtomicBoolean(false); | |
// 任务取消的标记 | |
AtomicBoolean taskCancelled = new AtomicBoolean(false); | |
// 使用 AtomicReference 包装 mainFuture,解决初始化问题 | |
AtomicReference<CompletableFuture<Void>> mainFutureRef = new AtomicReference<>(); | |
// 根据任务类型获取对应的获取目标线程池包装类 | |
ExecutorWrapper executorWrapper = DynamicThreadPoolFrameworkUtils | |
.getExecutorWrapper(asyncTaskExecutor.getAsyncTaskExecutorName()); | |
// 通过目标线程池包装类获取对应线程池 | |
ExecutorAdapter<?> targetExecutor = executorWrapper.getExecutor(); | |
// 根据任务类型获取对应的定时器 | |
HashedWheelTimer hashedWheelTimer = getHashedWheelTimer(asyncTaskExecutor); | |
//--------- 开始处理执行任务 --------- | |
// 阶段一:队列等待超时监控, 相同类型任务加入线程池队列后开始计算等待超时时间 | |
Timeout queueTimeout = hashedWheelTimer.newTimeout(timeout -> { | |
// 确保只在未启动时处理队列超时,并且还未执行过取消(只有任务执行了才能取消) | |
if (!taskStarted.get() && taskCancelled.compareAndSet(false, true)) { | |
log.warn("任务【{}】在【{}】线程池队列中等待超时,超时时间: {} ms", | |
asyncTaskExecutor.getKeyword(), | |
executorWrapper.getThreadPoolName(), | |
asyncTaskExecutor.getTaskQueueTimeout()); | |
// 任务逻辑层面的取消 | |
asyncTaskExecutor.cancelTask(); | |
// 通过 mainFutureRef 获取 mainFuture 并取消 | |
CompletableFuture<Void> future = mainFutureRef.get(); | |
if (future != null) { | |
future.cancel(false); | |
} | |
} | |
}, asyncTaskExecutor.getTaskQueueTimeout(), TimeUnit.MILLISECONDS); | |
// 创建主任务 Future | |
CompletableFuture<Void> mainFuture = CompletableFuture.runAsync(() -> { | |
// 任务真正开始执行,设置状态为开始 | |
taskStarted.set(true); | |
Thread.currentThread().setName(asyncTaskExecutor.getTaskName() + "_" + asyncTaskExecutor.getKeyword()); | |
// 阶段二:执行超时监控 | |
// 这里需要放在内部,因为只有在这个任务真正开始执行才开始计算任务超时时间 | |
Timeout execTimeout = hashedWheelTimer.newTimeout(timeout -> { | |
// 通过引用获取 future | |
CompletableFuture<Void> future = mainFutureRef.get(); | |
if (future != null) { | |
// 双重检查:任务未完成且未被取消 | |
if (!future.isDone() && !future.isCancelled() && taskCancelled.compareAndSet(false, true)) { | |
log.warn("任务【{}】在【{}】线程池中执行超时,超时时间: {} ms", | |
asyncTaskExecutor.getKeyword(), | |
executorWrapper.getThreadPoolName(), | |
asyncTaskExecutor.getTaskExecuteTimeout()); | |
// 任务逻辑层面的取消 | |
asyncTaskExecutor.cancelTask(); | |
// 强制中断 当前执行的异步任务 | |
future.cancel(true); | |
} else { | |
// 任务已正常完成或被其他逻辑取消,无需处理 | |
log.debug("任务【{}】已正常完成或取消,无需重复处理", asyncTaskExecutor.getKeyword()); | |
} | |
} | |
}, asyncTaskExecutor.getTaskExecuteTimeout(), TimeUnit.MILLISECONDS); | |
try { | |
// 实际执行的任务逻辑 | |
asyncTaskExecutor.baseExec(); | |
} finally { | |
// 不管结果如何都取消这个定时任务 | |
execTimeout.cancel(); | |
} | |
}, targetExecutor); | |
// 将 mainFuture 存入引用容器 | |
mainFutureRef.set(mainFuture); | |
// 使用弱引用包装 Timeout | |
TimeoutTracker timeoutTracker = new TimeoutTracker(queueTimeout); | |
// 正常完成清理 | |
mainFuture.whenComplete((res, ex) -> { | |
// 清理所有保存的定时任务 | |
timeoutTracker.cancelAll(); | |
// 设置当前任务的状态为取消 | |
taskCancelled.set(true); | |
}); | |
// 异常处理 | |
mainFuture.exceptionally(ex -> { | |
if (ex instanceof CancellationException) { | |
// 任务未启动出现异常 | |
if (!taskStarted.get()) { | |
log.warn("任务【{}】在队列等待中被取消", asyncTaskExecutor.getKeyword()); | |
} else { | |
log.warn("任务【{}】在执行中被强制中断", asyncTaskExecutor.getKeyword()); | |
} | |
} | |
return null; | |
}); | |
return asyncTaskSubmitDTO; | |
} | |
/** | |
* 根据对应的异步任务执行器获取对应的定时器 | |
* @param asyncTaskExecutor 异步任务执行器 | |
* @return 使用时间轮算法的定时器 | |
*/ | |
private HashedWheelTimer getHashedWheelTimer(AsyncTaskExecutor asyncTaskExecutor) { | |
if (WHEEL_TIMER_CONCURRENT_HASH_MAP.get(asyncTaskExecutor.getAsyncTaskExecutorName()) != null) { | |
return WHEEL_TIMER_CONCURRENT_HASH_MAP.get(asyncTaskExecutor.getAsyncTaskExecutorName()); | |
} else { | |
// 使用时间轮算法(Netty HashedWheelTimer) | |
final HashedWheelTimer hashedWheelTimer = new HashedWheelTimer( | |
new NamedThreadFactory(asyncTaskExecutor.getAsyncTaskExecutorName() + "time-wheel"), | |
// tick duration (ms) | |
100, | |
TimeUnit.MILLISECONDS, | |
// ticks per wheel | |
512 | |
); | |
// 关闭定时器 | |
Runtime.getRuntime().addShutdownHook(new Thread(hashedWheelTimer::stop)); | |
return hashedWheelTimer; | |
} | |
} | |
/** | |
* <p> Project: scaffold - TimeoutTracker </p> | |
* | |
* 使用弱引用包装 Timeout | |
* 若未及时取消 Timeout,会导致: | |
* <li> | |
* 1. 线程池队列中堆积未执行的取消任务 | |
* <li> | |
* 2. 强引用导致无法被 GC 回收 | |
* | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2025/2/10 16:09 | |
* @since 1.0.0 | |
*/ | |
private static class TimeoutTracker { | |
final WeakReference<Timeout> weakReference; | |
TimeoutTracker(Timeout timeout) { | |
this.weakReference = new WeakReference<>(timeout); | |
} | |
void cancelAll() { | |
Optional.ofNullable(weakReference.get()).ifPresent(Timeout::cancel); | |
} | |
} | |
} |
# 任务定义类
package com.tz.scaffold.framework.dynamicpool.core.task.timely; | |
import com.baomidou.lock.LockInfo; | |
import com.sun.management.OperatingSystemMXBean; | |
import com.tz.scaffold.framework.common.util.json.JsonUtils; | |
import com.tz.scaffold.framework.dynamicpool.core.enums.AsyncTaskExecuteStatusEnum; | |
import com.tz.scaffold.framework.dynamicpool.core.service.AsyncTaskInfo; | |
import com.tz.scaffold.framework.dynamicpool.core.util.DynamicThreadPoolFrameworkUtils; | |
import com.tz.scaffold.framework.security.core.LoginUser; | |
import com.tz.scaffold.framework.security.core.util.SecurityFrameworkUtils; | |
import lombok.Data; | |
import lombok.EqualsAndHashCode; | |
import lombok.extern.slf4j.Slf4j; | |
import org.apache.commons.lang3.StringUtils; | |
import java.io.PrintWriter; | |
import java.io.Serializable; | |
import java.io.StringWriter; | |
import java.lang.management.ManagementFactory; | |
import java.lang.management.ThreadInfo; | |
import java.lang.management.ThreadMXBean; | |
import java.util.Objects; | |
import java.util.UUID; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* <p> Project: scaffold - AsyncTaskExecutor </p> | |
* | |
* 异步任务执行器的抽象类 继承 {@link AsyncTaskInfo} 获得执行情况的状态 | |
* @author Tz | |
* @version 1.0.0 | |
* @date 2024/12/11 16:09 | |
* @since 1.0.0 | |
*/ | |
@Slf4j | |
@Data | |
@EqualsAndHashCode(callSuper = true) | |
public abstract class AsyncTaskExecutor extends AsyncTaskInfo implements Serializable { | |
private static final String CACHE_LOCK_KEY = "AsyncTaskLockKeyCache"; | |
/** | |
* Java 虚拟机的线程系统的管理接口 | |
*/ | |
private final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); | |
/** | |
* 运行 Java 虚拟机的操作系统的管理接口 | |
*/ | |
private final OperatingSystemMXBean osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); | |
/** | |
* 异步任务执行器名称 | |
*/ | |
private String asyncTaskExecutorName; | |
/** | |
* 异步任务执行器密钥 | |
*/ | |
private static String ASYNC_TASK_EXECUTOR_KEY = "asyncTaskExecutor:%s:%s"; | |
/** | |
* 异步任务锁定密钥 | |
*/ | |
private static String ASYNC_TASK_LOCK_KEY = "asyncTaskLockKey:%s:%s"; | |
/** | |
* 记录线程执行开始时间 | |
*/ | |
private ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>(); | |
/** | |
* 线程任务取消情况 | |
*/ | |
private Boolean isCancel = false; | |
/** | |
* 进度百分比 | |
*/ | |
protected static final Integer PERCENTAGE = 100; | |
/** | |
* 完成度 | |
*/ | |
protected Integer completionDegree; | |
/** | |
* 进度步长 | |
*/ | |
protected Integer progressStepSize; | |
/** | |
* 任务是否是运行状态 | |
*/ | |
protected Boolean isRunning = true; | |
/** | |
* 同步锁的关键值 | |
* <p> | |
* 相同的 key 值会进行同步操作 | |
* <p> | |
* 注意:不设置也是同步执行的 | |
*/ | |
protected String syncKeyword = ""; | |
/** | |
* 过期时间 单位:毫秒, 默认 3 分钟 | |
*/ | |
protected Long expire = 1000L * 60 * 3; | |
/** | |
* 获取锁超时时间 单位:毫秒 | |
*/ | |
protected Long acquireTimeout = 2000L; | |
/** | |
* 任务执行超时时间 单位:毫秒 | |
*/ | |
protected Long taskExecuteTimeout = 10000L; | |
/** | |
* 任务在队列中等待超时时间 单位:毫秒 | |
*/ | |
protected Long taskQueueTimeout = 20000L; | |
/** | |
* 初始化值 | |
*/ | |
public AsyncTaskExecutor(String asyncTaskExecutorName, String taskName, String taskType) { | |
//======= 初始化相关参数 ======= | |
// 初始进度 | |
setCompletionDegree(0); | |
// 默认执行中状态 | |
setExecuteStatus(AsyncTaskExecuteStatusEnum.EXECUTING.getType()); | |
this.setTaskName(taskName); | |
this.setTaskType(taskType); | |
this.asyncTaskExecutorName = asyncTaskExecutorName; | |
if (SecurityFrameworkUtils.getLoginUser() != null) { | |
this.setUsername(Objects.requireNonNull(SecurityFrameworkUtils.getLoginUser()).getId().toString()); | |
setKeyword(getRandomKeyword(Objects.requireNonNull(SecurityFrameworkUtils.getLoginUser()).getId().toString())); | |
} else { | |
this.setUsername("0"); | |
setKeyword(getRandomKeyword("0")); | |
} | |
} | |
/** | |
* 基础执行器,只是做异步任务的消息记录 具体的业务由子类实现 {@link #taskContent ()} 完成 | |
*/ | |
public void baseExec() { | |
LockInfo lockInfo = null; | |
try { | |
// 保存锁的关键字,用于判断,是否有任务还在执行。避免频繁执行异步任务,而且还不知道情况。 | |
DynamicThreadPoolFrameworkUtils.STRING_REDIS_TEMPLATE | |
.opsForValue() | |
.set(CACHE_LOCK_KEY + formatAsyncTaskLockKey(asyncTaskExecutorName, getSyncKeyword()) | |
, formatAsyncTaskLockKey(asyncTaskExecutorName, getSyncKeyword()) | |
, getExpire() | |
, TimeUnit.MILLISECONDS); | |
//①尝试获取锁 | |
lockInfo = DynamicThreadPoolFrameworkUtils.LOCK_TEMPLATE.lock(formatAsyncTaskLockKey(asyncTaskExecutorName, getSyncKeyword()) | |
, getExpire() | |
, getAcquireTimeout()); | |
//②获取失败 - 中断任务 | |
if (lockInfo == null) { | |
throw new RuntimeException("获取锁失败,有任务正在执行!"); | |
} | |
//②获取成功 - 进行业务处理 | |
synchronized (lockInfo) { | |
startTimeThreadLocal.set(System.currentTimeMillis()); | |
try { | |
// 保存任务记录,返回 id | |
Long id = DynamicThreadPoolFrameworkUtils.createAsyncMessage(this); | |
this.setId(id); | |
//TODO 可能出现该任务执行过长导致同类型任务一直获取不到锁的情况,所以需要做一个锁超时结束当前执行的任务的功能 | |
// 执行实际的任务内容 | |
boolean execFlag = taskContent(); | |
// 判断执行结果 | |
if (execFlag) { | |
this.setContextInfo(String.format("执行 【%s】 任务执行成功", this.getTaskName())); | |
this.setExecuteStatus(AsyncTaskExecuteStatusEnum.EXECUTION_SUCCESSFUL.getType()); | |
} else { | |
// 如果是失败则需要是判断执行失败,还是任务超时了 | |
if (this.getIsCancel()) { | |
this.baseCallBack(); | |
} else { | |
this.setContextInfo(String.format("执行 【%s】 任务执行失败", this.getTaskName())); | |
this.setExecuteStatus(AsyncTaskExecuteStatusEnum.EXECUTION_FAILED.getType()); | |
} | |
} | |
DynamicThreadPoolFrameworkUtils.updateAsyncMessage(this); | |
} catch (Exception e){ | |
// 将异常信息保存到 String 中 | |
StringWriter sw = new StringWriter(); | |
PrintWriter pw = new PrintWriter(sw); | |
e.printStackTrace(pw); | |
String exceptionAsString = sw.toString(); | |
this.setExecuteStatus(AsyncTaskExecuteStatusEnum.EXECUTION_ERROR.getType()); | |
this.setContextInfo(String.format("执行 【%s】 异常,异常信息:【%s】", this.getTaskName(), e.getMessage())); | |
this.setErrorDetailInfo(String.format("执行 【%s】 异常,异常明细信息:【%s】", this.getTaskName(), exceptionAsString)); | |
} finally { | |
long costTime = System.currentTimeMillis() - startTimeThreadLocal.get(); | |
startTimeThreadLocal.remove(); | |
long cpuUsageTime = threadMXBean.getThreadCpuTime(Thread.currentThread().getId()) / (1000 * 1000); | |
double cpuUsageRate = getThreadCpuUsage(Thread.currentThread().getId()); | |
ThreadInfo threadInfo = threadMXBean.getThreadInfo(Thread.currentThread().getId()); | |
this.setCpuUsageRate(cpuUsageRate + "%"); | |
this.setCpuUsageTime(cpuUsageTime); | |
this.setExecuteTime(costTime); | |
this.setThreadName(threadInfo.getThreadName()); | |
if (this.getId() != null) { | |
DynamicThreadPoolFrameworkUtils.updateAsyncMessage(this); | |
} else { | |
DynamicThreadPoolFrameworkUtils.createAsyncMessage(this); | |
} | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
throw e; | |
} finally { | |
//③解锁 | |
DynamicThreadPoolFrameworkUtils.LOCK_TEMPLATE.releaseLock(lockInfo); | |
DynamicThreadPoolFrameworkUtils.STRING_REDIS_TEMPLATE | |
.opsForValue() | |
.getOperations() | |
.delete(CACHE_LOCK_KEY + formatAsyncTaskLockKey(asyncTaskExecutorName, getSyncKeyword())); | |
this.setIsRunning(false); | |
if (SecurityFrameworkUtils.getLoginUser() != null) { | |
LoginUser loginUser = SecurityFrameworkUtils.getLoginUser(); | |
DynamicThreadPoolFrameworkUtils.sendAsyncTaskExecutorResult(loginUser.getUserType() | |
, loginUser.getId() | |
, "async-task-message" | |
, JsonUtils.toJsonString(this)); | |
} | |
} | |
} | |
/** | |
* 实际异步执行的任务体抽象方法、由具体的异步任务实现 | |
* @return 执行结果 | |
* @throws Exception 可能出现的业务异常 | |
*/ | |
public abstract boolean taskContent() throws Exception; | |
/** | |
* 取消任务执行 | |
*/ | |
public void cancelTask() { | |
this.isCancel = true; | |
} | |
/** | |
* 任务取消通用的执行逻辑 | |
*/ | |
protected void baseCallBack() { | |
// 删除缓存的当前任务锁 | |
DynamicThreadPoolFrameworkUtils.STRING_REDIS_TEMPLATE | |
.opsForValue() | |
.getOperations() | |
.delete(CACHE_LOCK_KEY + formatAsyncTaskLockKey(asyncTaskExecutorName, getSyncKeyword())); | |
// 设置相关的信息,任务取消的情况 | |
this.setContextInfo(String.format("执行 【%s】 任务执行失败, 原因是任务超时被取消,超时时间 【%s】 ms", this.getTaskName(), this.getTaskExecuteTimeout())); | |
this.setExecuteStatus(AsyncTaskExecuteStatusEnum.EXECUTION_FAILED.getType()); | |
this.cancelTaskCallBack(); | |
} | |
/** | |
* 任务取消后的回调方法 | |
* <p> | |
* 可以在这里释放资源等操作 | |
* </p> | |
*/ | |
public abstract void cancelTaskCallBack(); | |
/** | |
* 推进进度 | |
*/ | |
public void advanceProgress() { | |
this.completionDegree = this.completionDegree + this.progressStepSize; | |
if (this.completionDegree > PERCENTAGE) { | |
this.completionDegree = PERCENTAGE; | |
} | |
} | |
/** | |
* 对执行器进行必要的校验 | |
* @return 校验结果 | |
*/ | |
public String checkAsyncTaskExecutor() { | |
if (StringUtils.isBlank(this.asyncTaskExecutorName)) { | |
return "异步任务执行器名称未设置!"; | |
} | |
if (this.expire < this.taskExecuteTimeout) { | |
return "锁的超时时间必须大于任务执行的超时时间!"; | |
} | |
String cacheLockKey = DynamicThreadPoolFrameworkUtils.STRING_REDIS_TEMPLATE | |
.opsForValue() | |
.get(CACHE_LOCK_KEY + formatAsyncTaskLockKey(this.asyncTaskExecutorName, getSyncKeyword())); | |
if (cacheLockKey != null) { | |
return "存在同类型的任务在执行, 任务名称为:【" + this.asyncTaskExecutorName + "】, 请稍后在试"; | |
} | |
if (this.getTaskExecuteTimeout() > this.getTaskQueueTimeout()) { | |
return "队列等待时间要大于任务执行的超时时间,请调整!"; | |
} | |
return ""; | |
} | |
/** | |
* 格式化异步任务执行器密钥 | |
* @param key1 格式化 key1 | |
* @param key2 格式化 key2 | |
* @return 格式化之后的密钥 | |
*/ | |
private static String formatAsyncTaskExecutorKey(String key1, String key2) { | |
return String.format(ASYNC_TASK_EXECUTOR_KEY, key1, key2); | |
} | |
/** | |
* 格式化异步任务锁定密钥 | |
* @param key1 格式化 key1 | |
* @param key2 格式化 key2 | |
* @return 格式化之后的密钥 | |
*/ | |
private static String formatAsyncTaskLockKey(String key1, String key2) { | |
return String.format(ASYNC_TASK_LOCK_KEY, key1, key2); | |
} | |
/** | |
* 获取一个唯一值 保证唯一 | |
* @param username 用户名 | |
* @return 唯一值,用作标识 | |
*/ | |
public synchronized String getRandomKeyword(String username) { | |
return username + "_" + UUID.randomUUID(); | |
} | |
/** | |
* 根据线程 id 获取线程执行占用总 cpu 的百分比 | |
* @param threadId 线程 id | |
* @return 占用总 cpu 的百分比 | |
*/ | |
private double getThreadCpuUsage(long threadId) { | |
long threadCpuTime = threadMXBean.getThreadCpuTime(threadId); | |
long systemCpuTime = osBean.getProcessCpuTime(); | |
double cpuUsage = ((double) threadCpuTime / (double) systemCpuTime) * 100; | |
cpuUsage = Math.round(cpuUsage * 100.0) / 100.0; | |
return cpuUsage; | |
} | |
} |
# 大致处理步骤
- 通过注入
DynamicThreadPoolApi实例来查询infra_dynamic_pool_thread_executor表的初始化线程池配置,具体方法getInitConfig- 获取到对应配置后转成对应的
dynamictp配置, 具体方法BeanUtil.copyProperties(executor, dtpExecutorProps);- 根据配置注册动态线程池,需要区分不同类型的线程池(特殊线程池 spring 自带、定时任务线程池),特殊线程池需要通过代理的方式扩展,普通线程池直接动态注册 bean 具体方法
registerBeanDefinitions
# 实际例子
继承 AsyncTaskExecutor 类重写方法写自己需要执行的任务逻辑
AsyncTaskExecutor asyncTaskExecutor = new AsyncTaskExecutor("test", "测试任务", "业务任务") { | |
@Override | |
public boolean taskContent() throws Exception { | |
for (int i = 0; i < 100; i++) { | |
if (this.getIsCancel()) { | |
return false; | |
} | |
System.out.println("testtest1: " + i); | |
Thread.sleep(500); | |
} | |
return true; | |
} | |
/** | |
* 任务取消后的回调方法 | |
*/ | |
@Override | |
public void cancelTaskCallBack() { | |
// | |
System.out.println("回调函数1。。。"); | |
} | |
}; | |
asyncTaskExecutor.setSyncKeyword(UUID.randomUUID().toString()); | |
AsyncTaskSubmitDTO asyncTaskSubmitDTO = DynamicThreadPoolFrameworkUtils.execAsyncTask(asyncTaskExecutor); | |
System.out.println("提交情况: " + asyncTaskSubmitDTO.getIsSubmitTask()); | |
System.out.println("提交信息: " + asyncTaskSubmitDTO.getNoSubmitIllustrate()); |
继承类 -> 重写方法实现具体的处理逻辑 -> 调用
DynamicThreadPoolFrameworkUtils.execAsyncTask(asyncTaskExecutor)提交任务
# 问题点和优化
# 问题一
在启动定时任务计算异步任务执行超时的时候 为什么使用的是 Netty HashedWheelTimer 而不用 ScheduledExecutorService ?
ScheduledExecutorService 和 Netty HashedWheelTimer 都是用于处理定时任务的工具,但它们在实现机制、性能特点和适用场景上有一些差异。以下是它们的对比:
# ScheduledExecutorService
# 功能
灵活性: ScheduledExecutorService 提供了更灵活的任务调度机制,支持延迟执行、周期性执行等多种调度方式。这使得它适用于各种不同的定时任务场景。
可控性:通过使用线程池, ScheduledExecutorService 提供了对任务执行线程的管理和控制,能够更好地适应不同的并发需求。
异常处理:相对于 Timer , ScheduledExecutorService 对于任务执行中的异常有更好的处理机制,不会因为一个任务的异常导致整个调度器终止。
线程安全: ScheduledExecutorService 在设计上相对于 Timer 更加线程安全,更适合在多线程环境中使用。
扩展性:作为 ExecutorService 的子接口, ScheduledExecutorService 不仅可以执行定时任务,还能执行普通的异步任务,使得任务的管理更加一致和统一。
# 适用场景
- 适用于需要灵活调度机制的场景,如延迟执行、周期性执行等。
- 适用于需要对任务执行线程进行管理和控制的场景。
- 适用于需要处理任务执行中异常的场景。
# Netty HashedWheelTimer
# 功能
高性能: HashedWheelTimer 是一个高性能的定时器 / 调度库,基于哈希时间轮(Hashed Wheel)的算法,该算法通过将时间划分为多个槽(ticks),每个槽对应一个时间区间,从而实现高效的定时任务调度。
高精度: HashedWheelTimer 可以实现高精度的定时任务调度,适用于需要高精度定时的场景。
高并发: HashedWheelTimer 适用于高并发场景,能够高效地处理大量定时任务。
# 适用场景
- 适用于需要高性能、高精度定时任务调度的场景,如网络编程中的定时任务处理。
- 适用于高并发场景,能够高效地处理大量定时任务。
# 对比
| 特性 | ScheduledExecutorService | Netty HashedWheelTimer |
|---|---|---|
| 灵活性 | 高,支持多种调度方式 | 低,主要用于定时任务 |
| 可控性 | 高,可管理任务执行线程 | 低,任务执行是同步的 |
| 异常处理 | 好,不会因任务异常终止 | 差,任务异常可能影响整个调度器 |
| 线程安全 | 高,适合多线程环境 | 低,任务执行是同步的 |
| 扩展性 | 高,可执行普通异步任务 | 低,主要用于定时任务 |
| 性能 | 一般,适合一般场景 | 高,适合高性能场景 |
| 精度 | 一般,适合一般精度需求 | 高,适合高精度需求 |
| 并发 | 一般,适合一般并发需求 | 高,适合高并发场景 |
# 总结
- ScheduledExecutorService 适用于需要灵活调度机制、对任务执行线程有管理需求、需要处理任务执行中异常的场景。
- Netty HashedWheelTimer 适用于需要高性能、高精度定时任务调度的场景,特别是在高并发环境下。
在执行异步任务的接口中,我们使用了 Netty HashedWheelTimer , 原因是这里会有高并发的情况,对于 ScheduledExecutorService 不要定义核心线程数,调少了后续任务需要在队列中等待,那么会导致执行超时时间不准确,调多了白白占用资源,所以这里根据不同类型的任务都添加一个 Netty HashedWheelTimer 定时器, 同类型任务放入同一个定时器不同时间刻度上。
# 问题二
为什么要添加 TimeoutTracker 类的来管理 Netty HashedWheelTimer 生成的定时任务 Timeout , 并且用 WeakReference 包装?
# WeakReference
WeakReference<T> 是 Java 中的一种引用类型,它是 java.lang.ref.Reference 的一个子类,用于实现弱引用(Weak Reference)。与强引用(Strong Reference)不同,弱引用不会阻止对象被垃圾回收器(GC)回收,即使该对象仍然可以被通过弱引用访问。
# 作用
WeakReference<T> 的主要作用是允许对象在内存不足时被垃圾回收器回收,从而减少内存泄漏的风险,并且可以与引用队列( ReferenceQueue )配合使用,以实现更灵活的内存管理。
# 使用场景
- 缓存(Cache):在缓存系统中,使用弱引用来存储缓存对象,当内存不足时,垃圾回收器会自动回收这些对象,从而避免内存泄漏。
- 注册表(Registry):在某些场景下,需要维护一个对象的引用列表,但又希望这些对象在不再被使用时能够被自动回收。使用弱引用可以实现这一目的。
- 事件监听器(Listener)管理:在事件驱动的系统中,使用弱引用来管理监听器对象,当监听器对象不再被其他强引用所引用时,垃圾回收器会回收这些监听器对象,从而避免内存泄漏。
在本项目中 并发请求太多,原本正常使用是 强引用 ,会导致:强引用无法被 GC 回收。即使调用了 Timeout.cancel(false) 方法也只是取消定任务,引用可能还存在。
如果使用了 WeakReference 只要没有引用到就会自动被清理。
# 扩展
感觉这个功能有点局限性,只能在一台电脑上,还是会和其他应用抢占资源,可能在集群模式下会均衡分配任务,但是我想把这个项目抽离出来单独部署在一台性能比较好的服务器然后专门处理各种异步运算复杂的任务。
但是提交的任务怎么和原来的代码解耦出来呢,可能需要提交代码片段,程序兼容接口接收的代码片段和数据,然后进行执行运输,就像现在的 AI训练数据集一样 提交数据集,程序后台训练然后训练完成返回结果
找了一下可以用 Groovy脚本引擎 大概实现:
# 方案一:Groovy 脚本引擎(推荐)
还没测试
使用 Groovy 脚本引擎,既支持 Java 语法,又提供了更好的沙箱隔离能力。
# 1. 引入依赖
<dependency> | |
<groupId>org.codehaus.groovy</groupId> | |
<artifactId>groovy-all</artifactId> | |
<version>3.0.19</version> | |
<type>pom</type> | |
</dependency> |
# 2. Groovy 脚本执行器
package com.tz.scaffold.framework.dynamicpool.core.script; | |
import groovy.lang.GroovyClassLoader; | |
import groovy.lang.GroovyObject; | |
import groovy.lang.GroovyShell; | |
import groovy.util.GroovyScriptEngine; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.stereotype.Component; | |
import javax.annotation.PreDestroy; | |
import java.util.concurrent.*; | |
/** | |
* Groovy 脚本执行器 - 支持动态脚本执行 | |
*/ | |
@Slf4j | |
@Component | |
public class GroovyScriptExecutor { | |
// 脚本缓存 | |
private final ConcurrentHashMap<String, GroovyObject> scriptCache = new ConcurrentHashMap<>(); | |
// 独立的类加载器,用于隔离 | |
private final GroovyClassLoader groovyClassLoader; | |
// 脚本执行线程池 | |
private final ExecutorService scriptExecutor; | |
public GroovyScriptExecutor() { | |
// 使用自定义类加载器,限制权限 | |
this.groovyClassLoader = new GroovyClassLoader( | |
Thread.currentThread().getContextClassLoader() | |
); | |
this.scriptExecutor = Executors.newCachedThreadPool( | |
r -> { | |
Thread t = new Thread(r); | |
t.setName("groovy-script-" + System.currentTimeMillis()); | |
t.setDaemon(true); | |
return t; | |
} | |
); | |
} | |
/** | |
* 执行 Groovy 脚本 | |
* @param script 脚本内容 | |
* @param context 上下文参数 | |
* @return 执行结果 | |
*/ | |
public Object executeScript(String script, Map<String, Object> context) throws Exception { | |
// 1. 安全检查 | |
ScriptSecurityValidator.validate(script); | |
// 2. 生成脚本签名 | |
String scriptHash = DigestUtils.md5DigestAsHex(script.getBytes()); | |
// 3. 获取或编译脚本 | |
GroovyObject groovyObject = scriptCache.computeIfAbsent(scriptHash, key -> { | |
try { | |
Class<?> scriptClass = groovyClassLoader.parseClass(script); | |
return (GroovyObject) scriptClass.newInstance(); | |
} catch (Exception e) { | |
throw new RuntimeException("编译脚本失败", e); | |
} | |
}); | |
// 4. 设置上下文 | |
if (context != null) { | |
context.forEach((k, v) -> { | |
try { | |
groovyObject.setProperty(k, v); | |
} catch (Exception e) { | |
log.warn("设置属性失败: {}={}", k, v); | |
} | |
}); | |
} | |
// 5. 执行脚本 | |
return groovyObject.invokeMethod("run", null); | |
} | |
/** | |
* 异步执行脚本 | |
*/ | |
public CompletableFuture<Object> executeScriptAsync(String script, | |
Map<String, Object> context) { | |
return CompletableFuture.supplyAsync(() -> { | |
try { | |
return executeScript(script, context); | |
} catch (Exception e) { | |
log.error("执行脚本失败", e); | |
throw new CompletionException(e); | |
} | |
}, scriptExecutor); | |
} | |
@PreDestroy | |
public void destroy() { | |
scriptExecutor.shutdown(); | |
groovyClassLoader.clearCache(); | |
} | |
} |
# 3. 安全的脚本沙箱
package com.tz.scaffold.framework.dynamicpool.core.script; | |
import java.util.regex.Pattern; | |
/** | |
* 脚本安全校验器 | |
*/ | |
public class ScriptSecurityValidator { | |
// 危险操作正则 | |
private static final Pattern[] DANGEROUS_PATTERNS = { | |
Pattern.compile("\\bSystem\\.(exit|gc|runFinalization|load|loadLibrary)\\b"), | |
Pattern.compile("\\bRuntime\\.(exec|getRuntime)\\b"), | |
Pattern.compile("\\bProcessBuilder\\b"), | |
Pattern.compile("\\bClass\\.(forName|newInstance)\\b"), | |
Pattern.compile("\\bMethod\\.(invoke)\\b"), | |
Pattern.compile("\\bField\\.(set|get)\\b"), | |
Pattern.compile("\\bFile(InputStream|OutputStream|Reader|Writer)\\b"), | |
Pattern.compile("\\bSocket\\b"), | |
Pattern.compile("\\bServerSocket\\b"), | |
Pattern.compile("\\bDatagramSocket\\b"), | |
Pattern.compile("\\bURL\\.openConnection\\b"), | |
Pattern.compile("\\bHttpClient\\b"), | |
Pattern.compile("\\bThread\\.(stop|suspend|resume)\\b"), | |
Pattern.compile("\\bUnsafe\\b"), | |
Pattern.compile("\\bSecurityManager\\b") | |
}; | |
public static void validate(String script) { | |
if (script == null || script.trim().isEmpty()) { | |
throw new IllegalArgumentException("脚本内容不能为空"); | |
} | |
// 检查脚本长度 | |
if (script.length() > 50000) { | |
throw new IllegalArgumentException("脚本内容过长,最大支持50000字符"); | |
} | |
// 检查危险操作 | |
for (Pattern pattern : DANGEROUS_PATTERNS) { | |
if (pattern.matcher(script).find()) { | |
throw new SecurityException("脚本包含危险操作: " + pattern.pattern()); | |
} | |
} | |
} | |
} |
# 4. 集成到 AsyncTaskExecutor
package com.tz.scaffold.framework.dynamicpool.core.task.timely; | |
import com.tz.scaffold.framework.dynamicpool.core.script.GroovyScriptExecutor; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.annotation.Scope; | |
import org.springframework.stereotype.Component; | |
import java.util.HashMap; | |
import java.util.Map; | |
/** | |
* 脚本异步任务执行器 | |
*/ | |
@Slf4j | |
@Component | |
@Scope("prototype") | |
public class ScriptAsyncTaskExecutor extends AsyncTaskExecutor { | |
@Autowired | |
private GroovyScriptExecutor scriptExecutor; | |
private String scriptContent; | |
private Map<String, Object> scriptContext; | |
public ScriptAsyncTaskExecutor() { | |
super("script-executor", "脚本任务", "SCRIPT"); | |
this.scriptContext = new HashMap<>(); | |
} | |
/** | |
* 设置要执行的脚本 | |
*/ | |
public void setScript(String scriptContent) { | |
this.scriptContent = scriptContent; | |
} | |
/** | |
* 设置脚本上下文变量 | |
*/ | |
public void setScriptContext(Map<String, Object> context) { | |
if (context != null) { | |
this.scriptContext.putAll(context); | |
} | |
} | |
@Override | |
public boolean taskContent() throws Exception { | |
if (scriptContent == null || scriptContent.trim().isEmpty()) { | |
log.error("脚本内容为空"); | |
return false; | |
} | |
// 添加当前实例到上下文,方便脚本操作进度 | |
scriptContext.put("task", this); | |
scriptContext.put("logger", log); | |
// 执行脚本 | |
Object result = scriptExecutor.executeScript(scriptContent, scriptContext); | |
// 处理返回结果 | |
if (result instanceof Boolean) { | |
return (Boolean) result; | |
} else if (result instanceof Number) { | |
return ((Number) result).intValue() > 0; | |
} else { | |
return result != null; | |
} | |
} | |
@Override | |
public void cancelTaskCallBack() { | |
log.info("脚本任务 [{}] 被取消", this.getTaskName()); | |
} | |
} |
# 5. 用户调用接口
@RestController | |
@RequestMapping("/api/script-task") | |
public class ScriptTaskController { | |
@Autowired | |
private ApplicationContext applicationContext; | |
@PostMapping("/execute") | |
public Result<String> executeScriptTask(@RequestBody ScriptTaskRequest request) { | |
try { | |
// 创建脚本任务实例 | |
ScriptAsyncTaskExecutor executor = applicationContext.getBean( | |
ScriptAsyncTaskExecutor.class | |
); | |
// 设置脚本 | |
executor.setScript(request.getScriptContent()); | |
// 设置上下文 | |
Map<String, Object> context = new HashMap<>(); | |
context.put("userId", SecurityFrameworkUtils.getLoginUser().getId()); | |
context.put("params", request.getParams()); | |
executor.setScriptContext(context); | |
// 执行任务 | |
executor.baseExec(); | |
return Result.success("脚本任务已提交,任务ID: " + executor.getId()); | |
} catch (Exception e) { | |
log.error("执行脚本任务失败", e); | |
return Result.error("执行失败: " + e.getMessage()); | |
} | |
} | |
} | |
@Data | |
public class ScriptTaskRequest { | |
@NotEmpty(message = "脚本内容不能为空") | |
private String scriptContent; | |
private String taskName = "脚本任务"; | |
private String taskType = "SCRIPT"; | |
private Map<String, Object> params; | |
} |
# 6. 用户脚本示例
// 用户提交的脚本 - 支持 Groovy/Java 混合语法 | |
import java.time.LocalDateTime | |
// 获取上下文参数 | |
def userId = params.userId ?: "未知用户" | |
logger.info("开始执行任务,用户: {}", userId) | |
// 模拟业务处理 | |
for (int i = 0; i <= 10; i++) { | |
Thread.sleep(1000) | |
// 更新进度 | |
task.setCompletionDegree(i * 10) | |
task.advanceProgress() | |
logger.info("进度: {}%", i * 10) | |
} | |
// 返回结果 | |
return true |