百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

RabbitMQ非常实用技巧,动态调整消息并发处理能力

ztj100 2024-11-23 00:04 24 浏览 0 评论

1. 简介

RabbitMQ 是一个开源的消息代理和队列服务器,用于通过轻量级和可靠的消息传递,在服务器之间进行通信。在Spring Boot项目中我们一般都是通过@RabbitListener进行消息监听。可以通过配置消息监听器并发数来提高系统的消息处理能力。

在实际应用中,根据业务场景的不同,我们可能需要动态调整 RabbitMQ 消息监听的并发数。例如,当RabbitMQ消息积压过多时,这时候我们就可以考虑通过动态调整并发数,以提高消息处理速度;而在系统自身负载过高时,这时候可以考虑通过减少并发数来减轻系统的整体压力。本篇文章将通过具体的示例来展示如何调整运行中消息监听处理器的并发数。

注意:动态调整并发监听数还可以帮助我们更好地控制系统的稳定性和可靠性。通过实时监测系统的负载情况和消息处理速度,我们可以及时发现潜在的问题并进行调整,从而确保系统的正常运行。

2. 实战案例

依赖管理

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.1 配置管理

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtualHost: test
    publisherConfirmType: correlated
    publisherReturns: true
    listener: 
      simple:
        # 手动应答
        acknowledgeMode: manual
        concurrency: 2
        max-concurrency: 2

2.3 创建交换机及队列

通过管理界面创建交换机及队列。

  • 交换机名:test.exchange类型为topic
  • 队列名: test
  • 将交换机与队列进行绑定路由key:akf.#


2.4 消息队列准备消息

通过如下接口,先往队列中插入100条消息

@Resource
private RabbitTemplate rabbitTemplate ;


@GetMapping("/send")
public String send() {
  new Thread(() -> {
    for (int i = 0; i < 100; i++) {
      rabbitTemplate.convertAndSend("test.exchange", "akf.a", "message - " + i) ;
    }
  }).start() ;
  return "success" ;
}

2.5 消息监听器

@RabbitListener(queues = "test")
public void listener1(String message) {
  System.out.printf("%s - 接收到消息:%s%n", Thread.currentThread().getName(), message) ;
  try {
    TimeUnit.SECONDS.sleep(2) ;
  } catch (InterruptedException e) {}
}

2.6 测试

测试上面的消息监听器是正常的

2.7 调整并发数

在上一步的测试中我们发现控制台打印的始终是一个线程在执行消息处理。但是在一开始的配置文件中我们将concurrency属性设置的为2,起码这里应该是2个线程交替执行才对,这是为什么呢?

Spring监听RabbitMQ的消息时默认并不是一条一条的从RabbitMQ中去,是一次预期一批数据,这一批消费完后才进行下一批的获取,默认预期250条。而我们向队列中存入的数据才100条,所以控制台中你只能看到一个线程打印,因为你没有足够的消息供其它线程去获取处理。我们可以通过如下配置进行预期数的设置:

spring:
  rabbitmq:
    listener: 
      simple:
        prefetch: 5

重新启动服务,测试如下

2个线程交替执行;接下来该如何实现动态调整并发数呢?

首先,修改消息监听器配置

@RabbitListener(id = "test-queue", queues = "test", ackMode = "AUTO")
public void listener1(String message) {
  // ...
}

id: 这里最好是设置唯一的id值,我们是要通过该id值来获取当前队列的消息监听容器。
ackMode: AUTO 这里设置的应答模式,用来覆盖配置文件中的设置。

其次,通过RabbitListenerEndpointRegistry操作

@Resource
private RabbitListenerEndpointRegistry registry ;


@GetMapping("/modify/{count}")
public Object modify(@PathVariable("count") Integer count) {
  // 这里通过id获取对应的队列监听器;所以上面一定要定义唯一的id值
  MessageListenerContainer listenerContainer = registry.getListenerContainer("test-queue") ;
  if (listenerContainer instanceof SimpleMessageListenerContainer container) {
    container.setConcurrentConsumers(count) ;
  }
  return String.format("并发接收消息:%d%n", count) ;
}


最后,测试

首先将服务启动,控制输出如下(当前只有2个线程处理)

目前只有2个线程

调用上面的接口修改并发数为3个后,控制台输出

成功增加了一个消费者线程。

接下来再测试,如果修改的数量大于最大数(spring.rabbitmq.listener.simple.max-concurrency)

控制台抛出如下异常

不能超过最大数;再看看调小是否可以

可以动态调小。

我们也可以对消息监听器进行暂停消费和重新启动消息监听,这里就不在演示了,非常简单调用相应start/stop即可。

总结:在 Spring Boot 中动态调整 RabbitMQ 消息监听的并发数是一个重要的优化手段。通过合理设置并发数并根据系统负载情况进行动态调整,我们可以提高消息处理效率、节省系统资源、确保系统的稳定性和可靠性。在实际应用中,我们应该根据具体的业务场景和需求来选择合适的并发数调整策略,以达到最佳的性能和效果。

微信公众号:Spring全家桶实战案例源码

相关推荐

告别手动操作:一键多工作表合并的实用方法

通常情况下,我们需要将同一工作簿内不同工作表中的数据进行合并处理。如何快速有效地完成这些数据的整合呢?这主要取决于需要合并的源数据的结构。...

【MySQL技术专题】「优化技术系列」常用SQL的优化方案和技术思路

概述前面我们介绍了MySQL中怎么样通过索引来优化查询。日常开发中,除了使用查询外,我们还会使用一些其他的常用SQL,比如INSERT、GROUPBY等。对于这些SQL语句,我们该怎么样进行优化呢...

9.7寸视网膜屏原道M9i双系统安装教程

泡泡网平板电脑频道4月17日原道M9i采用Win8安卓双系统,对于喜欢折腾的朋友来说,刷机成了一件难事,那么原道M9i如何刷机呢?下面通过详细地图文,介绍原道M9i的刷机操作过程,在刷机的过程中,要...

如何做好分布式任务调度——Scheduler 的一些探索

作者:张宇轩,章逸,曾丹初识Scheduler找准定位:分布式任务调度平台...

mysqldump备份操作大全及相关参数详解

mysqldump简介mysqldump是用于转储MySQL数据库的实用程序,通常我们用来迁移和备份数据库;它自带的功能参数非常多,文中列举出几乎所有常用的导出操作方法,在文章末尾将所有的参数详细说明...

大厂面试冲刺,Java“实战”问题三连,你碰到了哪个?

推荐学习...

亿级分库分表,如何丝滑扩容、如何双写灰度

以下是基于亿级分库分表丝滑扩容与双写灰度设计方案,结合架构图与核心流程说明:一、总体设计目标...

MYSQL表设计规范(mysql表设计原则)

日常工作总结,不是通用规范一、表设计库名、表名、字段名必须使用小写字母,“_”分割。...

怎么解决MySQL中的Duplicate entry错误?

在使用MySQL数据库时,我们经常会遇到Duplicateentry错误,这是由于插入或更新数据时出现了重复的唯一键值。这种错误可能会导致数据的不一致性和完整性问题。为了解决这个问题,我们可以采取以...

高并发下如何防重?(高并发如何防止重复)

前言最近测试给我提了一个bug,说我之前提供的一个批量复制商品的接口,产生了重复的商品数据。...

性能压测数据告诉你MySQL和MariaDB该怎么选

1.压测环境为了尽可能的客观公正,本次选择同一物理机上的两台虚拟机,一台用作数据库服务器,一台用作运行压测工具mysqlslap,操作系统均为UbuntuServer22.04LTS。...

屠龙之技 --sql注入 不值得浪费超过十天 实战中sqlmap--lv 3通杀全国

MySQL小结发表于2020-09-21分类于知识整理阅读次数:本文字数:67k阅读时长≈1:01...

破防了,谁懂啊家人们:记一次 mysql 问题排查

作者:温粥一、前言谁懂啊家人们,作为一名java开发,原来以为mysql这东西,写写CRUD,不是有手就行吗;你说DDL啊,不就是设计个表结构,搞几个索引吗。...

SpringBoot系列Mybatis之批量插入的几种姿势

...

MySQL 之 Performance Schema(mysql安装及配置超详细教程)

MySQL之PerformanceSchema介绍PerformanceSchema提供了在数据库运行时实时检查MySQL服务器的内部执行情况的方法,通过监视MySQL服务器的事件来实现监视内...

取消回复欢迎 发表评论: