# scaffold 项目之消息队列基于本地内存实现

# 简介

消息队列 (Message Queue) 是一种进程间通信或同一进程内不同线程间的通信方式,它提供了异步的通信协议,允许发送者和接收者不需要同时与消息队列交互。

# 使用场景

# 特点

  • 解耦:生产者和消费者不需要知道彼此的存在
  • 异步:生产者发送消息后可以立即返回,不需要等待消费者处理
  • 削峰:在流量高峰时缓冲请求,避免系统过载
  • 可靠性:消息可以持久化,确保不会丢失
  • 扩展性:可以轻松增加消费者来提高处理能力

# 使用场景

  1. 订单处理系统:用户下单后,订单消息放入队列,由后台服务异步处理
  2. 日志收集:应用将日志发送到消息队列,由专门的日志处理服务消费
  3. 通知系统:用户行为触发通知,通过消息队列发送给通知服务
  4. 数据同步:不同系统间通过消息队列同步数据变更
  5. 任务调度:将定时任务放入队列,由工作节点消费执行

# 开始使用

本地实现的消息队列是基于 spring event

以【短信发送】举例子,演示 Spring Event 的使用

  1. Message 消息

    package com.tz.scaffold.module.system.mq.message.sms;
    /**
     * <p> Project: scaffold - SmsSendMessage </p>
     *
     * 短信发送消息
     * @author Tz
     * @date 2024/01/09 23:45
     * @version 1.0.0
     * @since 1.0.0
     */
    @Data
    public class SmsSendMessage {
        /**
         * 短信日志编号
         */
        @NotNull(message = "短信日志编号不能为空")
        private Long logId;
        /**
         * 手机号
         */
        @NotNull(message = "手机号不能为空")
        private String mobile;
        /**
         * 短信渠道编号
         */
        @NotNull(message = "短信渠道编号不能为空")
        private Long channelId;
        /**
         * 短信 API 的模板编号
         */
        @NotNull(message = "短信 API 的模板编号不能为空")
        private String apiTemplateId;
        /**
         * 短信模板参数
         */
        private List<KeyValue<String, Object>> templateParams;
    }
  2. SmsProducer 生产者

    package com.tz.scaffold.module.system.mq.producer.sms;
    /**
     * <p> Project: scaffold - SmsProducer </p>
     *
     * Sms 短信相关消息的 Producer
     * @author Tz
     * @date 2024/01/09 23:45
     * @version 1.0.0
     * @since 1.0.0
     */
    @Slf4j
    @Component
    public class SmsProducer {
        @Resource
        private ApplicationContext applicationContext;
        /**
         * 发送 {@link SmsSendMessage} 消息
         *
         * @param logId 短信日志编号
         * @param mobile 手机号
         * @param channelId 渠道编号
         * @param apiTemplateId 短信模板编号
         * @param templateParams 短信模板参数
         */
        public void sendSmsSendMessage(Long logId, String mobile,
                                       Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
            SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
            message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
            applicationContext.publishEvent(message);
        }
    }
  3. SmsSendConsumer 消费者

    package com.tz.scaffold.module.system.mq.consumer.sms;
    import com.tz.scaffold.module.system.mq.message.sms.SmsSendMessage;
    import com.tz.scaffold.module.system.service.sms.SmsSendService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.event.EventListener;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    import javax.annotation.Resource;
    /**
     * <p> Project: scaffold - SmsSendConsumer </p>
     *
     * 针对 {@link SmsSendMessage} 的消费者
     * @author Tz
     * @date 2024/01/09 23:45
     * @version 1.0.0
     * @since 1.0.0
     */
    @Component
    @Slf4j
    public class SmsSendConsumer {
        @Resource
        private SmsSendService smsSendService;
        /**
         * Async: Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步
         * @param message
         */
        @EventListener
        @Async
        public void onMessage(SmsSendMessage message) {
            log.info("[onMessage][消息内容({})]", message);
            smsSendService.doSendSms(message);
        }
    }

# 测试

调用 SmsProducer.sendSmsSendMessage(...) 方法就可以看到效果