RocketMQ源码分析之消息消费 轮询机制 PullRequestHoldService
ztj100 2024-10-27 18:33 61 浏览 0 评论
一、前言
RocketMQ 消费过程中的轮询机制是啥?
1.1 消息消费方式
RocketMQ 支持多种消费方式,包括 Push 模式和 Pull 模式
- Pull 模式:用户自己进行消息的拉取和消费进度的更新
- Push 模式:Broker 将新的消息自动发送给用户进行消费
1.2 Push 消费模式
我们一般使用 RocketMQ 使用的是 Push 模式,因为比较方便,不需要手动拉取消息和更新消费进度。
那么你有没有想过 Push 模式是如何做到能够立即消费新的消息?
1.2.1 Push 模式原理
实际上,在 Push 消费时,消费者是在不断轮询 Broker,询问是否有新消息可供消费。一旦有新消息到达,马上拉取该消息。也就是说 Push 模式内部也用了 Pull 消息的模式,这样就可以立即消费到最新的消息。
1.3 如何进行轮询?
那么 Push 模式或 Pull 模式如何进行消息的查询?
能够想到的比较笨的方法是,每隔一定的时间(如1ms)就向 Broker 发送一个查询请求,如果没有新消息则立刻返回。可想而知这种方法非常浪费网络资源。
RocketMQ 为了提高网络性能,在拉取消息时如果没有新消息,不会马上返回,而是会将该查询请求挂起一段时间,然后再重试查询。如果一直没有新消息,直到轮询时间超过设定的阈值才会返回。
根据轮询设定的超时阈值大小的不同,RocketMQ 有两种轮询方式,分别为长轮询(默认)和短轮询。
1.4 长轮询和短轮询
RocketMQ 的 Broker 端参数 longPollingEnable 可以配置轮询方式,默认为 true
- 短轮询:longPollingEnable=false,轮询时间为 shortPollingTimeMills ,默认为 1s
- 长轮询:longPollingEnable=true,轮询时间为 5s。拉取请求挂起时间:受 DefaultMQPullConsumer 的 brokerSuspendMaxTimeMillis 控制,默认push模式固定15s,pull模式固定20s。
二、源码分析
- 挂起拉取请求;
- 线程不断轮询判断该offset是否有新的消息达到;
- 调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务;
- PullMessageProcessor线程处理任务交给processRequest方法进行处理;
- NotifyMessageArrivingListener监听消息到达;
1、挂起拉取请求
// 挂起拉取请求
// 根据主题名称 + 队列id, 获取 ManyPullRequest, 对于同一个 topic + 队列的拉取请求用 ManyPullRequest包装,
// 然后将 pullRequest 添加到 ManyPullRequest 中
public void suspendPullRequest(
final String topic,
final int queueId,
final PullRequest pullRequest) {
String key = this.buildKey(topic, queueId);
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (null == mpr) {
mpr = new ManyPullRequest();
ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
if (prev != null) {
mpr = prev;
}
}
mpr.addPullRequest(pullRequest);
}
private String buildKey(final String topic, final int queueId) {
StringBuilder sb = new StringBuilder(topic.length() + 5);
sb.append(topic);
sb.append(TOPIC_QUEUEID_SEPARATOR);
sb.append(queueId);
return sb.toString();
}
2、线程不断轮询判断该offset是否有新的消息达到
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
// 当前是否启用long polling,长轮询,一个请求过来了挂起,每次只挂起 5s,然后就去尝试拉取。
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000); // 长轮询一个等待周期是5s
}
// 如果不开启长轮询模式,则只挂起一次,挂起时间为 shortPollingTimeMills,然后去尝试查找消息
else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
// 检查当前挂起的请求,把拉取的topic+queue的最大offset查一下,通知一下新消息来了,你不用挂起了
// 你可以拿到消息走了
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}
// 你可以在线程体里进入等待的状态,还可以设置超时时间
protected void waitForRunning(long interval) {
// 需要进行等待的时候,必须把已经通过过这个标识从true修改为false
// 默认就是false,所以说一般这个逻辑是不跑的
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
// entry to wait
// 无论此时你的countDownLatch.value是多少,此时都需要复位成1,让我们可以去进行一个等待
waitPoint.reset();
try {
// 他可以等待一定的时间,可以去等待别人进行一个countDown操作
// 开始来等待别人,指定我们的一个超时时间
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false); // 他一定会把hasNotified设置为false
this.onWaitEnd();
}
}
protected void checkHoldRequest() {
for (String key : this.pullRequestTable.keySet()) {
String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
if (2 == kArray.length) {
String topic = kArray[0];
int queueId = Integer.parseInt(kArray[1]);
// 根据主题,消费队列ID查找队列的最大偏移量。
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
try {
// 根据该offset,判断是否有新的消息达到。
this.notifyMessageArriving(topic, queueId, offset);
} catch (Throwable e) {
log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
}
}
}
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
notifyMessageArriving(topic, queueId, maxOffset, null, 0, null, null);
}
/**
* 通知消息到达
*
* @param topic 主题名称
* @param queueId 队列id
* @param maxOffset 消费队列当前最大偏移量
* @param tagsCode 消息tag hashcode,基于tag消息过滤
* @param msgStoreTime 消息存储时间
* @param filterBitMap 过滤位图
* @param properties 消息属性,基于属性的消息过滤
*/
public void notifyMessageArriving(
final String topic,
final int queueId,
final long maxOffset, // 最大的offset已经到了哪儿去了
final Long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
String key = this.buildKey(topic, queueId);
// 拿到当前挂起等待topic@queue里的消息
ManyPullRequest mpr = this.pullRequestTable.get(key);
if (mpr != null) {
// 获取主题与队列的所有 PullRequest 并清除内部 pullRequest 集合,避免重复拉取
List<PullRequest> requestList = mpr.cloneListAndClear();
if (requestList != null) {
List<PullRequest> replayList = new ArrayList<PullRequest>();
for (PullRequest request : requestList) {
long newestOffset = maxOffset; // topic@queue里最新到达的消息最大的offset,新消息的offset
if (newestOffset <= request.getPullFromThisOffset()) { // 如果说最新的消息是小于等于拉取请求里要拉取的起始offset
// 查询消息存储组件里的topic@queue最大的offset,查询出来了以后设置为topic@queue里的最大offset
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(
topic, queueId
);
}
// 如果队列最大偏移量大于 pullFromThisOffset 说明有新的消息到达,
// 先简单对消息根据 tag,属性进行一次消息过滤,
// 如果 tag,属性为空,则消息过滤器会返回true,然后 executeRequestWhenWakeup进行消息拉取,结束长轮询
if (newestOffset > request.getPullFromThisOffset()) {
// 根据我们的消费队列判断一下是否匹配,新到达的消息tags是否跟拉取请求里的tags是匹配的
boolean match = request.getMessageFilter().isMatchedByConsumeQueue(
tagsCode,
new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap)
);
// match by bit map, need eval again when properties is not null.
if (match && properties != null) {
// 继续判断一下跟commitlog想比是否匹配
match = request.getMessageFilter().isMatchedByCommitLog(
null, properties);
}
if (match) {
try {
// 调用PullMessageProcessor进行消息拉取
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
request.getClientChannel(),
request.getRequestCommand()
);
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
}
// 如果挂起时间超过 suspendTimeoutMillisLong,则超时,结束长轮询,调用executeRequestWhenWakeup 进行消息拉取,并返回结果到客户端
if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
try {
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
request.getClientChannel(),
request.getRequestCommand()
);
} catch (Throwable e) {
log.error("execute request when wakeup failed.", e);
}
continue;
}
replayList.add(request);
}
if (!replayList.isEmpty()) {
// 如果待拉取偏移量大于消息消费队列最大偏移量,并且未超时,调用 mpr.addPullRequest(replayList) 将拉取任务重新放入,待下一次检测
mpr.addPullRequest(replayList);
}
}
}
}
3、调用PullMessageProcessor的executeRequestWhenWakeup方法向PullMessageExecutor拉取线程池提交任务
public void executeRequestWhenWakeup(final Channel channel,
final RemotingCommand request) throws RemotingCommandException {
Runnable run = new Runnable() {
@Override
public void run() {
try {
final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
if (response != null) {
response.setOpaque(request.getOpaque());
response.markResponseType();
try {
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
log.error("processRequestWrapper response to {} failed",
future.channel().remoteAddress(), future.cause());
log.error(request.toString());
log.error(response.toString());
}
}
});
} catch (Throwable e) {
log.error("processRequestWrapper process request over, but response failed", e);
log.error(request.toString());
log.error(response.toString());
}
}
} catch (RemotingCommandException e1) {
log.error("excuteRequestWhenWakeup run", e1);
}
}
};
// 最终将线程封装成一个RequestTask提交至线程池执行
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request));
}
4、PullMessageProcessor线程处理任务交给processRequest方法进行处理
线程run方法逻辑,processRequest方法上文有分析过;
5、NotifyMessageArrivingListener监听消息到达
public class NotifyMessageArrivingListener implements MessageArrivingListener {
private final PullRequestHoldService pullRequestHoldService;
public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
this.pullRequestHoldService = pullRequestHoldService;
}
@Override
public void arriving(
String topic,
int queueId,
long logicOffset,
long tagsCode,
long msgStoreTime,
byte[] filterBitMap,
Map<String, String> properties) {
this.pullRequestHoldService.notifyMessageArriving(
topic,
queueId,
logicOffset,
tagsCode,
msgStoreTime,
filterBitMap,
properties
);
}
}
也就是说当provider消息推送到broker后,broker会通知NotifyMessageArrivingListener消息到达,从而及时的结束挂起的轮询的链接。
三、总结
要开启长轮询, 在 broker 配置文件中 longPollingEnable=true, 默认是开启的。
??消息拉取为了提高网络性能,在消息服务端根据拉取偏移量去物理文件查找消息时没有找到,并不立即返回消息未找到,而是会将该线程挂起一段时间,然后再次重试,直到重试。挂起分为长轮询或短轮询,在broker 端可以通过 longPollingEnable=true 来开启长轮询。
短轮询:longPollingEnable=false,第一次未拉取到消息后等待 shortPollingTimeMills时间后再试。shortPollingTimeMills默认为1S。
长轮询:longPollingEnable=true,会根据消费者端设置的挂起超时时间,受DefaultMQPullConsumer 的brokerSuspendMaxTimeMillis控制,默认20s,(brokerSuspendMaxTimeMillis),长轮询有两个线程来相互实现。
PullRequestHoldService:每隔5s重试一次。
DefaultMessageStore#ReputMessageService,每当有消息到达后,转发消息,然后调用PullRequestHoldService 线程中的拉取任务,尝试拉取,每处理一次,Thread.sleep(1), 继续下一次检查。
相关推荐
- 再说圆的面积-蒙特卡洛(蒙特卡洛方法求圆周率的matlab程序)
-
在微积分-圆的面积和周长(1)介绍微积分方法求解圆的面积,本文使用蒙特卡洛方法求解圆面积。...
- python创建分类器小结(pytorch分类数据集创建)
-
简介:分类是指利用数据的特性将其分成若干类型的过程。监督学习分类器就是用带标记的训练数据建立一个模型,然后对未知数据进行分类。...
- matplotlib——绘制散点图(matplotlib散点图颜色和图例)
-
绘制散点图不同条件(维度)之间的内在关联关系观察数据的离散聚合程度...
- python实现实时绘制数据(python如何绘制)
-
方法一importmatplotlib.pyplotaspltimportnumpyasnpimporttimefrommathimport*plt.ion()#...
- 简单学Python——matplotlib库3——绘制散点图
-
前面我们学习了用matplotlib绘制折线图,今天我们学习绘制散点图。其实简单的散点图与折线图的语法基本相同,只是作图函数由plot()变成了scatter()。下面就绘制一个散点图:import...
- 数据分析-相关性分析可视化(相关性分析数据处理)
-
前面介绍了相关性分析的原理、流程和常用的皮尔逊相关系数和斯皮尔曼相关系数,具体可以参考...
- 免费Python机器学习课程一:线性回归算法
-
学习线性回归的概念并从头开始在python中开发完整的线性回归算法最基本的机器学习算法必须是具有单个变量的线性回归算法。如今,可用的高级机器学习算法,库和技术如此之多,以至于线性回归似乎并不重要。但是...
- 用Python进行机器学习(2)之逻辑回归
-
前面介绍了线性回归,本次介绍的是逻辑回归。逻辑回归虽然名字里面带有“回归”两个字,但是它是一种分类算法,通常用于解决二分类问题,比如某个邮件是否是广告邮件,比如某个评价是否为正向的评价。逻辑回归也可以...
- 【Python机器学习系列】拟合和回归傻傻分不清?一文带你彻底搞懂
-
一、拟合和回归的区别拟合...
- 推荐2个十分好用的pandas数据探索分析神器
-
作者:俊欣来源:关于数据分析与可视化...
- 向量数据库:解锁大模型记忆的关键!选型指南+实战案例全解析
-
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在...
- 用Python进行机器学习(11)-主成分分析PCA
-
我们在机器学习中有时候需要处理很多个参数,但是这些参数有时候彼此之间是有着各种关系的,这个时候我们就会想:是否可以找到一种方式来降低参数的个数呢?这就是今天我们要介绍的主成分分析,英文是Princip...
- 神经网络基础深度解析:从感知机到反向传播
-
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在...
- Python实现基于机器学习的RFM模型
-
CDA数据分析师出品作者:CDALevelⅠ持证人岗位:数据分析师行业:大数据...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)