RocketMQ源码分析之消息消费 轮询机制 PullRequestHoldService
ztj100 2024-10-27 18:33 36 浏览 0 评论
一、前言
RocketMQ 消费过程中的轮询机制是啥?
1.1 消息消费方式
RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式
- Pull 模式:用户自己进行消息的拉取和消费进度的更新
- Push 模式:Broker 将新的消息自动发送给用户进行消费
1.2 Push 消费模式
我们一般使用 RocketMQ 使用的是 Push 模式,因为比较方便,不需要手动拉取消息和更新消费进度。
那么你有没有想过 Push 模式是如何做到能够立即消费新的消息?
1.2.1 Push 模式原理
实际上,在 Push 消费时,消费者是在不断轮询 Broker,询问是否有新消息可供消费。一旦有新消息到达,马上拉取该消息。也就是说 Push 模式内部也用了 Pull 消息的模式,这样就可以立即消费到最新的消息。
1.3 如何进行轮询?
那么 Push 模式或 Pull 模式如何进行消息的查询?
能够想到的比较笨的方法是,每隔一定的时间(如1ms)就向 Broker 发送一个查询请求,如果没有新消息则立刻返回。可想而知这种方法非常浪费网络资源。
RocketMQ 为了提高网络性能,在拉取消息时如果没有新消息,不会马上返回,而是会将该查询请求挂起一段时间,然后再重试查询。如果一直没有新消息,直到轮询时间超过设定的阈值才会返回。
根据轮询设定的超时阈值大小的不同,RocketMQ 有两种轮询方式,分别为长轮询(默认)和短轮询。
1.4 长轮询和短轮询
RocketMQ 的 Broker 端参数 longPollingEnable 可以配置轮询方式,默认为 true
- 短轮询:longPollingEnable=false,轮询时间为 shortPollingTimeMills ,默认为 1s
- 长轮询:longPollingEnable=true,轮询时间为 5s。拉取请求挂起时间:受 DefaultMQPullConsumer 的 brokerSuspendMaxTimeMillis 控制,默认push模式固定15s,pull模式固定20s。
二、源码分析
- 挂起拉取请求;
- 线程不断轮询判断该offset是否有新的消息达到;
- 调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务;
- PullMessageProcessor线程处理任务交给processRequest方法进行处理;
- NotifyMessageArrivingListener监听消息到达;
1、挂起拉取请求
// 挂起拉取请求
// 根据主题名称 + 队列id, 获取 ManyPullRequest, 对于同一个 topic + 队列的拉取请求用 ManyPullRequest包装,
// 然后将 pullRequest 添加到 ManyPullRequest 中
public void suspendPullRequest(
final String topic,
final int queueId,
final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
private String buildKey(final String topic, final int queueId) {
StringBuilder sb = new StringBuilder(topic.length() + 5);
sb.append(topic);
sb.append(TOPIC_QUEUEID_SEPARATOR);
sb.append(queueId);
return sb.toString();
}
2、线程不断轮询判断该offset是否有新的消息达到
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
// 当前是否启用long polling,长轮询,一个请求过来了挂起,每次只挂起 5s,然后就去尝试拉取。
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000); // 长轮询一个等待周期是5s
}
// 如果不开启长轮询模式,则只挂起一次,挂起时间为 shortPollingTimeMills,然后去尝试查找消息
else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检查当前挂起的请求,把拉取的topic+queue的最大offset查一下,通知一下新消息来了,你不用挂起了
// 你可以拿到消息走了
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
// 你可以在线程体里进入等待的状态,还可以设置超时时间
protected void waitForRunning(long interval) {
// 需要进行等待的时候,必须把已经通过过这个标识从true修改为false
// 默认就是false,所以说一般这个逻辑是不跑的
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
// entry to wait
// 无论此时你的countDownLatch.value是多少,此时都需要复位成1,让我们可以去进行一个等待
waitPoint.reset();
try {
// 他可以等待一定的时间,可以去等待别人进行一个countDown操作
// 开始来等待别人,指定我们的一个超时时间
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false); // 他一定会把hasNotified设置为false
this.onWaitEnd();
}
}
protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 根据主题,消费队列ID查找队列的最大偏移量。
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
// 根据该offset,判断是否有新的消息达到。
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
* 通知消息到达
*
* @param topic 主题名称
* @param queueId 队列id
* @param maxOffset 消费队列当前最大偏移量
* @param tagsCode 消息tag hashcode,基于tag消息过滤
* @param msgStoreTime 消息存储时间
* @param filterBitMap 过滤位图
* @param properties 消息属性,基于属性的消息过滤
*/
public void notifyMessageArriving(
final String topic,
final int queueId,
final long maxOffset, // 最大的offset已经到了哪儿去了
final Long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
// 拿到当前挂起等待topic@queue里的消息
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 获取主题与队列的所有 PullRequest 并清除内部 pullRequest 集合,避免重复拉取
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset; // topic@queue里最新到达的消息最大的offset,新消息的offset
if (newestOffset <= request.getPullFromThisOffset()) { // 如果说最新的消息是小于等于拉取请求里要拉取的起始offset
// 查询消息存储组件里的topic@queue最大的offset,查询出来了以后设置为topic@queue里的最大offset
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(
topic, queueId
);
}
// 如果队列最大偏移量大于 pullFromThisOffset 说明有新的消息到达,
// 先简单对消息根据 tag,属性进行一次消息过滤,
// 如果 tag,属性为空,则消息过滤器会返回true,然后 executeRequestWhenWakeup进行消息拉取,结束长轮询
if (newestOffset > request.getPullFromThisOffset()) {
// 根据我们的消费队列判断一下是否匹配,新到达的消息tags是否跟拉取请求里的tags是匹配的
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(
tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)
);
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
// 继续判断一下跟commitlog想比是否匹配
match = request.getMessageFilter().isMatchedByCommitLog(
null, properties);
}
if (match) {
try {
// 调用PullMessageProcessor进行消息拉取
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
request.getClientChannel(),
request.getRequestCommand()
);
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 如果挂起时间超过 suspendTimeoutMillisLong,则超时,结束长轮询,调用executeRequestWhenWakeup 进行消息拉取,并返回结果到客户端
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
request.getClientChannel(),
request.getRequestCommand()
);
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
// 如果待拉取偏移量大于消息消费队列最大偏移量,并且未超时,调用 mpr.addPullRequest(replayList) 将拉取任务重新放入,待下一次检测
mpr.addPullRequest(replayList);
}
}
}
}
3、调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
try {
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
// 最终将线程封装成一个RequestTask提交至线程池执行
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
4、PullMessageProcessor线程处理任务交给processRequest方法进行处理
线程run方法逻辑,processRequest方法上文有分析过;
5、NotifyMessageArrivingListener监听消息到达
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
this.pullRequestHoldService = pullRequestHoldService;
}
@Override
public void arriving(
String topic,
int queueId,
long logicOffset,
long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(
topic,
queueId,
logicOffset,
tagsCode,
msgStoreTime,
filterBitMap,
properties
);
}
}
也就是说当provider消息推送到broker后,broker会通知NotifyMessageArrivingListener消息到达,从而及时的结束挂起的轮询的链接。
三、总结
要开启长轮询, 在 broker 配置文件中 longPollingEnable=true, 默认是开启的。
??消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。
短轮询:longPollingEnable=false,第一次未拉取到消息后等待 shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。
长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer 的brokerSuspendMaxTimeMillis控制,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。
PullRequestHoldService:每隔5s重试一次。
DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService 线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1), 继续下一次检查。
相关推荐
- 如何将数据仓库迁移到阿里云 AnalyticDB for PostgreSQL
-
阿里云AnalyticDBforPostgreSQL(以下简称ADBPG,即原HybridDBforPostgreSQL)为基于PostgreSQL内核的MPP架构的实时数据仓库服务,可以...
- Python数据分析:探索性分析
-
写在前面如果你忘记了前面的文章,可以看看加深印象:Python数据处理...
- C++基础语法梳理:算法丨十大排序算法(二)
-
本期是C++基础语法分享的第十六节,今天给大家来梳理一下十大排序算法后五个!归并排序...
- C 语言的标准库有哪些
-
C语言的标准库并不是一个单一的实体,而是由一系列头文件(headerfiles)组成的集合。每个头文件声明了一组相关的函数、宏、类型和常量。程序员通过在代码中使用#include<...
- [深度学习] ncnn安装和调用基础教程
-
1介绍ncnn是腾讯开发的一个为手机端极致优化的高性能神经网络前向计算框架,无第三方依赖,跨平台,但是通常都需要protobuf和opencv。ncnn目前已在腾讯多款应用中使用,如QQ,Qzon...
- 用rust实现经典的冒泡排序和快速排序
-
1.假设待排序数组如下letmutarr=[5,3,8,4,2,7,1];...
- ncnn+PPYOLOv2首次结合!全网最详细代码解读来了
-
编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...
- C++特性使用建议
-
1.引用参数使用引用替代指针且所有不变的引用参数必须加上const。在C语言中,如果函数需要修改变量的值,参数必须为指针,如...
- Qt4/5升级到Qt6吐血经验总结V202308
-
00:直观总结增加了很多轮子,同时原有模块拆分的也更细致,估计为了方便拓展个管理。把一些过度封装的东西移除了(比如同样的功能有多个函数),保证了只有一个函数执行该功能。把一些Qt5中兼容Qt4的方法废...
- 到底什么是C++11新特性,请看下文
-
C++11是一个比较大的更新,引入了很多新特性,以下是对这些特性的详细解释,帮助您快速理解C++11的内容1.自动类型推导(auto和decltype)...
- 掌握C++11这些特性,代码简洁性、安全性和性能轻松跃升!
-
C++11(又称C++0x)是C++编程语言的一次重大更新,引入了许多新特性,显著提升了代码简洁性、安全性和性能。以下是主要特性的分类介绍及示例:一、核心语言特性1.自动类型推导(auto)编译器自...
- 经典算法——凸包算法
-
凸包算法(ConvexHull)一、概念与问题描述凸包是指在平面上给定一组点,找到包含这些点的最小面积或最小周长的凸多边形。这个多边形没有任何内凹部分,即从一个多边形内的任意一点画一条线到多边形边界...
- 一起学习c++11——c++11中的新增的容器
-
c++11新增的容器1:array当时的初衷是希望提供一个在栈上分配的,定长数组,而且可以使用stl中的模板算法。array的用法如下:#include<string>#includ...
- C++ 编程中的一些最佳实践
-
1.遵循代码简洁原则尽量避免冗余代码,通过模块化设计、清晰的命名和良好的结构,让代码更易于阅读和维护...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)