rocketmq延迟消息实现原理(上) rocket mq怎么实现延迟消息的
ztj100 2024-12-25 16:49 16 浏览 0 评论
什么是延迟消息
延迟消息是指,生产者发送一条消息到到消息队列后并不期望这条消息马上会被消费者消费到,而是期望到了指定的时间(消息延迟一定时间),可以认为定时到当前时间加上一定的延迟时间。消费者才可以消费到这条消息。
这篇文章为什么说是实现原理(上)呢,因为rocketmq4版本针对延迟消息只支持默认等级的,不支持精确任意秒级的延迟或定时消息,在rocketmq5版本中支持定时到任意秒级时间的定时消息,所以这篇文章叫上,下步再继续讲解支持定时任意秒级时间的原理。
应用场景
在电子商务中,如果提交订单,可以发送延迟消息,30分钟后可以查看订单状态。如果订单仍未付款,则可以取消订单并释放库存。
发送延迟消息
发送延迟的消息代码示例,如下代码块所示:
// 实例化一个生产者来产生消息
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
Message message = new Message("TestTopic", ("Hello scheduled message ").getBytes());
// 设置延时等级
message.setDelayTimeLevel(5);
// 发送消息
producer.send(message);
// 关闭生产者
producer.shutdown();
可以看到相对于发送普通消息,增加了设置延迟等级这个属性,这个延迟等级是个int类型的属性,在发送的消息中是放到消息的扩展属性中的,key:DELAY,value:xx(等级值,如上面的5)
// Message的扩展属性
private Map<String, String> properties;
public void setDelayTimeLevel(int level) {
this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}
rocketmq目前支持的延迟等级是固定的,写在消息存储配置类(org.apache.rocketmq.store.config.MessageStoreConfig)中的一个属性,如下代码所示,可以看到支持18个等级的延迟设置(从1s~2h)。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
上面发送代码中设置的等级5对应的就是延迟1m的设置,具体等级的设置与延迟的时间对应关系,是在rocketmq启动的时候就会加载解析上面的配置信息,进行延迟等级与时间的映射,具体有两块地方用到这个配置,分别是消息存储与定时调度。
延迟消息存储
解析延迟时间代码是org.apache.rocketmq.store.DefaultMessageStore中,核心代码如下:
// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<>(32);
// 解析上面的延迟时间,放入到上面的delayLevelTable中
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = messageStoreConfig.getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
LOGGER.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
消息发送到broker后,消息进行存储org.apache.rocketmq.store.DefaultMessageStore#putMessage在消息存储方法中,默认注册了3个hook前置处理,针对延迟消息处理,我们主要看下面这个hook方法的处理,代码如下
public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
final MessageExtBrokerInner msg) {
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// 其他......
// 处理延迟消息逻辑
if (msg.getDelayTimeLevel() > 0) {
transformDelayLevelMessage(brokerController, msg);
}
}
return null;
}
public static void transformDelayLevelMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
// 超过最大的延迟等级,设置为最大的等级,即延迟2h
if (msg.getDelayTimeLevel() > brokerController.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(brokerController.getScheduleMessageService().getMaxDelayLevel());
}
// 把真实的topic与队列id放到扩展属性中,进行记录
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 设置延迟消息topic为 SCHEDULE_TOPIC_XXXX
msg.setTopic(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC);
// 设置延迟消息队列id为延迟等级-1
msg.setQueueId(ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()));
}
上面的代码执行,总结如下:
- 如果设置的延迟等级大于最大的延迟等级,设置为最大的;
- 把延迟消息的真实topic与queueId放到消息的扩展属性中;
- 把延迟消息的topic用默认的SCHEDULE_TOPIC_XXXX进行替换
- queueId设置为延迟级别-1;
- 写入到commitlog日志中,及异步写入到对应的18个consumequeue(这里的消息队列的topic是默认的SCHEDULE_TOPIC_XXXX,tagscode是通过存储的时间进行计算),后续由下面的定时任务进行消息这些消息队列。经典的消息存储结构如下所示。
- 注意:这个时候由于存储的消息及消费队列并不是真实的topic及queueId,所以此时是无法被消息者所消费的,只能通过定时调度进行消费。
定时调度
定时消息服务类org.apache.rocketmq.broker.schedule.ScheduleMessageService,启动方法中,加载配置解析代码如下:
// 存储延迟级别与时间的映射
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
new ConcurrentHashMap<>(32);
/**
* 启动方法
*/
public void start() {
// cas操作,进行启动后标记
if (started.compareAndSet(false, true)) {
// 调用加载防范
this.load();
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
if (this.enableAsyncDeliver) {
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
}
// 遍历延迟等级与时间的映射关系
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
if (this.enableAsyncDeliver) {
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
// 为每个延迟时间创建一个的定时任务(DeliverDelayedMessageTimerTask是个线程)
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
}
}
// 持久化操作
scheduledPersistService.scheduleAtFixedRate(() -> {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
/**
* 加载方法
*/
@Override
public boolean load() {
boolean result = super.load();
result = result && this.parseDelayLevel();
result = result && this.correctDelayOffset();
return result;
}
/**
* 解析配置的延迟时间与等级进行映射
*/
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.brokerController.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
this.delayLevelTable.put(level, delayTimeMillis);
if (this.enableAsyncDeliver) {
this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
}
}
} catch (Exception e) {
log.error("parse message delay level failed. messageDelayLevel = {}", levelString, e);
return false;
}
return true;
}
核心类org.apache.rocketmq.broker.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask
class DeliverDelayedMessageTimerTask implements Runnable {
// 延迟级别
private final int delayLevel;
// 偏移量
private final long offset;
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}
@Override
public void run() {
try {
if (isStarted()) {
// 核心执行逻辑
this.executeOnTimeUp();
}
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeUp exception", e);
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
}
}
这里我们主要看执行方法executeOnTimeUp,该方法执行逻辑步骤总结:
- 根据默认的延迟消息topic与延迟队列id获取消息消费队列,如果未找到,说明并没有当前延迟的消息,则根据延时级别创建下一次调度任务;
- 根据offset从消息消费队列中获取当前队列中所有有效的消息,如果未找到,则更新一下延迟队列定时拉取进度并创建定时任务待下一次继续尝试。
- 遍历ConsumeQueue,每一个标准ConsumeQueue条目为20个字节,解析出消息的物理偏移量、消息长度、消息taggcode,为从commitlog加载具体的消息做准备。
- 根据消息物理偏移量与消息大小从commitlog文件中查找消息。如果未找到消息,打印错误日志,根据延迟时间创建下一个定时器。
- 根据消息重新构建新的消息对象,清除消息的延迟级别属性(delayLevel)、并从消息属性中恢复消息真实的消息主题与消息消费队列,消息的消费次数reconsumeTimes并不会丢失。
- 将消息再次存入到commitlog中,并异步转发到对应的消息队列上,此时消费者可以进行消息消费。
相关推荐
- 使用Python编写Ping监测程序(python 测验)
-
Ping是一种常用的网络诊断工具,它可以测试两台计算机之间的连通性;如果您需要监测某个IP地址的连通情况,可以使用Python编写一个Ping监测程序;本文将介绍如何使用Python编写Ping监测程...
- 批量ping!有了这个小工具,python再也香不了一点
-
号主:老杨丨11年资深网络工程师,更多网工提升干货,请关注公众号:网络工程师俱乐部下午好,我的网工朋友。在咱们网工的日常工作中,经常需要检测多个IP地址的连通性。不知道你是否也有这样的经历:对着电脑屏...
- python之ping主机(python获取ping结果)
-
#coding=utf-8frompythonpingimportpingforiinrange(100,255):ip='192.168.1.'+...
- 网站安全提速秘籍!Nginx配置HTTPS+反向代理实战指南
-
太好了,你直接问到重点场景了:Nginx+HTTPS+反向代理,这个组合是现代Web架构中最常见的一种部署方式。咱们就从理论原理→实操配置→常见问题排查→高级玩法一层层剖开说,...
- Vue开发中使用iframe(vue 使用iframe)
-
内容:iframe全屏显示...
- Vue3项目实践-第五篇(改造登录页-Axios模拟请求数据)
-
本文将介绍以下内容:项目中的public目录和访问静态资源文件的方法使用json文件代替http模拟请求使用Axios直接访问json文件改造登录页,配合Axios进行登录请求,并...
- Vue基础四——Vue-router配置子路由
-
我们上节课初步了解Vue-router的初步知识,也学会了基本的跳转,那我们这节课学习一下子菜单的路由方式,也叫子路由。子路由的情况一般用在一个页面有他的基础模版,然后它下面的页面都隶属于这个模版,只...
- Vue3.0权限管理实现流程【实践】(vue权限管理系统教程)
-
作者:lxcan转发链接:https://segmentfault.com/a/1190000022431839一、整体思路...
- swiper在vue中正确的使用方法(vue中如何使用swiper)
-
swiper是网页中非常强大的一款轮播插件,说是轮播插件都不恰当,因为它能做的事情太多了,swiper在vue下也是能用的,需要依赖专门的vue-swiper插件,因为vue是没有操作dom的逻辑的,...
- Vue怎么实现权限管理?控制到按钮级别的权限怎么做?
-
在Vue项目中实现权限管理,尤其是控制到按钮级别的权限控制,通常包括以下几个方面:一、权限管理的层级划分...
- 【Vue3】保姆级毫无废话的进阶到实战教程 - 01
-
作为一个React、Vue双修选手,在Vue3逐渐稳定下来之后,是时候摸摸Vue3了。Vue3的变化不可谓不大,所以,本系列主要通过对Vue3中的一些BigChanges做...
- Vue3开发极简入门(13):编程式导航路由
-
前面几节文章,写的都是配置路由。但是在实际项目中,下面这种路由导航的写法才是最常用的:比如登录页面,服务端校验成功后,跳转至系统功能页面;通过浏览器输入URL直接进入系统功能页面后,读取本地存储的To...
- vue路由同页面重定向(vue路由重定向到外部url)
-
在Vue中,可以使用路由的重定向功能来实现同页面的重定向。首先,在路由配置文件(通常是`router/index.js`)中,定义一个新的路由,用于重定向到同一个页面。例如,我们可以定义一个名为`Re...
- 那个 Vue 的路由,路由是干什么用的?
-
在Vue里,路由就像“页面导航的指挥官”,专门负责管理页面(组件)的切换和显示逻辑。简单来说,它能让单页应用(SPA)像多页应用一样实现“不同URL对应不同页面”的效果,但整个过程不会刷新网页。一、路...
- Vue3项目投屏功能开发!(vue投票功能)
-
最近接了个大屏项目,产品想在不同的显示器上展示大屏项目不同的页面,做出来的效果图大概长这样...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)