rocketmq延迟消息实现原理(上) rocket mq怎么实现延迟消息的
ztj100 2024-12-25 16:49 10 浏览 0 评论
什么是延迟消息
延迟消息是指,生产者发送一条消息到到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间(消息延迟一定时间),可以认为定时到当前时间加上一定的延迟时间。消费者才可以消费到这条消息。
这篇文章为什么说是实现原理(上)呢,因为rocketmq4版本针对延迟消息只支持默认等级的,不支持精确任意秒级的延迟或定时消息,在rocketmq5版本中支持定时到任意秒级时间的定时消息,所以这篇文章叫上,下步再继续讲解支持定时任意秒级时间的原理。
应用场景
在电子商务中,如果提交订单,可以发送延迟消息,30分钟后可以查看订单状态。如果订单仍未付款,则可以取消订单并释放库存。
发送延迟消息
发送延迟的消息代码示例,如下代码块所示:
// 实例化一个生产者来产生消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
Message message = new Message("TestTopic", ("Hello scheduled message ").getBytes());
// 设置延时等级
message.setDelayTimeLevel(5);
// 发送消息
producer.send(message);
// 关闭生产者
producer.shutdown();
可以看到相对于发送普通消息,增加了设置延迟等级这个属性,这个延迟等级是个int类型的属性,在发送的消息中是放到消息的扩展属性中的,key:DELAY,value:xx(等级值,如上面的5)
// Message的扩展属性
private Map<String, String> properties;
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
rocketmq目前支持的延迟等级是固定的,写在消息存储配置类(org.apache.rocketmq.store.config.MessageStoreConfig)中的一个属性,如下代码所示,可以看到支持18个等级的延迟设置(从1s~2h)。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
上面发送代码中设置的等级5对应的就是延迟1m的设置,具体等级的设置与延迟的时间对应关系,是在rocketmq启动的时候就会加载解析上面的配置信息,进行延迟等级与时间的映射,具体有两块地方用到这个配置,分别是消息存储与定时调度。
延迟消息存储
解析延迟时间代码是org.apache.rocketmq.store.DefaultMessageStore中,核心代码如下:
// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<>(32);
// 解析上面的延迟时间,放入到上面的delayLevelTable中
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = messageStoreConfig.getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
LOGGER.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
消息发送到broker后,消息进行存储org.apache.rocketmq.store.DefaultMessageStore#putMessage在消息存储方法中,默认注册了3个hook前置处理,针对延迟消息处理,我们主要看下面这个hook方法的处理,代码如下
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 其他......
// 处理延迟消息逻辑
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
}
return null;
}
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
// 超过最大的延迟等级,设置为最大的等级,即延迟2h
if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
}
// 把真实的topic与队列id放到扩展属性中,进行记录
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置延迟消息topic为 SCHEDULE_TOPIC_XXXX
msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
// 设置延迟消息队列id为延迟等级-1
msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}
上面的代码执行,总结如下:
- 如果设置的延迟等级大于最大的延迟等级,设置为最大的;
- 把延迟消息的真实topic与queueId放到消息的扩展属性中;
- 把延迟消息的topic用默认的SCHEDULE_TOPIC_XXXX进行替换
- queueId设置为延迟级别-1;
- 写入到commitlog日志中,及异步写入到对应的18个consumequeue(这里的消息队列的topic是默认的SCHEDULE_TOPIC_XXXX,tagscode是通过存储的时间进行计算),后续由下面的定时任务进行消息这些消息队列。经典的消息存储结构如下所示。
- 注意:这个时候由于存储的消息及消费队列并不是真实的topic及queueId,所以此时是无法被消息者所消费的,只能通过定时调度进行消费。
定时调度
定时消息服务类org.apache.rocketmq.broker.schedule.ScheduleMessageService,启动方法中,加载配置解析代码如下:
// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<>(32);
/**
* 启动方法
*/
public void start() {
// cas操作,进行启动后标记
if (started.compareAndSet(false, true)) {
// 调用加载防范
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历延迟等级与时间的映射关系
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// 为每个延迟时间创建一个的定时任务(DeliverDelayedMessageTimerTask是个线程)
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 持久化操作
scheduledPersistService.scheduleAtFixedRate(() -> {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
/**
* 加载方法
*/
@Override
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
result = result && this.correctDelayOffset();
return result;
}
/**
* 解析配置的延迟时间与等级进行映射
*/
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
if (this.enableAsyncDeliver) {
this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
}
}
} catch (Exception e) {
log.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
核心类org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask
class DeliverDelayedMessageTimerTask implements Runnable {
// 延迟级别
private final int delayLevel;
// 偏移量
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
// 核心执行逻辑
this.executeOnTimeUp();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeUp exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
}
}
这里我们主要看执行方法executeOnTimeUp,该方法执行逻辑步骤总结:
- 根据默认的延迟消息topic与延迟队列id获取消息消费队列,如果未找到,说明并没有当前延迟的消息,则根据延时级别创建下一次调度任务;
- 根据offset从消息消费队列中获取当前队列中所有有效的消息,如果未找到,则更新一下延迟队列定时拉取进度并创建定时任务待下一次继续尝试。
- 遍历ConsumeQueue,每一个标准ConsumeQueue条目为20个字节,解析出消息的物理偏移量、消息长度、消息taggcode,为从commitlog加载具体的消息做准备。
- 根据消息物理偏移量与消息大小从commitlog文件中查找消息。如果未找到消息,打印错误日志,根据延迟时间创建下一个定时器。
- 根据消息重新构建新的消息对象,清除消息的延迟级别属性(delayLevel)、并从消息属性中恢复消息真实的消息主题与消息消费队列,消息的消费次数reconsumeTimes并不会丢失。
- 将消息再次存入到commitlog中,并异步转发到对应的消息队列上,此时消费者可以进行消息消费。
相关推荐
- Java项目宝塔搭建实战MES-Springboot开源MES智能制造系统源码
-
大家好啊,我是测评君,欢迎来到web测评。...
- 一个令人头秃的问题,Logback 日志级别设置竟然无效?
-
原文链接:https://mp.weixin.qq.com/s/EFvbFwetmXXA9ZGBGswUsQ原作者:小黑十一点半...
- 实战!SpringBoot + RabbitMQ死信队列实现超时关单
-
需求背景之为什么要有超时关单原因一:...
- 火了!阿里P8架构师编写堪称神级SpringBoot手册,GitHub星标99+
-
Springboot现在已成为企业面试中必备的知识点,以及企业应用的重要模块。今天小编给大家分享一份来着阿里P8架构师编写的...
- Java本地搭建宝塔部署实战springboot仓库管理系统源码
-
大家好啊,我是测评君,欢迎来到web测评。...
- 工具尝鲜(1)-Fleet构建运行一个Springboot入门Web项目
-
Fleet是JetBrains公司推出的轻量级编辑器,对标VSCode。该款产品还在公测当中,具体下载链接如下JetBrainsFleet:由JetBrains打造的下一代IDE。想要尝试的...
- SPRINGBOOT WEB 实现文件夹上传(保留目录结构)
-
网上搜到的SpringBoot的代码不多,完整的不多,能用的也不多,基本上大部分的文章只是提供了少量的代码,讲一下思路,或者实现方案。之前一般的做法都是使用HTML5来做的,大部都是传文件的,传文件夹...
- Java项目本地部署宝塔搭建实战报修小程序springboot版系统源码
-
大家好啊,我是测评君,欢迎来到web测评。...
- 新年IT界大笑料“工行取得基于SpringBoot的web系统后端实现专利
-
先看看专利描述...
- 看完SpringBoot源码后,整个人都精神了
-
前言当读完SpringBoot源码后,被Spring的设计者们折服,Spring系列中没有几行代码是我们看不懂的,而是难在理解设计思路,阅读Spring、SpringMVC、SpringBoot需要花...
- 阿里大牛再爆神著:SpringBoot+Cloud微服务手册
-
今天给大家分享的这份“Springboot+Springcloud微服务开发实战手册”共有以下三大特点...
- WebClient是什么?SpringBoot中如何使用WebClient?
-
WebClient是什么?WebClient是SpringFramework5引入的一个非阻塞、响应式的Web客户端库。它提供了一种简单而强大的方式来进行HTTP请求,并处理来自服务器的响应。与传...
- SpringBoot系列——基于mui的H5套壳APP开发web框架
-
前言 大致原理:创建一个main主页面,只有主页面有头部、尾部,中间内容嵌入iframe内容子页面,如果在当前页面进行跳转操作,也是在iframe中进行跳转,而如果点击尾部按钮切换模块、页面,那...
- 在Spring Boot中使用 jose4j 实现 JSON Web Token (JWT)
-
JSONWebToken或JWT作为服务之间安全通信的一种方式而闻名。...
- Spring Boot使用AOP方式实现统一的Web请求日志记录?
-
AOP简介AOP(AspectOrientedProgramming),面相切面编程,是通过代码预编译与运行时动态代理的方式来实现程序的统一功能维护的方案。AOP作为Spring框架的核心内容,通...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Java项目宝塔搭建实战MES-Springboot开源MES智能制造系统源码
- 一个令人头秃的问题,Logback 日志级别设置竟然无效?
- 实战!SpringBoot + RabbitMQ死信队列实现超时关单
- 火了!阿里P8架构师编写堪称神级SpringBoot手册,GitHub星标99+
- Java本地搭建宝塔部署实战springboot仓库管理系统源码
- 工具尝鲜(1)-Fleet构建运行一个Springboot入门Web项目
- SPRINGBOOT WEB 实现文件夹上传(保留目录结构)
- Java项目本地部署宝塔搭建实战报修小程序springboot版系统源码
- 新年IT界大笑料“工行取得基于SpringBoot的web系统后端实现专利
- 看完SpringBoot源码后,整个人都精神了
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)