# scaffold 项目之定时任务管理
# 简介
定时任务(Scheduled Task / Cron Job) 是一种在预设时间或周期自动执行特定操作的机制,广泛应用于系统运维、数据处理、自动化流程等场景。
# 使用场景
# 1. 系统运维自动化
- 日志轮转:每天凌晨清理 Nginx 访问日志(如
logrotate)。 - 备份:每日 3 点全量备份 MySQL 数据库到云存储。
- 健康检查:每 10 分钟检测服务状态,异常时重启并发送告警。
# 2. 数据处理与 ETL
- 数据同步:每小时从 API 拉取订单数据到数据仓库。
- 报表生成:每周一上午 8 点生成销售周报并邮件发送给管理层。
- 缓存更新:每 5 分钟刷新 Redis 中的热点数据。
# 3. 业务逻辑自动化
- 用户通知:每天 8 点推送生日祝福短信。
- 订单超时:每 30 分钟扫描未支付订单,超时自动取消。
- 内容清理:每月 1 日删除 3 个月前的临时文件。
# 4. 监控与告警
- 资源监控:每 1 分钟检查 CPU 使用率,超过 90% 触发告警。
- 安全扫描:每天凌晨 2 点运行漏洞扫描脚本。
# 开始使用
本项目基于 Quartz + MySQL 实现分布式定时任务,并提供 [基础设施 -> 定时任务] 菜单,进行定时任务的统一管理,支持动态控制任务的添加、修改、开启、暂停、删除、执行一次等操作。
项目对
Quartz框架做了如下处理:
- 封装了对应的模块
scaffold-spring-boot-starter-job。- 在
基础设施模块下添加了job包,提供任务的动态管理,执行日志的存储。注意: 基础设施模块 的 job 包需要引入
scaffold-spring-boot-starter-job包
# 对应实现代码
application.yml
# Quartz 配置项,对应 QuartzProperties 配置类 | |
spring: | |
quartz: | |
auto-startup: true # 本地开发环境,尽量不要开启 Job | |
scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName | |
job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。 | |
wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true | |
properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档 | |
org: | |
quartz: | |
# Scheduler 相关配置 | |
scheduler: | |
instanceName: schedulerName | |
instanceId: AUTO # 自动生成 instance ID | |
# JobStore 相关配置 | |
jobStore: | |
# JobStore 实现类。可见博客:https://blog.csdn.net/weixin_42458219/article/details/122247162 | |
class: org.springframework.scheduling.quartz.LocalDataSourceJobStore | |
isClustered: true # 是集群模式 | |
clusterCheckinInterval: 15000 # 集群检查频率,单位:毫秒。默认为 15000,即 15 秒 | |
misfireThreshold: 60000 # misfire 阀值,单位:毫秒。 | |
# 线程池相关配置 | |
threadPool: | |
threadCount: 25 # 线程池大小。默认为 10 。 | |
threadPriority: 5 # 线程优先级 | |
class: org.quartz.simpl.SimpleThreadPool # 线程池类型 | |
jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置 | |
initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。 |
# springboot-starter-job 模块
# pom.xml
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-quartz</artifactId> | |
</dependency> |
# 启动类
package com.tz.scaffold.framework.quartz.config; | |
/** | |
* <p> Project: scaffold - ScaffoldAsyncAutoConfiguration </p> | |
* | |
* 异步任务的配置类 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@AutoConfiguration | |
@EnableAsync | |
public class ScaffoldAsyncAutoConfiguration { | |
@Bean | |
public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() { | |
return new BeanPostProcessor() { | |
@Override | |
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { | |
if (!(bean instanceof ThreadPoolTaskExecutor)) { | |
return bean; | |
} | |
// 修改提交的任务,接入 TransmittableThreadLocal | |
ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) bean; | |
executor.setTaskDecorator(TtlRunnable::get); | |
return executor; | |
} | |
}; | |
} | |
} |
# 配置类
package com.tz.scaffold.framework.quartz.config; | |
/** | |
* <p> Project: scaffold - ScaffoldQuartzAutoConfiguration </p> | |
* | |
* 定时任务的配置类 | |
* <p> | |
* EnableScheduling: 开启 Spring 自带的定时任务 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@AutoConfiguration | |
@EnableScheduling | |
@Slf4j | |
public class ScaffoldQuartzAutoConfiguration { | |
@Bean | |
public SchedulerManager schedulerManager(Optional<Scheduler> scheduler) { | |
if (!scheduler.isPresent()) { | |
log.info("[定时任务 - 已禁用][参考 https://www.tzzfj.cn/ 开启]"); | |
return new SchedulerManager(null); | |
} | |
return new SchedulerManager(scheduler.get()); | |
} | |
} |
# 基础 Job 调用者
package com.tz.scaffold.framework.quartz.core.handler; | |
import static cn.hutool.core.exceptions.ExceptionUtil.getRootCauseMessage; | |
/** | |
* <p> Project: scaffold - JobHandlerInvoker </p> | |
* | |
* 基础 Job 调用者,负责调用 {@link JobHandler#execute (String)} 执行任务 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@DisallowConcurrentExecution | |
@PersistJobDataAfterExecution | |
@Slf4j | |
public class JobHandlerInvoker extends QuartzJobBean { | |
@Resource | |
private ApplicationContext applicationContext; | |
@Resource | |
private JobLogFrameworkService jobLogFrameworkService; | |
@Override | |
protected void executeInternal(JobExecutionContext executionContext) throws JobExecutionException { | |
// 第一步,获得 Job 数据 | |
Long jobId = executionContext.getMergedJobDataMap().getLong(JobDataKeyEnum.JOB_ID.name()); | |
String jobHandlerName = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_NAME.name()); | |
String jobHandlerParam = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_PARAM.name()); | |
int refireCount = executionContext.getRefireCount(); | |
int retryCount = (Integer) executionContext.getMergedJobDataMap().getOrDefault(JobDataKeyEnum.JOB_RETRY_COUNT.name(), 0); | |
int retryInterval = (Integer) executionContext.getMergedJobDataMap().getOrDefault(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), 0); | |
// 第二步,执行任务 | |
Long jobLogId = null; | |
LocalDateTime startTime = LocalDateTime.now(); | |
String data = null; | |
Throwable exception = null; | |
try { | |
// 记录 Job 日志(初始) | |
jobLogId = jobLogFrameworkService.createJobLog(jobId, startTime, jobHandlerName, jobHandlerParam, refireCount + 1); | |
// 执行任务 | |
data = this.executeInternal(jobHandlerName, jobHandlerParam); | |
} catch (Throwable ex) { | |
exception = ex; | |
} | |
// 第三步,记录执行日志 | |
this.updateJobLogResultAsync(jobLogId, startTime, data, exception, executionContext); | |
// 第四步,处理有异常的情况 | |
handleException(exception, refireCount, retryCount, retryInterval); | |
} | |
private String executeInternal(String jobHandlerName, String jobHandlerParam) throws Exception { | |
// 获得 JobHandler 对象 | |
JobHandler jobHandler = applicationContext.getBean(jobHandlerName, JobHandler.class); | |
Assert.notNull(jobHandler, "JobHandler 不会为空"); | |
// 执行任务 | |
return jobHandler.execute(jobHandlerParam); | |
} | |
private void updateJobLogResultAsync(Long jobLogId, LocalDateTime startTime, String data, Throwable exception, | |
JobExecutionContext executionContext) { | |
LocalDateTime endTime = LocalDateTime.now(); | |
// 处理是否成功 | |
boolean success = exception == null; | |
if (!success) { | |
data = getRootCauseMessage(exception); | |
} | |
// 更新日志 | |
try { | |
jobLogFrameworkService.updateJobLogResultAsync(jobLogId, endTime, (int) LocalDateTimeUtil.between(startTime, endTime).toMillis(), success, data); | |
} catch (Exception ex) { | |
log.error("[executeInternal][Job({}) logId({}) 记录执行日志失败({}/{})]", | |
executionContext.getJobDetail().getKey(), jobLogId, success, data); | |
} | |
} | |
private void handleException(Throwable exception, | |
int refireCount, int retryCount, int retryInterval) throws JobExecutionException { | |
// 如果有异常,则进行重试 | |
if (exception == null) { | |
return; | |
} | |
// 情况一:如果到达重试上限,则直接抛出异常即可 | |
if (refireCount >= retryCount) { | |
throw new JobExecutionException(exception); | |
} | |
// 情况二:如果未到达重试上限,则 sleep 一定间隔时间,然后重试 | |
// 这里使用 sleep 来实现,主要还是希望实现比较简单。因为,同一时间,不会存在大量失败的 Job。 | |
if (retryInterval > 0) { | |
ThreadUtil.sleep(retryInterval); | |
} | |
// 第二个参数,refireImmediately = true,表示立即重试 | |
throw new JobExecutionException(exception, true); | |
} | |
} |
#
executeInternal(String jobHandlerName, String jobHandlerParam)功能:反射调用具体业务处理器,也就是具体的实现
流程:
- 从 Spring 容器获取
JobHandler实例- 调用其
execute(jobHandlerParam)方法执行业务逻辑
# 任务处理器
/** | |
* <p> Project: scaffold - JobHandler </p> | |
* | |
* 任务处理器 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public interface JobHandler { | |
/** | |
* 执行任务 | |
* | |
* @param param 参数 | |
* @return 结果 | |
* @throws Exception 异常 | |
*/ | |
String execute(String param) throws Exception; | |
} |
# 任务具体实现例子
/** | |
* <p> Project: scaffold - DemoJob </p> | |
* | |
* 演示作业 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Component | |
public class DemoJob implements JobHandler { | |
@Resource | |
private AdminUserMapper adminUserMapper; | |
@Override | |
@TenantJob | |
public String execute(String param) { | |
System.out.println("当前租户:" + TenantContextHolder.getTenantId()); | |
List<AdminUserDO> users = adminUserMapper.selectList(); | |
return "用户数量:" + users.size(); | |
} | |
} |
# Scheduler 管理器,负责创建任务
package com.tz.scaffold.framework.quartz.core.scheduler; | |
/** | |
* <p> Project: scaffold - SchedulerManager </p> | |
* | |
* {@link org.quartz.Scheduler} 的管理器,负责创建任务 | |
* <p> | |
* 考虑到实现的简洁性,我们使用 jobHandlerName 作为唯一标识,即: | |
* <li> | |
* 1. Job 的 {@link JobDetail#getKey ()} | |
* <li> | |
* 2. Trigger 的 {@link Trigger#getKey ()} | |
* <p> | |
* 另外,jobHandlerName 对应到 Spring Bean 的名字,直接调用 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
public class SchedulerManager { | |
private final Scheduler scheduler; | |
public SchedulerManager(Scheduler scheduler) { | |
this.scheduler = scheduler; | |
} | |
/** | |
* 添加 Job 到 Quartz 中 | |
* | |
* @param jobId 任务编号 | |
* @param jobHandlerName 任务处理器的名字 | |
* @param jobHandlerParam 任务处理器的参数 | |
* @param cronExpression CRON 表达式 | |
* @param retryCount 重试次数 | |
* @param retryInterval 重试间隔 | |
* @throws SchedulerException 添加异常 | |
*/ | |
public void addJob(Long jobId, String jobHandlerName, String jobHandlerParam, String cronExpression, | |
Integer retryCount, Integer retryInterval) | |
throws SchedulerException { | |
validateScheduler(); | |
// 创建 JobDetail 对象 | |
JobDetail jobDetail = JobBuilder.newJob(JobHandlerInvoker.class) | |
.usingJobData(JobDataKeyEnum.JOB_ID.name(), jobId) | |
.usingJobData(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName) | |
.withIdentity(jobHandlerName).build(); | |
// 创建 Trigger 对象 | |
Trigger trigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval); | |
// 新增调度 | |
scheduler.scheduleJob(jobDetail, trigger); | |
} | |
/** | |
* 更新 Job 到 Quartz | |
* | |
* @param jobHandlerName 任务处理器的名字 | |
* @param jobHandlerParam 任务处理器的参数 | |
* @param cronExpression CRON 表达式 | |
* @param retryCount 重试次数 | |
* @param retryInterval 重试间隔 | |
* @throws SchedulerException 更新异常 | |
*/ | |
public void updateJob(String jobHandlerName, String jobHandlerParam, String cronExpression, | |
Integer retryCount, Integer retryInterval) | |
throws SchedulerException { | |
validateScheduler(); | |
// 创建新 Trigger 对象 | |
Trigger newTrigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval); | |
// 修改调度 | |
scheduler.rescheduleJob(new TriggerKey(jobHandlerName), newTrigger); | |
} | |
/** | |
* 删除 Quartz 中的 Job | |
* | |
* @param jobHandlerName 任务处理器的名字 | |
* @throws SchedulerException 删除异常 | |
*/ | |
public void deleteJob(String jobHandlerName) throws SchedulerException { | |
validateScheduler(); | |
scheduler.deleteJob(new JobKey(jobHandlerName)); | |
} | |
/** | |
* 暂停 Quartz 中的 Job | |
* | |
* @param jobHandlerName 任务处理器的名字 | |
* @throws SchedulerException 暂停异常 | |
*/ | |
public void pauseJob(String jobHandlerName) throws SchedulerException { | |
validateScheduler(); | |
scheduler.pauseJob(new JobKey(jobHandlerName)); | |
} | |
/** | |
* 启动 Quartz 中的 Job | |
* | |
* @param jobHandlerName 任务处理器的名字 | |
* @throws SchedulerException 启动异常 | |
*/ | |
public void resumeJob(String jobHandlerName) throws SchedulerException { | |
validateScheduler(); | |
scheduler.resumeJob(new JobKey(jobHandlerName)); | |
scheduler.resumeTrigger(new TriggerKey(jobHandlerName)); | |
} | |
/** | |
* 立即触发一次 Quartz 中的 Job | |
* | |
* @param jobId 任务编号 | |
* @param jobHandlerName 任务处理器的名字 | |
* @param jobHandlerParam 任务处理器的参数 | |
* @throws SchedulerException 触发异常 | |
*/ | |
public void triggerJob(Long jobId, String jobHandlerName, String jobHandlerParam) | |
throws SchedulerException { | |
validateScheduler(); | |
// 触发任务 | |
JobDataMap data = new JobDataMap(); // 无需重试,所以不设置 retryCount 和 retryInterval | |
data.put(JobDataKeyEnum.JOB_ID.name(), jobId); | |
data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName); | |
data.put(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam); | |
scheduler.triggerJob(new JobKey(jobHandlerName), data); | |
} | |
private Trigger buildTrigger(String jobHandlerName, String jobHandlerParam, String cronExpression, | |
Integer retryCount, Integer retryInterval) { | |
return TriggerBuilder.newTrigger() | |
.withIdentity(jobHandlerName) | |
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)) | |
.usingJobData(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam) | |
.usingJobData(JobDataKeyEnum.JOB_RETRY_COUNT.name(), retryCount) | |
.usingJobData(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), retryInterval) | |
.build(); | |
} | |
private void validateScheduler() { | |
if (scheduler == null) { | |
throw exception0(NOT_IMPLEMENTED.getCode(), | |
"[定时任务 - 已禁用][参考 https://www.tzzfj.cn/ 开启]"); | |
} | |
} | |
} |
# infar 基础设施模块
主要是提供对定时任务的图形化管理
controller
package com.tz.scaffold.module.infra.controller.admin.job; | |
/** | |
* <p> Project: scaffold - JobController </p> | |
* | |
* 管理后台 - 定时任务 | |
* @author Tz | |
* @date 2024/01/09 23:45 | |
* @version 1.0.0 | |
* @since 1.0.0 | |
*/ | |
@Tag(name = "管理后台 - 定时任务") | |
@RestController | |
@RequestMapping("/infra/job") | |
@Validated | |
public class JobController { | |
@Resource | |
private JobService jobService; | |
@PostMapping("/create") | |
@Operation(summary = "创建定时任务") | |
@PreAuthorize("@ss.hasPermission('infra:job:create')") | |
public CommonResult<Long> createJob(@Valid @RequestBody JobCreateReqVO createReqVO) | |
throws SchedulerException { | |
return success(jobService.createJob(createReqVO)); | |
} | |
@PutMapping("/update") | |
@Operation(summary = "更新定时任务") | |
@PreAuthorize("@ss.hasPermission('infra:job:update')") | |
public CommonResult<Boolean> updateJob(@Valid @RequestBody JobUpdateReqVO updateReqVO) | |
throws SchedulerException { | |
jobService.updateJob(updateReqVO); | |
return success(true); | |
} | |
@PutMapping("/update-status") | |
@Operation(summary = "更新定时任务的状态") | |
@Parameters({ | |
@Parameter(name = "id", description = "编号", required = true, example = "1024"), | |
@Parameter(name = "status", description = "状态", required = true, example = "1"), | |
}) | |
@PreAuthorize("@ss.hasPermission('infra:job:update')") | |
public CommonResult<Boolean> updateJobStatus(@RequestParam(value = "id") Long id, @RequestParam("status") Integer status) | |
throws SchedulerException { | |
jobService.updateJobStatus(id, status); | |
return success(true); | |
} | |
@DeleteMapping("/delete") | |
@Operation(summary = "删除定时任务") | |
@Parameter(name = "id", description = "编号", required = true, example = "1024") | |
@PreAuthorize("@ss.hasPermission('infra:job:delete')") | |
public CommonResult<Boolean> deleteJob(@RequestParam("id") Long id) | |
throws SchedulerException { | |
jobService.deleteJob(id); | |
return success(true); | |
} | |
@PutMapping("/trigger") | |
@Operation(summary = "触发定时任务") | |
@Parameter(name = "id", description = "编号", required = true, example = "1024") | |
@PreAuthorize("@ss.hasPermission('infra:job:trigger')") | |
public CommonResult<Boolean> triggerJob(@RequestParam("id") Long id) throws SchedulerException { | |
jobService.triggerJob(id); | |
return success(true); | |
} | |
@GetMapping("/get") | |
@Operation(summary = "获得定时任务") | |
@Parameter(name = "id", description = "编号", required = true, example = "1024") | |
@PreAuthorize("@ss.hasPermission('infra:job:query')") | |
public CommonResult<JobRespVO> getJob(@RequestParam("id") Long id) { | |
JobDO job = jobService.getJob(id); | |
return success(JobConvert.INSTANCE.convert(job)); | |
} | |
@GetMapping("/list") | |
@Operation(summary = "获得定时任务列表") | |
@Parameter(name = "ids", description = "编号列表", required = true) | |
@PreAuthorize("@ss.hasPermission('infra:job:query')") | |
public CommonResult<List<JobRespVO>> getJobList(@RequestParam("ids") Collection<Long> ids) { | |
List<JobDO> list = jobService.getJobList(ids); | |
return success(JobConvert.INSTANCE.convertList(list)); | |
} | |
@GetMapping("/page") | |
@Operation(summary = "获得定时任务分页") | |
@PreAuthorize("@ss.hasPermission('infra:job:query')") | |
public CommonResult<PageResult<JobRespVO>> getJobPage(@Valid JobPageReqVO pageVO) { | |
PageResult<JobDO> pageResult = jobService.getJobPage(pageVO); | |
return success(JobConvert.INSTANCE.convertPage(pageResult)); | |
} | |
@GetMapping("/export-excel") | |
@Operation(summary = "导出定时任务 Excel") | |
@PreAuthorize("@ss.hasPermission('infra:job:export')") | |
@OperateLog(type = EXPORT) | |
public void exportJobExcel(@Valid JobExportReqVO exportReqVO, | |
HttpServletResponse response) throws IOException { | |
List<JobDO> list = jobService.getJobList(exportReqVO); | |
// 导出 Excel | |
List<JobExcelVO> datas = JobConvert.INSTANCE.convertList02(list); | |
ExcelUtils.write(response, "定时任务.xls", "数据", JobExcelVO.class, datas); | |
} | |
@GetMapping("/get_next_times") | |
@Operation(summary = "获得定时任务的下 n 次执行时间") | |
@Parameters({ | |
@Parameter(name = "id", description = "编号", required = true, example = "1024"), | |
@Parameter(name = "count", description = "数量", example = "5") | |
}) | |
@PreAuthorize("@ss.hasPermission('infra:job:query')") | |
public CommonResult<List<LocalDateTime>> getJobNextTimes(@RequestParam("id") Long id, | |
@RequestParam(value = "count", required = false, defaultValue = "5") Integer count) { | |
JobDO job = jobService.getJob(id); | |
if (job == null) { | |
return success(Collections.emptyList()); | |
} | |
return success(CronUtils.getNextTimes(job.getCronExpression(), count)); | |
} | |
} |
在页面中点击 [更多 -> 调度日志] 按钮,可以査看到 定时任务 的执行日志