博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot+RabbitMq实现延时消息队列
阅读量:5879 次
发布时间:2019-06-19

本文共 5575 字,大约阅读时间需要 18 分钟。

背景:
在一些应用场景中,程序并不需要同步执行,例如用户注册之后的邮件或者短信通知提醒。这种场景的实现则是在当前线程,开启一个新线 程,当前线程在开启新线程之后会继续往下执行,无需等待新线程执行完成。   但例如一些需要延时的场景则不只是开启新线程执行如此简单了。譬如提交订单后在15分钟内没有完成支付,订单需要关闭,这种情 况,是否只开启一个异步线程就不适用了呢。

那么就单单实现在提交订单后的15分钟内,如果没有完成支付,系统关闭订单。有哪些可行的方案呢。


方案:
  1. 使用定时任务轮询订单表,查询出订单创建了15分钟以上并且未支付的订单,如果有查询出此类订单则执行关闭。

    缺点:假设每1分钟轮询一次,则会存在秒级误差,如果秒级轮询,则会极其消耗性能,影响程序的健壮性。
  2. 提交订单时开启一个新线程,而新线程直接休眠15分钟,休眠结束后开始执行订单关闭

    缺点:如果在线程休眠时,重启了整个服务,那么会怎样呢?
  3. 使用延时消息队列

    缺点:需要额外部署消息中间件

综上考虑:使用延时消息队列则成为最佳选择,消息延时发布之后,保存在消息中间件中,在15分钟后才会正式发布至队列,延时队列监听器在15分钟后监听到消息时,才开始执行,而这期间,即使项目重启也没有关系。


以springboot为基础框架,集成rabbitmq实现延时队列

注意:这里不采用网上流传的死信队列转发,而是采用rabbitmq3.7+版本的延时队列插件,所以务必安装3.7+版本并启用延时队列插件。

增加amqp依赖

org.springframework.boot
spring-boot-starter-parent
1.5.4.RELEASE
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-amqp

修改application.yml文件,配置rabbitmq,并且开启消息的手动应答

spring:    rabbitmq:        host: 127.0.0.1        port: 5672        username: admin        password: admin        listener:            direct:                acknowledge-mode: MANUAL            simple:                acknowledge-mode: MANUAL

配置队列,路由,交换机

package cn.rongyuan.config;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @title rabbitmq配置类 * @author zengzp * @time 2018年8月20日 上午10:46:43 * @Description  */@Configurationpublic class RabbitConfig {        // 支付超时延时交换机    public static final String Delay_Exchange_Name = "delay.exchange";    // 超时订单关闭队列    public static final String Timeout_Trade_Queue_Name = "close_trade";        @Bean    public Queue delayPayQueue() {        return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true);    }            // 定义广播模式的延时交换机 无需绑定路由    @Bean    FanoutExchange delayExchange(){        Map
args = new HashMap
(); args.put("x-delayed-type", "direct"); FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args); topicExchange.setDelayed(true); return topicExchange; } // 绑定延时队列与交换机 @Bean public Binding delayPayBind() { return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); } // 定义消息转换器 @Bean Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 定义消息模板用于发布消息,并且设置其消息转换器 @Bean RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); }}

在提交订单时发布消息至延时队列并且指定延时时长

@Autowired    RabbitTemplate rabbitTemplate;    // 通过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每个绑定此交换机的队列    rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);        message.getMessageProperties().setDelay(30 * (60*1000));   // 毫秒为单位,指定此消息的延时时长        return message;    });

消费端监听延时队列

package cn.rongyuan.mq.consumer;import java.io.IOException;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import cn.rongyuan.config.RabbitConfig;import cn.rongyuan.service.TradeService;import cn.rongyuan.util.ExceptionUtil;/** * @title 消息消费端 * @author zengzp * @time 2018年8月20日 上午11:00:26 * @Description  */@Componentpublic class PayTimeOutConsumer {        @Autowired    TradeService tradeService;        private Logger logger = LoggerFactory.getLogger(getClass());        @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name)    public void process(String tradeCode, Message message, Channel channel) throws IOException{        try {            logger.info("开始执行订单[{}]的支付超时订单关闭......", tradeCode);            tradeService.cancelTrade(tradeCode);            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);            logger.info("超时订单处理完毕");        } catch (Exception e) {            logger.error("超时订单处理失败:{}", ExceptionUtil.getMessage(e));            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);        }     }}

参考资料:
1、spring amqp 官方文档:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange  2、rabbitmq 官方文档:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

转载地址:http://xccix.baihongyu.com/

你可能感兴趣的文章
zngnqfxtuubuosmo
查看>>
R语言低级绘图函数-abline
查看>>
虚拟机配置
查看>>
【JQuery Easy UI】后台管理系统的简单布局分享
查看>>
132、Android安全机制(2) Android Permission权限控制机制(转载)
查看>>
Linux tree命令
查看>>
web测试方法总结
查看>>
在Hadoop1.2.1上运行第一个Hadoop程序FileSystemCat
查看>>
Android 聊天表情输入、表情翻页带效果、下拉刷新聊天记录
查看>>
mysql insert锁机制【转】
查看>>
x86内存映射
查看>>
【中文分词】DAG、DP、HMM、Viterbi
查看>>
当你买了一辆全车搭载Android操作系统的某侠电动汽车以后
查看>>
angularjs自定义指令Directive
查看>>
kbmmw 5.02发布
查看>>
杭电1285确定比赛名次
查看>>
BZOJ 2982 combination Lucas定理
查看>>
[sqoop] sqoop2 使用
查看>>
js延时函数setTimeout
查看>>
新手学JavaScript都要学什么?
查看>>