rocketmq延迟消息实现原理(上) rocket mq怎么实现延迟消息的
ztj100 2024-12-25 16:49 22 浏览 0 评论
什么是延迟消息
延迟消息是指,生产者发送一条消息到到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间(消息延迟一定时间),可以认为定时到当前时间加上一定的延迟时间。消费者才可以消费到这条消息。
这篇文章为什么说是实现原理(上)呢,因为rocketmq4版本针对延迟消息只支持默认等级的,不支持精确任意秒级的延迟或定时消息,在rocketmq5版本中支持定时到任意秒级时间的定时消息,所以这篇文章叫上,下步再继续讲解支持定时任意秒级时间的原理。
应用场景
在电子商务中,如果提交订单,可以发送延迟消息,30分钟后可以查看订单状态。如果订单仍未付款,则可以取消订单并释放库存。
发送延迟消息
发送延迟的消息代码示例,如下代码块所示:
// 实例化一个生产者来产生消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
Message message = new Message("TestTopic", ("Hello scheduled message ").getBytes());
// 设置延时等级
message.setDelayTimeLevel(5);
// 发送消息
producer.send(message);
// 关闭生产者
producer.shutdown();
可以看到相对于发送普通消息,增加了设置延迟等级这个属性,这个延迟等级是个int类型的属性,在发送的消息中是放到消息的扩展属性中的,key:DELAY,value:xx(等级值,如上面的5)
// Message的扩展属性
private Map<String, String> properties;
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
rocketmq目前支持的延迟等级是固定的,写在消息存储配置类(org.apache.rocketmq.store.config.MessageStoreConfig)中的一个属性,如下代码所示,可以看到支持18个等级的延迟设置(从1s~2h)。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
上面发送代码中设置的等级5对应的就是延迟1m的设置,具体等级的设置与延迟的时间对应关系,是在rocketmq启动的时候就会加载解析上面的配置信息,进行延迟等级与时间的映射,具体有两块地方用到这个配置,分别是消息存储与定时调度。
延迟消息存储
解析延迟时间代码是org.apache.rocketmq.store.DefaultMessageStore中,核心代码如下:
// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<>(32);
// 解析上面的延迟时间,放入到上面的delayLevelTable中
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = messageStoreConfig.getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
LOGGER.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
消息发送到broker后,消息进行存储org.apache.rocketmq.store.DefaultMessageStore#putMessage在消息存储方法中,默认注册了3个hook前置处理,针对延迟消息处理,我们主要看下面这个hook方法的处理,代码如下
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 其他......
// 处理延迟消息逻辑
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
}
return null;
}
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
// 超过最大的延迟等级,设置为最大的等级,即延迟2h
if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
}
// 把真实的topic与队列id放到扩展属性中,进行记录
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置延迟消息topic为 SCHEDULE_TOPIC_XXXX
msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
// 设置延迟消息队列id为延迟等级-1
msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}
上面的代码执行,总结如下:
- 如果设置的延迟等级大于最大的延迟等级,设置为最大的;
- 把延迟消息的真实topic与queueId放到消息的扩展属性中;
- 把延迟消息的topic用默认的SCHEDULE_TOPIC_XXXX进行替换
- queueId设置为延迟级别-1;
- 写入到commitlog日志中,及异步写入到对应的18个consumequeue(这里的消息队列的topic是默认的SCHEDULE_TOPIC_XXXX,tagscode是通过存储的时间进行计算),后续由下面的定时任务进行消息这些消息队列。经典的消息存储结构如下所示。
- 注意:这个时候由于存储的消息及消费队列并不是真实的topic及queueId,所以此时是无法被消息者所消费的,只能通过定时调度进行消费。
定时调度
定时消息服务类org.apache.rocketmq.broker.schedule.ScheduleMessageService,启动方法中,加载配置解析代码如下:
// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<>(32);
/**
* 启动方法
*/
public void start() {
// cas操作,进行启动后标记
if (started.compareAndSet(false, true)) {
// 调用加载防范
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历延迟等级与时间的映射关系
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// 为每个延迟时间创建一个的定时任务(DeliverDelayedMessageTimerTask是个线程)
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 持久化操作
scheduledPersistService.scheduleAtFixedRate(() -> {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
/**
* 加载方法
*/
@Override
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
result = result && this.correctDelayOffset();
return result;
}
/**
* 解析配置的延迟时间与等级进行映射
*/
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
if (this.enableAsyncDeliver) {
this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
}
}
} catch (Exception e) {
log.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
核心类org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask
class DeliverDelayedMessageTimerTask implements Runnable {
// 延迟级别
private final int delayLevel;
// 偏移量
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
// 核心执行逻辑
this.executeOnTimeUp();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeUp exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
}
}
这里我们主要看执行方法executeOnTimeUp,该方法执行逻辑步骤总结:
- 根据默认的延迟消息topic与延迟队列id获取消息消费队列,如果未找到,说明并没有当前延迟的消息,则根据延时级别创建下一次调度任务;
- 根据offset从消息消费队列中获取当前队列中所有有效的消息,如果未找到,则更新一下延迟队列定时拉取进度并创建定时任务待下一次继续尝试。
- 遍历ConsumeQueue,每一个标准ConsumeQueue条目为20个字节,解析出消息的物理偏移量、消息长度、消息taggcode,为从commitlog加载具体的消息做准备。
- 根据消息物理偏移量与消息大小从commitlog文件中查找消息。如果未找到消息,打印错误日志,根据延迟时间创建下一个定时器。
- 根据消息重新构建新的消息对象,清除消息的延迟级别属性(delayLevel)、并从消息属性中恢复消息真实的消息主题与消息消费队列,消息的消费次数reconsumeTimes并不会丢失。
- 将消息再次存入到commitlog中,并异步转发到对应的消息队列上,此时消费者可以进行消息消费。
相关推荐
- sharding-jdbc实现`分库分表`与`读写分离`
-
一、前言本文将基于以下环境整合...
- 三分钟了解mysql中主键、外键、非空、唯一、默认约束是什么
-
在数据库中,数据表是数据库中最重要、最基本的操作对象,是数据存储的基本单位。数据表被定义为列的集合,数据在表中是按照行和列的格式来存储的。每一行代表一条唯一的记录,每一列代表记录中的一个域。...
- MySQL8行级锁_mysql如何加行级锁
-
MySQL8行级锁版本:8.0.34基本概念...
- mysql使用小技巧_mysql使用入门
-
1、MySQL中有许多很实用的函数,好好利用它们可以省去很多时间:group_concat()将取到的值用逗号连接,可以这么用:selectgroup_concat(distinctid)fr...
- MySQL/MariaDB中如何支持全部的Unicode?
-
永远不要在MySQL中使用utf8,并且始终使用utf8mb4。utf8mb4介绍MySQL/MariaDB中,utf8字符集并不是对Unicode的真正实现,即不是真正的UTF-8编码,因...
- 聊聊 MySQL Server 可执行注释,你懂了吗?
-
前言MySQLServer当前支持如下3种注释风格:...
- MySQL系列-源码编译安装(v5.7.34)
-
一、系统环境要求...
- MySQL的锁就锁住我啦!与腾讯大佬的技术交谈,是我小看它了
-
对酒当歌,人生几何!朝朝暮暮,唯有己脱。苦苦寻觅找工作之间,殊不知今日之事乃我心之痛,难道是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一...
- MySQL字符问题_mysql中字符串的位置
-
中文写入乱码问题:我输入的中文编码是urf8的,建的库是urf8的,但是插入mysql总是乱码,一堆"???????????????????????"我用的是ibatis,终于找到原因了,我是这么解决...
- 深圳尚学堂:mysql基本sql语句大全(三)
-
数据开发-经典1.按姓氏笔画排序:Select*FromTableNameOrderByCustomerNameCollateChinese_PRC_Stroke_ci_as//从少...
- MySQL进行行级锁的?一会next-key锁,一会间隙锁,一会记录锁?
-
大家好,是不是很多人都对MySQL加行级锁的规则搞的迷迷糊糊,一会是next-key锁,一会是间隙锁,一会又是记录锁。坦白说,确实还挺复杂的,但是好在我找点了点规律,也知道如何如何用命令分析加...
- 一文讲清怎么利用Python Django实现Excel数据表的导入导出功能
-
摘要:Python作为一门简单易学且功能强大的编程语言,广受程序员、数据分析师和AI工程师的青睐。本文系统讲解了如何使用Python的Django框架结合openpyxl库实现Excel...
- 用DataX实现两个MySQL实例间的数据同步
-
DataXDataX使用Java实现。如果可以实现数据库实例之间准实时的...
- MySQL数据库知识_mysql数据库基础知识
-
MySQL是一种关系型数据库管理系统;那废话不多说,直接上自己以前学习整理文档:查看数据库命令:(1).查看存储过程状态:showprocedurestatus;(2).显示系统变量:show...
- 如何为MySQL中的JSON字段设置索引
-
背景MySQL在2015年中发布的5.7.8版本中首次引入了JSON数据类型。自此,它成了一种逃离严格列定义的方式,可以存储各种形状和大小的JSON文档,例如审计日志、配置信息、第三方数据包、用户自定...
你 发表评论:
欢迎- 一周热门
-
-
MySQL中这14个小玩意,让人眼前一亮!
-
旗舰机新标杆 OPPO Find X2系列正式发布 售价5499元起
-
【VueTorrent】一款吊炸天的qBittorrent主题,人人都可用
-
面试官:使用int类型做加减操作,是线程安全吗
-
C++编程知识:ToString()字符串转换你用正确了吗?
-
【Spring Boot】WebSocket 的 6 种集成方式
-
PyTorch 深度学习实战(26):多目标强化学习Multi-Objective RL
-
pytorch中的 scatter_()函数使用和详解
-
与 Java 17 相比,Java 21 究竟有多快?
-
基于TensorRT_LLM的大模型推理加速与OpenAI兼容服务优化
-
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)