百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

RabbitMQ实现延迟消息居然如此简单,整个插件就完事了

ztj100 2024-11-23 00:04 15 浏览 0 评论

RabbitMQ实现延迟消息的方式有两种,一种是使用死信队列实现,另一种是使用延迟插件实现。死信队列实现我们以前曾经讲过这次我们讲个更简单的,使用延迟插件实现。

插件安装

首先我们需要下载并安装RabbitMQ的延迟插件。

  • 去RabbitMQ的官网下载插件,插件地址:https://www.rabbitmq.com/community-plugins.html
  • 直接搜索rabbitmq_delayed_message_exchange即可找到我们需要下载的插件,下载和RabbitMQ配套的版本,不要弄错;
  • 将插件文件复制到RabbitMQ安装目录的plugins目录下;
    • 进入RabbitMQ安装目录的sbin目录下,使用如下命令启用延迟插件;
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    • 启用插件成功后就可以看到如下信息,之后重新启动RabbitMQ服务即可。

    实现延迟消息

    接下来我们需要在SpringBoot中实现延迟消息功能,这次依然沿用商品下单的场景。比如说有个用户下单了,他60分钟不支付订单,订单就会被取消,这就是一个典型的延迟消息使用场景。

    • 首先我们需要在pom.xml文件中添加AMQP相关依赖;
    <!--消息队列相关依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    • 之后在application.yml添加RabbitMQ的相关配置;
    spring:
      rabbitmq:
        host: localhost # rabbitmq的连接地址
        port: 5672 # rabbitmq的连接端口号
        virtual-host: /mall # rabbitmq的虚拟host
        username: mall # rabbitmq的用户名
        password: mall # rabbitmq的密码
        publisher-confirms: true #如果对异步消息需要回调必须设置为true
    • 接下来创建RabbitMQ的Java配置,主要用于配置交换机、队列和绑定关系;
    /**
     * 消息队列配置
     * Created by macro on 2018/9/14.
     */
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 订单延迟插件消息队列所绑定的交换机
         */
        @Bean
        CustomExchange  orderPluginDirect() {
            //创建一个自定义交换机,可以发送延迟消息
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args);
        }
    
        /**
         * 订单延迟插件队列
         */
        @Bean
        public Queue orderPluginQueue() {
            return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
        }
    
        /**
         * 将订单延迟插件队列绑定到交换机
         */
        @Bean
        public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
            return BindingBuilder
                    .bind(orderPluginQueue)
                    .to(orderPluginDirect)
                    .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                    .noargs();
        }
    
    }
    • 创建一个取消订单消息的发出者,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间;
    /**
     * 取消订单消息的发出者
     * Created by macro on 2018/9/14.
     */
    @Component
    public class CancelOrderSender {
        private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMessage(Long orderId,final long delayTimes){
            //给延迟队列发送消息
            amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setHeader("x-delay",delayTimes);
                    return message;
                }
            });
            LOGGER.info("send delay message orderId:{}",orderId);
        }
    }
    • 创建一个取消订单消息的接收者,用于处理订单延迟插件队列中的消息。
    /**
     * 取消订单消息的处理者
     * Created by macro on 2018/9/14.
     */
    @Component
    @RabbitListener(queues = "mall.order.cancel.plugin")
    public class CancelOrderReceiver {
        private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
        @Autowired
        private OmsPortalOrderService portalOrderService;
        @RabbitHandler
        public void handle(Long orderId){
            LOGGER.info("receive delay message orderId:{}",orderId);
            portalOrderService.cancelOrder(orderId);
        }
    }
    • 然后在我们的订单业务实现类中添加如下逻辑,当下单成功之前,往消息队列中发送一个取消订单的延迟消息,这样如果订单没有被支付的话,就能取消订单了;
    /**
     * 前台订单管理Service
     * Created by macro on 2018/8/30.
     */
    @Service
    public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
        private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
        @Autowired
        private CancelOrderSender cancelOrderSender;
    
        @Override
        public CommonResult generateOrder(OrderParam orderParam) {
            //todo 执行一系类下单操作,具体参考mall项目
            LOGGER.info("process generateOrder");
            //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成)
            sendDelayMessageCancelOrder(11L);
            return CommonResult.success(null, "下单成功");
        }
    
        @Override
        public void cancelOrder(Long orderId) {
            //todo 执行一系类取消订单操作,具体参考mall项目
            LOGGER.info("process cancelOrder orderId:{}",orderId);
        }
    
        private void sendDelayMessageCancelOrder(Long orderId) {
            //获取订单超时时间,假设为60分钟(测试用的30秒)
            long delayTimes = 30 * 1000;
            //发送延迟消息
            cancelOrderSender.sendMessage(orderId, delayTimes);
        }
    
    }
    • 启动项目后,在Swagger中调用下单接口;
  • 调用完成后查看控制台日志可以发现,从消息发送和消息接收处理正好相差了30s,我们设置的延迟时间。
  • 2020-06-08 13:46:01.474  INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process generateOrder
    2020-06-08 13:46:01.482  INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender   : send delay message orderId:11
    2020-06-08 13:46:31.517  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver    : receive delay message orderId:11
    2020-06-08 13:46:31.520  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process cancelOrder orderId:11

    两种实现方式对比

    我们之前使用过死信队列的方式,这里我们把两种方式做个对比,先来聊下这两种方式的实现原理。

    死信队列

    死信队列是这样一个队列,如果消息发送到该队列并超过了设置的时间,就会被转发到设置好的处理超时消息的队列当中去,利用该特性可以实现延迟消息。

    延迟插件

    通过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。

    结论

    由于死信队列方式需要创建两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需创建一个交换机和一个队列,所以后者使用起来更简单。

    项目源码地址

    转发+关注,然后私信我关键字 “项目” 即可获得【项目源码地址】的免费领取方式!

    相关推荐

    30天学会Python编程:16. Python常用标准库使用教程

    16.1collections模块16.1.1高级数据结构16.1.2示例...

    强烈推荐!Python 这个宝藏库 re 正则匹配

    Python的re模块(RegularExpression正则表达式)提供各种正则表达式的匹配操作。...

    Python爬虫中正则表达式的用法,只讲如何应用,不讲原理

    Python爬虫:正则的用法(非原理)。大家好,这节课给大家讲正则的实际用法,不讲原理,通俗易懂的讲如何用正则抓取内容。·导入re库,这里是需要从html这段字符串中提取出中间的那几个文字。实例一个对...

    Python数据分析实战-正则提取文本的URL网址和邮箱(源码和效果)

    实现功能:Python数据分析实战-利用正则表达式提取文本中的URL网址和邮箱...

    python爬虫教程之爬取当当网 Top 500 本五星好评书籍

    我们使用requests和re来写一个爬虫作为一个爱看书的你(说的跟真的似的)怎么能发现好书呢?所以我们爬取当当网的前500本好五星评书籍怎么样?ok接下来就是学习python的正确姿...

    深入理解re模块:Python中的正则表达式神器解析

    在Python中,"re"是一个强大的模块,用于处理正则表达式(regularexpressions)。正则表达式是一种强大的文本模式匹配工具,用于在字符串中查找、替换或提取特定模式...

    如何使用正则表达式和 Python 匹配不以模式开头的字符串

    需要在Python中使用正则表达式来匹配不以给定模式开头的字符串吗?如果是这样,你可以使用下面的语法来查找所有的字符串,除了那些不以https开始的字符串。r"^(?!https).*&...

    先Mark后用!8分钟读懂 Python 性能优化

    从本文总结了Python开发时,遇到的性能优化问题的定位和解决。概述:性能优化的原则——优化需要优化的部分。性能优化的一般步骤:首先,让你的程序跑起来结果一切正常。然后,运行这个结果正常的代码,看看它...

    Python“三步”即可爬取,毋庸置疑

    声明:本实例仅供学习,切忌遵守robots协议,请不要使用多线程等方式频繁访问网站。#第一步导入模块importreimportrequests#第二步获取你想爬取的网页地址,发送请求,获取网页内...

    简单学Python——re库(正则表达式)2(split、findall、和sub)

    1、split():分割字符串,返回列表语法:re.split('分隔符','目标字符串')例如:importrere.split(',','...

    Lavazza拉瓦萨再度牵手上海大师赛

    阅读此文前,麻烦您点击一下“关注”,方便您进行讨论和分享。Lavazza拉瓦萨再度牵手上海大师赛标题:2024上海大师赛:网球与咖啡的浪漫邂逅在2024年的上海劳力士大师赛上,拉瓦萨咖啡再次成为官...

    ArkUI-X构建Android平台AAR及使用

    本教程主要讲述如何利用ArkUI-XSDK完成AndroidAAR开发,实现基于ArkTS的声明式开发范式在android平台显示。包括:1.跨平台Library工程开发介绍...

    Deepseek写歌详细教程(怎样用deepseek写歌功能)

    以下为结合DeepSeek及相关工具实现AI写歌的详细教程,涵盖作词、作曲、演唱全流程:一、核心流程三步法1.AI生成歌词-打开DeepSeek(网页/APP/API),使用结构化提示词生成歌词:...

    “AI说唱解说影视”走红,“零基础入行”靠谱吗?本报记者实测

    “手里翻找冻鱼,精心的布局;老漠却不言语,脸上带笑意……”《狂飙》剧情被写成歌词,再配上“科目三”背景音乐的演唱,这段1分钟30秒的视频受到了无数网友的点赞。最近一段时间随着AI技术的发展,说唱解说影...

    AI音乐制作神器揭秘!3款工具让你秒变高手

    在音乐创作的领域里,每个人都有一颗想要成为大师的心。但是面对复杂的乐理知识和繁复的制作过程,许多人的热情被一点点消磨。...

    取消回复欢迎 发表评论: