# scaffold 项目之定时任务管理

# 简介

定时任务(Scheduled Task / Cron Job) 是一种在预设时间或周期自动执行特定操作的机制,广泛应用于系统运维、数据处理、自动化流程等场景。

# 使用场景

# 1. 系统运维自动化

  • 日志轮转:每天凌晨清理 Nginx 访问日志(如 logrotate )。
  • 备份:每日 3 点全量备份 MySQL 数据库到云存储。
  • 健康检查:每 10 分钟检测服务状态,异常时重启并发送告警。

# 2. 数据处理与 ETL

  • 数据同步:每小时从 API 拉取订单数据到数据仓库。
  • 报表生成:每周一上午 8 点生成销售周报并邮件发送给管理层。
  • 缓存更新:每 5 分钟刷新 Redis 中的热点数据。

# 3. 业务逻辑自动化

  • 用户通知:每天 8 点推送生日祝福短信。
  • 订单超时:每 30 分钟扫描未支付订单,超时自动取消。
  • 内容清理:每月 1 日删除 3 个月前的临时文件。

# 4. 监控与告警

  • 资源监控:每 1 分钟检查 CPU 使用率,超过 90% 触发告警。
  • 安全扫描:每天凌晨 2 点运行漏洞扫描脚本。

# 开始使用

本项目基于 Quartz + MySQL 实现分布式定时任务,并提供 [基础设施 -> 定时任务] 菜单,进行定时任务的统一管理,支持动态控制任务的添加、修改、开启、暂停、删除、执行一次等操作。

项目对 Quartz框架 做了如下处理:

  1. 封装了对应的模块 scaffold-spring-boot-starter-job
  2. 基础设施模块 下添加了 job 包,提供任务的动态管理,执行日志的存储。

注意: 基础设施模块 的 job 包需要引入 scaffold-spring-boot-starter-job

# 对应实现代码

application.yml

# Quartz 配置项,对应 QuartzProperties 配置类
spring:
  quartz:
    auto-startup: true # 本地开发环境,尽量不要开启 Job
    scheduler-name: schedulerName # Scheduler 名字。默认为 schedulerName
    job-store-type: jdbc # Job 存储器类型。默认为 memory 表示内存,可选 jdbc 使用数据库。
    wait-for-jobs-to-complete-on-shutdown: true # 应用关闭时,是否等待定时任务执行完成。默认为 false ,建议设置为 true
    properties: # 添加 Quartz Scheduler 附加属性,更多可以看 http://www.quartz-scheduler.org/documentation/2.4.0-SNAPSHOT/configuration.html 文档
      org:
        quartz:
          # Scheduler 相关配置
          scheduler:
            instanceName: schedulerName
            instanceId: AUTO # 自动生成 instance ID
          # JobStore 相关配置
          jobStore:
            # JobStore 实现类。可见博客:https://blog.csdn.net/weixin_42458219/article/details/122247162
            class: org.springframework.scheduling.quartz.LocalDataSourceJobStore
            isClustered: true # 是集群模式
            clusterCheckinInterval: 15000 # 集群检查频率,单位:毫秒。默认为 15000,即 15 秒
            misfireThreshold: 60000 # misfire 阀值,单位:毫秒。
          # 线程池相关配置
          threadPool:
            threadCount: 25 # 线程池大小。默认为 10 。
            threadPriority: 5 # 线程优先级
            class: org.quartz.simpl.SimpleThreadPool # 线程池类型
    jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置
      initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。

# springboot-starter-job 模块

# pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

# 启动类

package com.tz.scaffold.framework.quartz.config;
/**
 * <p> Project: scaffold - ScaffoldAsyncAutoConfiguration </p>
 *
 * 异步任务的配置类
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@AutoConfiguration
@EnableAsync
public class ScaffoldAsyncAutoConfiguration {
    @Bean
    public BeanPostProcessor threadPoolTaskExecutorBeanPostProcessor() {
        return new BeanPostProcessor() {
            @Override
            public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                if (!(bean instanceof ThreadPoolTaskExecutor)) {
                    return bean;
                }
                // 修改提交的任务,接入 TransmittableThreadLocal
                ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) bean;
                executor.setTaskDecorator(TtlRunnable::get);
                return executor;
            }
        };
    }
}

# 配置类

package com.tz.scaffold.framework.quartz.config;
/**
 * <p> Project: scaffold - ScaffoldQuartzAutoConfiguration </p>
 *
 * 定时任务的配置类
 * <p>
 * EnableScheduling: 开启 Spring 自带的定时任务
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@AutoConfiguration
@EnableScheduling
@Slf4j
public class ScaffoldQuartzAutoConfiguration {
    @Bean
    public SchedulerManager schedulerManager(Optional<Scheduler> scheduler) {
        if (!scheduler.isPresent()) {
            log.info("[定时任务 - 已禁用][参考 https://www.tzzfj.cn/ 开启]");
            return new SchedulerManager(null);
        }
        return new SchedulerManager(scheduler.get());
    }
}

# 基础 Job 调用者

package com.tz.scaffold.framework.quartz.core.handler;
import static cn.hutool.core.exceptions.ExceptionUtil.getRootCauseMessage;
/**
 * <p> Project: scaffold - JobHandlerInvoker </p>
 *
 * 基础 Job 调用者,负责调用 {@link JobHandler#execute (String)} 执行任务
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
@Slf4j
public class JobHandlerInvoker extends QuartzJobBean {
    @Resource
    private ApplicationContext applicationContext;
    @Resource
    private JobLogFrameworkService jobLogFrameworkService;
    @Override
    protected void executeInternal(JobExecutionContext executionContext) throws JobExecutionException {
        // 第一步,获得 Job 数据
        Long jobId = executionContext.getMergedJobDataMap().getLong(JobDataKeyEnum.JOB_ID.name());
        String jobHandlerName = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_NAME.name());
        String jobHandlerParam = executionContext.getMergedJobDataMap().getString(JobDataKeyEnum.JOB_HANDLER_PARAM.name());
        int refireCount  = executionContext.getRefireCount();
        int retryCount = (Integer) executionContext.getMergedJobDataMap().getOrDefault(JobDataKeyEnum.JOB_RETRY_COUNT.name(), 0);
        int retryInterval = (Integer) executionContext.getMergedJobDataMap().getOrDefault(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), 0);
        // 第二步,执行任务
        Long jobLogId = null;
        LocalDateTime startTime = LocalDateTime.now();
        String data = null;
        Throwable exception = null;
        try {
            // 记录 Job 日志(初始)
            jobLogId = jobLogFrameworkService.createJobLog(jobId, startTime, jobHandlerName, jobHandlerParam, refireCount + 1);
            // 执行任务
            data = this.executeInternal(jobHandlerName, jobHandlerParam);
        } catch (Throwable ex) {
            exception = ex;
        }
        // 第三步,记录执行日志
        this.updateJobLogResultAsync(jobLogId, startTime, data, exception, executionContext);
        // 第四步,处理有异常的情况
        handleException(exception, refireCount, retryCount, retryInterval);
    }
    private String executeInternal(String jobHandlerName, String jobHandlerParam) throws Exception {
        // 获得 JobHandler 对象
        JobHandler jobHandler = applicationContext.getBean(jobHandlerName, JobHandler.class);
        Assert.notNull(jobHandler, "JobHandler 不会为空");
        // 执行任务
        return jobHandler.execute(jobHandlerParam);
    }
    private void updateJobLogResultAsync(Long jobLogId, LocalDateTime startTime, String data, Throwable exception,
                                         JobExecutionContext executionContext) {
        LocalDateTime endTime = LocalDateTime.now();
        // 处理是否成功
        boolean success = exception == null;
        if (!success) {
            data = getRootCauseMessage(exception);
        }
        // 更新日志
        try {
            jobLogFrameworkService.updateJobLogResultAsync(jobLogId, endTime, (int) LocalDateTimeUtil.between(startTime, endTime).toMillis(), success, data);
        } catch (Exception ex) {
            log.error("[executeInternal][Job({}) logId({}) 记录执行日志失败({}/{})]",
                    executionContext.getJobDetail().getKey(), jobLogId, success, data);
        }
    }
    private void handleException(Throwable exception,
                                 int refireCount, int retryCount, int retryInterval) throws JobExecutionException {
        // 如果有异常,则进行重试
        if (exception == null) {
            return;
        }
        // 情况一:如果到达重试上限,则直接抛出异常即可
        if (refireCount >= retryCount) {
            throw new JobExecutionException(exception);
        }
        // 情况二:如果未到达重试上限,则 sleep 一定间隔时间,然后重试
        // 这里使用 sleep 来实现,主要还是希望实现比较简单。因为,同一时间,不会存在大量失败的 Job。
        if (retryInterval > 0) {
            ThreadUtil.sleep(retryInterval);
        }
        // 第二个参数,refireImmediately = true,表示立即重试
        throw new JobExecutionException(exception, true);
    }
}

# executeInternal(String jobHandlerName, String jobHandlerParam)

功能:反射调用具体业务处理器,也就是具体的实现
流程

  1. 从 Spring 容器获取 JobHandler 实例
  2. 调用其 execute(jobHandlerParam) 方法执行业务逻辑

# 任务处理器

/**
 * <p> Project: scaffold - JobHandler </p>
 *
 * 任务处理器
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public interface JobHandler {
    /**
     * 执行任务
     *
     * @param param 参数
     * @return 结果
     * @throws Exception 异常
     */
    String execute(String param) throws Exception;
}

# 任务具体实现例子

/**
 * <p> Project: scaffold - DemoJob </p>
 *
 * 演示作业
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Component
public class DemoJob implements JobHandler {
    @Resource
    private AdminUserMapper adminUserMapper;
    @Override
    @TenantJob
    public String execute(String param) {
        System.out.println("当前租户:" + TenantContextHolder.getTenantId());
        List<AdminUserDO> users = adminUserMapper.selectList();
        return "用户数量:" + users.size();
    }
}

# Scheduler 管理器,负责创建任务

package com.tz.scaffold.framework.quartz.core.scheduler;
/**
 * <p> Project: scaffold - SchedulerManager </p>
 *
 * {@link org.quartz.Scheduler} 的管理器,负责创建任务
 * <p>
 * 考虑到实现的简洁性,我们使用 jobHandlerName 作为唯一标识,即:
 * <li>
 *     1. Job 的 {@link JobDetail#getKey ()}
 * <li>
 *     2. Trigger 的 {@link Trigger#getKey ()}
 * <p>
 * 另外,jobHandlerName 对应到 Spring Bean 的名字,直接调用
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
public class SchedulerManager {
    private final Scheduler scheduler;
    public SchedulerManager(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
    /**
     * 添加 Job 到 Quartz 中
     *
     * @param jobId 任务编号
     * @param jobHandlerName 任务处理器的名字
     * @param jobHandlerParam 任务处理器的参数
     * @param cronExpression CRON 表达式
     * @param retryCount 重试次数
     * @param retryInterval 重试间隔
     * @throws SchedulerException 添加异常
     */
    public void addJob(Long jobId, String jobHandlerName, String jobHandlerParam, String cronExpression,
                       Integer retryCount, Integer retryInterval)
            throws SchedulerException {
        validateScheduler();
        // 创建 JobDetail 对象
        JobDetail jobDetail = JobBuilder.newJob(JobHandlerInvoker.class)
                .usingJobData(JobDataKeyEnum.JOB_ID.name(), jobId)
                .usingJobData(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName)
                .withIdentity(jobHandlerName).build();
        // 创建 Trigger 对象
        Trigger trigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval);
        // 新增调度
        scheduler.scheduleJob(jobDetail, trigger);
    }
    /**
     * 更新 Job 到 Quartz
     *
     * @param jobHandlerName 任务处理器的名字
     * @param jobHandlerParam 任务处理器的参数
     * @param cronExpression CRON 表达式
     * @param retryCount 重试次数
     * @param retryInterval 重试间隔
     * @throws SchedulerException 更新异常
     */
    public void updateJob(String jobHandlerName, String jobHandlerParam, String cronExpression,
                          Integer retryCount, Integer retryInterval)
            throws SchedulerException {
        validateScheduler();
        // 创建新 Trigger 对象
        Trigger newTrigger = this.buildTrigger(jobHandlerName, jobHandlerParam, cronExpression, retryCount, retryInterval);
        // 修改调度
        scheduler.rescheduleJob(new TriggerKey(jobHandlerName), newTrigger);
    }
    /**
     * 删除 Quartz 中的 Job
     *
     * @param jobHandlerName 任务处理器的名字
     * @throws SchedulerException 删除异常
     */
    public void deleteJob(String jobHandlerName) throws SchedulerException {
        validateScheduler();
        scheduler.deleteJob(new JobKey(jobHandlerName));
    }
    /**
     * 暂停 Quartz 中的 Job
     *
     * @param jobHandlerName 任务处理器的名字
     * @throws SchedulerException 暂停异常
     */
    public void pauseJob(String jobHandlerName) throws SchedulerException {
        validateScheduler();
        scheduler.pauseJob(new JobKey(jobHandlerName));
    }
    /**
     * 启动 Quartz 中的 Job
     *
     * @param jobHandlerName 任务处理器的名字
     * @throws SchedulerException 启动异常
     */
    public void resumeJob(String jobHandlerName) throws SchedulerException {
        validateScheduler();
        scheduler.resumeJob(new JobKey(jobHandlerName));
        scheduler.resumeTrigger(new TriggerKey(jobHandlerName));
    }
    /**
     * 立即触发一次 Quartz 中的 Job
     *
     * @param jobId 任务编号
     * @param jobHandlerName 任务处理器的名字
     * @param jobHandlerParam 任务处理器的参数
     * @throws SchedulerException 触发异常
     */
    public void triggerJob(Long jobId, String jobHandlerName, String jobHandlerParam)
            throws SchedulerException {
        validateScheduler();
        // 触发任务
        JobDataMap data = new JobDataMap(); // 无需重试,所以不设置 retryCount 和 retryInterval
        data.put(JobDataKeyEnum.JOB_ID.name(), jobId);
        data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName);
        data.put(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam);
        scheduler.triggerJob(new JobKey(jobHandlerName), data);
    }
    private Trigger buildTrigger(String jobHandlerName, String jobHandlerParam, String cronExpression,
                                 Integer retryCount, Integer retryInterval) {
        return TriggerBuilder.newTrigger()
                .withIdentity(jobHandlerName)
                .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
                .usingJobData(JobDataKeyEnum.JOB_HANDLER_PARAM.name(), jobHandlerParam)
                .usingJobData(JobDataKeyEnum.JOB_RETRY_COUNT.name(), retryCount)
                .usingJobData(JobDataKeyEnum.JOB_RETRY_INTERVAL.name(), retryInterval)
                .build();
    }
    private void validateScheduler() {
        if (scheduler == null) {
            throw exception0(NOT_IMPLEMENTED.getCode(),
                    "[定时任务 - 已禁用][参考 https://www.tzzfj.cn/ 开启]");
        }
    }
}

# infar 基础设施模块

主要是提供对定时任务的图形化管理

controller

package com.tz.scaffold.module.infra.controller.admin.job;
/**
 * <p> Project: scaffold - JobController </p>
 *
 * 管理后台 - 定时任务
 * @author Tz
 * @date 2024/01/09 23:45
 * @version 1.0.0
 * @since 1.0.0
 */
@Tag(name = "管理后台 - 定时任务")
@RestController
@RequestMapping("/infra/job")
@Validated
public class JobController {
    @Resource
    private JobService jobService;
    @PostMapping("/create")
    @Operation(summary = "创建定时任务")
    @PreAuthorize("@ss.hasPermission('infra:job:create')")
    public CommonResult<Long> createJob(@Valid @RequestBody JobCreateReqVO createReqVO)
            throws SchedulerException {
        return success(jobService.createJob(createReqVO));
    }
    @PutMapping("/update")
    @Operation(summary = "更新定时任务")
    @PreAuthorize("@ss.hasPermission('infra:job:update')")
    public CommonResult<Boolean> updateJob(@Valid @RequestBody JobUpdateReqVO updateReqVO)
            throws SchedulerException {
        jobService.updateJob(updateReqVO);
        return success(true);
    }
    @PutMapping("/update-status")
    @Operation(summary = "更新定时任务的状态")
    @Parameters({
            @Parameter(name = "id", description = "编号", required = true, example = "1024"),
            @Parameter(name = "status", description = "状态", required = true, example = "1"),
    })
    @PreAuthorize("@ss.hasPermission('infra:job:update')")
    public CommonResult<Boolean> updateJobStatus(@RequestParam(value = "id") Long id, @RequestParam("status") Integer status)
            throws SchedulerException {
        jobService.updateJobStatus(id, status);
        return success(true);
    }
	@DeleteMapping("/delete")
    @Operation(summary = "删除定时任务")
    @Parameter(name = "id", description = "编号", required = true, example = "1024")
	@PreAuthorize("@ss.hasPermission('infra:job:delete')")
    public CommonResult<Boolean> deleteJob(@RequestParam("id") Long id)
            throws SchedulerException {
        jobService.deleteJob(id);
        return success(true);
    }
    @PutMapping("/trigger")
    @Operation(summary = "触发定时任务")
    @Parameter(name = "id", description = "编号", required = true, example = "1024")
    @PreAuthorize("@ss.hasPermission('infra:job:trigger')")
    public CommonResult<Boolean> triggerJob(@RequestParam("id") Long id) throws SchedulerException {
        jobService.triggerJob(id);
        return success(true);
    }
    @GetMapping("/get")
    @Operation(summary = "获得定时任务")
    @Parameter(name = "id", description = "编号", required = true, example = "1024")
    @PreAuthorize("@ss.hasPermission('infra:job:query')")
    public CommonResult<JobRespVO> getJob(@RequestParam("id") Long id) {
        JobDO job = jobService.getJob(id);
        return success(JobConvert.INSTANCE.convert(job));
    }
    @GetMapping("/list")
    @Operation(summary = "获得定时任务列表")
    @Parameter(name = "ids", description = "编号列表", required = true)
    @PreAuthorize("@ss.hasPermission('infra:job:query')")
    public CommonResult<List<JobRespVO>> getJobList(@RequestParam("ids") Collection<Long> ids) {
        List<JobDO> list = jobService.getJobList(ids);
        return success(JobConvert.INSTANCE.convertList(list));
    }
    @GetMapping("/page")
    @Operation(summary = "获得定时任务分页")
    @PreAuthorize("@ss.hasPermission('infra:job:query')")
    public CommonResult<PageResult<JobRespVO>> getJobPage(@Valid JobPageReqVO pageVO) {
        PageResult<JobDO> pageResult = jobService.getJobPage(pageVO);
        return success(JobConvert.INSTANCE.convertPage(pageResult));
    }
    @GetMapping("/export-excel")
    @Operation(summary = "导出定时任务 Excel")
    @PreAuthorize("@ss.hasPermission('infra:job:export')")
    @OperateLog(type = EXPORT)
    public void exportJobExcel(@Valid JobExportReqVO exportReqVO,
                               HttpServletResponse response) throws IOException {
        List<JobDO> list = jobService.getJobList(exportReqVO);
        // 导出 Excel
        List<JobExcelVO> datas = JobConvert.INSTANCE.convertList02(list);
        ExcelUtils.write(response, "定时任务.xls", "数据", JobExcelVO.class, datas);
    }
    @GetMapping("/get_next_times")
    @Operation(summary = "获得定时任务的下 n 次执行时间")
    @Parameters({
            @Parameter(name = "id", description = "编号", required = true, example = "1024"),
            @Parameter(name = "count", description = "数量", example = "5")
    })
    @PreAuthorize("@ss.hasPermission('infra:job:query')")
    public CommonResult<List<LocalDateTime>> getJobNextTimes(@RequestParam("id") Long id,
                                                   @RequestParam(value = "count", required = false, defaultValue = "5") Integer count) {
        JobDO job = jobService.getJob(id);
        if (job == null) {
            return success(Collections.emptyList());
        }
        return success(CronUtils.getNextTimes(job.getCronExpression(), count));
    }
}

在页面中点击 [更多 -> 调度日志] 按钮,可以査看到 定时任务 的执行日志