百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

快速尝鲜:RabbitMQ 搭建完就得用起来

ztj100 2024-11-23 00:04 11 浏览 0 评论

在项目真正开始之前我们先来简单介绍下 RabbitMQ 的工作流程:

  • 生产者往交换机中发送消息;
  • 交换机通过规则绑定队列,通过路由键将消息存储到队列中;
  • 消费者获取队列中的消息进行消费;

环境:SpringBoot 2.6.3、JDK 1.8

项目搭建

首先创建 SpringBoot 项目 rabbit-mq

  1. 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

复制代码

  1. yml 文件配置
spring:
  rabbitmq:
    host: 127.0.0.1     //rabbitMQ服务地址
    port: 15672   //这个地方暂时先用我们之前配置的15672
    username: cheetah   //自己的账户名
    password: 123456    //自己的密码

复制代码

  1. 直连交换机

本项目以直连交换机为例,至于其他的交换机类型将在后文中给出详细介绍。

@Configuration
public class DirectRabbitConfig {


    /**
     * 定义交换机
     **/
    @Bean
    public DirectExchange directExchange(){
        /**
         * 交换机名称
         * 持久性标志:是否持久化,默认是 true 即声明一个持久的 exchange,该exchange将在服务器重启后继续运行
         * 自动删除标志:是否自动删除,默认为 false, 如果服务器想在 exchange不再使用时删除它,则设置为 true
         **/
        return new DirectExchange("directExchange", true, false);
    }


    /**
     * 定义队列
     **/
    @Bean
    public Queue directQueue(){
        /**
         * name:队列名称
         * durable:是否持久化,默认是 true,持久化队列,会被存储在磁盘上,当消息代理重启时仍然存在
         * exclusive:是否排他,默认为 false,true则表示声明了一个排他队列(该队列将仅由声明者连接使用),如果连接关闭,则队列被删除。此参考优先级高于durable
         * autoDelete:是否自动删除, 默认是 false,true则表示当队列不再使用时,服务器删除该队列
         **/
        return new Queue("directQueue",true);
    }


    /**
     * 队列和交换机绑定
     * 设置路由键:directRouting
     **/
    @Bean
    Binding bindingDirect(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("directRouting");
    }




}

复制代码

  1. 消息发送
@RestController
public class SendMessageController {


    @Autowired
    private RabbitTemplate rabbitTemplate;


    @GetMapping("/sendMessage")
    public String sendMessage(){
        //将消息携带路由键值
        rabbitTemplate.convertAndSend("directExchange", "directRouting", "发送消息!");
        return "ok";
    }


}

复制代码

我们先启动程序,在浏览器访问下

http://127.0.0.1:9001/sendMessage

报错如下:

我们之前已经给该用户分配过权限了,如果之前未分配,直接在客户端中配置:

之所以访问不到,是因为我们使用的端口号不正确

所以我们需要将端口改为 5672 (如果是阿里云服务器实例,需要将该端口 开放权限

我们再来访问下

http://127.0.0.1:9001/sendMessage

请求返回"OK",控制台输出

客户端相关页面截图如下:

  1. 消息消费
@Component
@RabbitListener(queues = "directQueue")//监听队列名称
public class MQReciever {


    @RabbitHandler
    public void process(String message){
        System.out.println("接收到的消息是:"+ message);
    }
}

复制代码

启动项目,发现消息已经被消费。

为了防止消息丢失, RabbitMQ 增加了消息确认机制:生产者消息确认机制和消费者消息确认机制。

确认机制

一、生产者消息确认机制

  1. yml 中增加配置信息
spring:
  rabbitmq:
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

复制代码

spring.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果

  1. 增加回调
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
 RabbitTemplate rabbitTemplate = new RabbitTemplate();
 rabbitTemplate.setConnectionFactory(connectionFactory);
 //设置开启 Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
 rabbitTemplate.setMandatory(true);


 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  @Override
  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
   System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
   System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
   System.out.println("ConfirmCallback:     "+"原因:"+cause);
  }
 });


 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback(){
  @Override
  public void returnedMessage(ReturnedMessage returned) {
   System.out.println("ReturnCallback:     "+"消息:"+returned.getMessage());
   System.out.println("ReturnCallback:     "+"回应码:"+returned.getReplyCode());
   System.out.println("ReturnCallback:     "+"回应信息:"+returned.getReplyText());
   System.out.println("ReturnCallback:     "+"交换机:"+returned.getExchange());
   System.out.println("ReturnCallback:     "+"路由键:"+returned.getRoutingKey());
  }
 });
 return rabbitTemplate;
}

复制代码

  • confirm 机制是只保证消息到达 exchange ,并不保证消息可以路由到正确的地方 queue
  • 当前的 exchange 不存在或者指定的路由 key 路由不到才会触发 return 机制

大家可以自行演示以下情况的执行结果:

  • 不存在交换机和队列
  • 存在交换机,不存在队列
  • 消息推送成功

二、消费者消息的确认机制

默认情况下如果一个消息被消费者正确接收则会从队列中移除。如果一个队列没被任何消费者订阅,那么这个队列中的消息会被缓存,当有消费者订阅时则会立即发送,进而从队列中移除。

消费者消息的确认机制可以分为以下 3 种:

  1. 自动确认

AcknowledgeMode.NONE 默认为自动确认,不管消费者是否成功处理了消息,消息都会从队列中被移除。

  1. 根据情况确认

AcknowledgeMode.AUTO 根据方法的执行情况来决定是否确认还是拒绝(是否重新入队列)

  • 如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认
  • 当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且消息不会重回队列
  • 当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认
  • 其他的异常,则消息会被拒绝,并且该消息会重回队列,如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的。可以通过 setDefaultRequeueRejected (默认是 true )去设置

可能造成消息丢失,一般是需要我们在 try-catch 捕捉异常后, 打印日志 用于追踪数据,这样找出对应数据再做后续处理。

  1. 手动确认

AcknowledgeMode.MANUAL 对于手动确认,也是我们工作中最常用到的,它的用法如下:

/*
 * 肯定确认
 * deliveryTag:消息队列数据的唯一id
 * multiple:是否批量 
 * true :一次性确认所有小于等于deliveryTag的消息
 * false:对当前消息进行确认;
 */
channel.basicAck(long deliveryTag, boolean multiple); 

复制代码

/*
 * 否定确认
 * multiple:是否批量 
 *   true:一次性拒绝所有小于deliveryTag的消息
 *   false:对当前消息进行确认;
 * requeue:被拒绝的是否重新入列,
 *   true:就是将数据重新丢回队列里,那么下次还会消费这消息;
 *   false:就是拒绝处理该消息,服务器把该消息丢掉即可。 
 */
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue);

复制代码

/*
 * 用于否定确认,但与basicNack相比有一个限制,一次只能拒绝单条消息
 */
channel.basicReject(long deliveryTag, boolean requeue);  

复制代码

手动确认

在 yml 配置中开启手动确认模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

复制代码

或者在代码中开启

@Configuration
public class MessageListenerConfig {


    @Autowired
    private CachingConnectionFactory connectionFactory;


    @Autowired
    private MQReciever mqReciever;//消息接收处理类


    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        //并发使用者的数量
        container.setConcurrentConsumers(1);
        //消费者人数上限
        container.setMaxConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
        //设置一个队列,此处支持设置多个
        container.setQueueNames("directQueue");
        container.setMessageListener(mqReciever);
        return container;
    }
}

复制代码

消息消费类

@Component
@RabbitListener(queues = "directQueue")//监听队列名称
public class MQReciever implements ChannelAwareMessageListener {


    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = message.toString();
            String[] msgArray = msg.split("'");//可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
            System.out.println("消费的消息内容:"+msgArray[1]);
            System.out.println("消费的主题消息来自:"+message.getMessageProperties().getConsumerQueue());
            
            //业务处理
            ......
            
            channel.basicAck(deliveryTag, true);
            
        } catch (Exception e) {
            //拒绝重新入队列
            channel.basicReject(deliveryTag, false);   
            e.printStackTrace();
        }
    }
} 

原文 https://xie.infoq.cn/article/96df79f01c32a63129dfe7a29

相关推荐

告别手动操作:一键多工作表合并的实用方法

通常情况下,我们需要将同一工作簿内不同工作表中的数据进行合并处理。如何快速有效地完成这些数据的整合呢?这主要取决于需要合并的源数据的结构。...

【MySQL技术专题】「优化技术系列」常用SQL的优化方案和技术思路

概述前面我们介绍了MySQL中怎么样通过索引来优化查询。日常开发中,除了使用查询外,我们还会使用一些其他的常用SQL,比如INSERT、GROUPBY等。对于这些SQL语句,我们该怎么样进行优化呢...

9.7寸视网膜屏原道M9i双系统安装教程

泡泡网平板电脑频道4月17日原道M9i采用Win8安卓双系统,对于喜欢折腾的朋友来说,刷机成了一件难事,那么原道M9i如何刷机呢?下面通过详细地图文,介绍原道M9i的刷机操作过程,在刷机的过程中,要...

如何做好分布式任务调度——Scheduler 的一些探索

作者:张宇轩,章逸,曾丹初识Scheduler找准定位:分布式任务调度平台...

mysqldump备份操作大全及相关参数详解

mysqldump简介mysqldump是用于转储MySQL数据库的实用程序,通常我们用来迁移和备份数据库;它自带的功能参数非常多,文中列举出几乎所有常用的导出操作方法,在文章末尾将所有的参数详细说明...

大厂面试冲刺,Java“实战”问题三连,你碰到了哪个?

推荐学习...

亿级分库分表,如何丝滑扩容、如何双写灰度

以下是基于亿级分库分表丝滑扩容与双写灰度设计方案,结合架构图与核心流程说明:一、总体设计目标...

MYSQL表设计规范(mysql表设计原则)

日常工作总结,不是通用规范一、表设计库名、表名、字段名必须使用小写字母,“_”分割。...

怎么解决MySQL中的Duplicate entry错误?

在使用MySQL数据库时,我们经常会遇到Duplicateentry错误,这是由于插入或更新数据时出现了重复的唯一键值。这种错误可能会导致数据的不一致性和完整性问题。为了解决这个问题,我们可以采取以...

高并发下如何防重?(高并发如何防止重复)

前言最近测试给我提了一个bug,说我之前提供的一个批量复制商品的接口,产生了重复的商品数据。...

性能压测数据告诉你MySQL和MariaDB该怎么选

1.压测环境为了尽可能的客观公正,本次选择同一物理机上的两台虚拟机,一台用作数据库服务器,一台用作运行压测工具mysqlslap,操作系统均为UbuntuServer22.04LTS。...

屠龙之技 --sql注入 不值得浪费超过十天 实战中sqlmap--lv 3通杀全国

MySQL小结发表于2020-09-21分类于知识整理阅读次数:本文字数:67k阅读时长≈1:01...

破防了,谁懂啊家人们:记一次 mysql 问题排查

作者:温粥一、前言谁懂啊家人们,作为一名java开发,原来以为mysql这东西,写写CRUD,不是有手就行吗;你说DDL啊,不就是设计个表结构,搞几个索引吗。...

SpringBoot系列Mybatis之批量插入的几种姿势

...

MySQL 之 Performance Schema(mysql安装及配置超详细教程)

MySQL之PerformanceSchema介绍PerformanceSchema提供了在数据库运行时实时检查MySQL服务器的内部执行情况的方法,通过监视MySQL服务器的事件来实现监视内...

取消回复欢迎 发表评论: