百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

RabbitMQ实现延迟消息居然如此简单,整个插件就完事了

ztj100 2024-11-23 00:04 26 浏览 0 评论

RabbitMQ实现延迟消息的方式有两种,一种是使用死信队列实现,另一种是使用延迟插件实现。死信队列实现我们以前曾经讲过这次我们讲个更简单的,使用延迟插件实现。

插件安装

首先我们需要下载并安装RabbitMQ的延迟插件。

  • 去RabbitMQ的官网下载插件,插件地址:https://www.rabbitmq.com/community-plugins.html
  • 直接搜索rabbitmq_delayed_message_exchange即可找到我们需要下载的插件,下载和RabbitMQ配套的版本,不要弄错;
  • 将插件文件复制到RabbitMQ安装目录的plugins目录下;
    • 进入RabbitMQ安装目录的sbin目录下,使用如下命令启用延迟插件;
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    • 启用插件成功后就可以看到如下信息,之后重新启动RabbitMQ服务即可。

    实现延迟消息

    接下来我们需要在SpringBoot中实现延迟消息功能,这次依然沿用商品下单的场景。比如说有个用户下单了,他60分钟不支付订单,订单就会被取消,这就是一个典型的延迟消息使用场景。

    • 首先我们需要在pom.xml文件中添加AMQP相关依赖;
    <!--消息队列相关依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    • 之后在application.yml添加RabbitMQ的相关配置;
    spring:
      rabbitmq:
        host: localhost # rabbitmq的连接地址
        port: 5672 # rabbitmq的连接端口号
        virtual-host: /mall # rabbitmq的虚拟host
        username: mall # rabbitmq的用户名
        password: mall # rabbitmq的密码
        publisher-confirms: true #如果对异步消息需要回调必须设置为true
    • 接下来创建RabbitMQ的Java配置,主要用于配置交换机、队列和绑定关系;
    /**
     * 消息队列配置
     * Created by macro on 2018/9/14.
     */
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 订单延迟插件消息队列所绑定的交换机
         */
        @Bean
        CustomExchange  orderPluginDirect() {
            //创建一个自定义交换机,可以发送延迟消息
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args);
        }
    
        /**
         * 订单延迟插件队列
         */
        @Bean
        public Queue orderPluginQueue() {
            return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
        }
    
        /**
         * 将订单延迟插件队列绑定到交换机
         */
        @Bean
        public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
            return BindingBuilder
                    .bind(orderPluginQueue)
                    .to(orderPluginDirect)
                    .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                    .noargs();
        }
    
    }
    • 创建一个取消订单消息的发出者,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间;
    /**
     * 取消订单消息的发出者
     * Created by macro on 2018/9/14.
     */
    @Component
    public class CancelOrderSender {
        private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMessage(Long orderId,final long delayTimes){
            //给延迟队列发送消息
            amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setHeader("x-delay",delayTimes);
                    return message;
                }
            });
            LOGGER.info("send delay message orderId:{}",orderId);
        }
    }
    • 创建一个取消订单消息的接收者,用于处理订单延迟插件队列中的消息。
    /**
     * 取消订单消息的处理者
     * Created by macro on 2018/9/14.
     */
    @Component
    @RabbitListener(queues = "mall.order.cancel.plugin")
    public class CancelOrderReceiver {
        private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
        @Autowired
        private OmsPortalOrderService portalOrderService;
        @RabbitHandler
        public void handle(Long orderId){
            LOGGER.info("receive delay message orderId:{}",orderId);
            portalOrderService.cancelOrder(orderId);
        }
    }
    • 然后在我们的订单业务实现类中添加如下逻辑,当下单成功之前,往消息队列中发送一个取消订单的延迟消息,这样如果订单没有被支付的话,就能取消订单了;
    /**
     * 前台订单管理Service
     * Created by macro on 2018/8/30.
     */
    @Service
    public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
        private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
        @Autowired
        private CancelOrderSender cancelOrderSender;
    
        @Override
        public CommonResult generateOrder(OrderParam orderParam) {
            //todo 执行一系类下单操作,具体参考mall项目
            LOGGER.info("process generateOrder");
            //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
            sendDelayMessageCancelOrder(11L);
            return CommonResult.success(null, "下单成功");
        }
    
        @Override
        public void cancelOrder(Long orderId) {
            //todo 执行一系类取消订单操作,具体参考mall项目
            LOGGER.info("process cancelOrder orderId:{}",orderId);
        }
    
        private void sendDelayMessageCancelOrder(Long orderId) {
            //获取订单超时时间,假设为60分钟(测试用的30秒)
            long delayTimes = 30 * 1000;
            //发送延迟消息
            cancelOrderSender.sendMessage(orderId, delayTimes);
        }
    
    }
    • 启动项目后,在Swagger中调用下单接口;
  • 调用完成后查看控制台日志可以发现,从消息发送和消息接收处理正好相差了30s,我们设置的延迟时间。
  • 2020-06-08 13:46:01.474  INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process generateOrder
    2020-06-08 13:46:01.482  INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender   : send delay message orderId:11
    2020-06-08 13:46:31.517  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver    : receive delay message orderId:11
    2020-06-08 13:46:31.520  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process cancelOrder orderId:11

    两种实现方式对比

    我们之前使用过死信队列的方式,这里我们把两种方式做个对比,先来聊下这两种方式的实现原理。

    死信队列

    死信队列是这样一个队列,如果消息发送到该队列并超过了设置的时间,就会被转发到设置好的处理超时消息的队列当中去,利用该特性可以实现延迟消息。

    延迟插件

    通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。

    结论

    由于死信队列方式需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单。

    项目源码地址

    转发+关注,然后私信我关键字 “项目” 即可获得【项目源码地址】的免费领取方式!

    相关推荐

    sharding-jdbc实现`分库分表`与`读写分离`

    一、前言本文将基于以下环境整合...

    三分钟了解mysql中主键、外键、非空、唯一、默认约束是什么

    在数据库中,数据表是数据库中最重要、最基本的操作对象,是数据存储的基本单位。数据表被定义为列的集合,数据在表中是按照行和列的格式来存储的。每一行代表一条唯一的记录,每一列代表记录中的一个域。...

    MySQL8行级锁_mysql如何加行级锁

    MySQL8行级锁版本:8.0.34基本概念...

    mysql使用小技巧_mysql使用入门

    1、MySQL中有许多很实用的函数,好好利用它们可以省去很多时间:group_concat()将取到的值用逗号连接,可以这么用:selectgroup_concat(distinctid)fr...

    MySQL/MariaDB中如何支持全部的Unicode?

    永远不要在MySQL中使用utf8,并且始终使用utf8mb4。utf8mb4介绍MySQL/MariaDB中,utf8字符集并不是对Unicode的真正实现,即不是真正的UTF-8编码,因...

    聊聊 MySQL Server 可执行注释,你懂了吗?

    前言MySQLServer当前支持如下3种注释风格:...

    MySQL系列-源码编译安装(v5.7.34)

    一、系统环境要求...

    MySQL的锁就锁住我啦!与腾讯大佬的技术交谈,是我小看它了

    对酒当歌,人生几何!朝朝暮暮,唯有己脱。苦苦寻觅找工作之间,殊不知今日之事乃我心之痛,难道是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一...

    MySQL字符问题_mysql中字符串的位置

    中文写入乱码问题:我输入的中文编码是urf8的,建的库是urf8的,但是插入mysql总是乱码,一堆"???????????????????????"我用的是ibatis,终于找到原因了,我是这么解决...

    深圳尚学堂:mysql基本sql语句大全(三)

    数据开发-经典1.按姓氏笔画排序:Select*FromTableNameOrderByCustomerNameCollateChinese_PRC_Stroke_ci_as//从少...

    MySQL进行行级锁的?一会next-key锁,一会间隙锁,一会记录锁?

    大家好,是不是很多人都对MySQL加行级锁的规则搞的迷迷糊糊,一会是next-key锁,一会是间隙锁,一会又是记录锁。坦白说,确实还挺复杂的,但是好在我找点了点规律,也知道如何如何用命令分析加...

    一文讲清怎么利用Python Django实现Excel数据表的导入导出功能

    摘要:Python作为一门简单易学且功能强大的编程语言,广受程序员、数据分析师和AI工程师的青睐。本文系统讲解了如何使用Python的Django框架结合openpyxl库实现Excel...

    用DataX实现两个MySQL实例间的数据同步

    DataXDataX使用Java实现。如果可以实现数据库实例之间准实时的...

    MySQL数据库知识_mysql数据库基础知识

    MySQL是一种关系型数据库管理系统;那废话不多说,直接上自己以前学习整理文档:查看数据库命令:(1).查看存储过程状态:showprocedurestatus;(2).显示系统变量:show...

    如何为MySQL中的JSON字段设置索引

    背景MySQL在2015年中发布的5.7.8版本中首次引入了JSON数据类型。自此,它成了一种逃离严格列定义的方式,可以存储各种形状和大小的JSON文档,例如审计日志、配置信息、第三方数据包、用户自定...

    取消回复欢迎 发表评论: