仅此一招,再无消息乱序的烦恼 excel乱序排列
ztj100 2024-12-29 07:20 76 浏览 0 评论
1. 概览
RocketMQ 早已提供了一组最佳实践,但工作在一线的伙伴却很少知道,项目中的各种随性代码经常导致消息错乱问题,严重影响业务的准确性。为了保障最佳实践的落地,降低一线伙伴的使用成本,统一 MQ 使用规范,需要对其进行抽象和封装…
1.1. 背景
RocketMQ的最佳实践中推荐:一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。
在使用rocketMQTemplate发送消息时,通过设置发送方法的destination参数来设置消息的目的地,destination的格式为topicName:tagName,:前面表示topic的名称,后面表示tags名称,简单示例如下:
// 计算 destination
protected String createDestination(String topic, String tag) {
if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
return topic + ":" + tag;
}else {
return topic;
}
}
// 发送信息
String destination = createDestination(topic, tag);
SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
tags从命名来看像是一个复数,但发送消息时,目的地只能指定一个topic下的一个tag,不能指定多个。
但,在消费消息时,就变的没那么方便了,简单示例如下:
@Service
@RocketMQMessageListener(
topic = "consumer-test-topic-1",
consumerGroup ="user-message-consumer-1",
selectorExpression = "*",
consumeMode = ConsumeMode.ORDERLY
)
@Slf4j
public class RocketBasedUserMessageConsumer extends UserMessageConsumer
implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String tag = message.getTags();
byte[] body = message.getBody();
log.info("handle msg body {}", new String(body));
switch (tag){
case "UserCreatedEvent":
UserEvents.UserCreatedEvent createdEvent = JSON.parseObject(body, UserEvents.UserCreatedEvent.class);
handle(createdEvent);
return;
case "UserEnableEvent":
UserEvents.UserEnableEvent enableEvent = JSON.parseObject(body, UserEvents.UserEnableEvent.class);
handle(enableEvent);
return;
case "UserDisableEvent":
UserEvents.UserDisableEvent disableEvent = JSON.parseObject(body, UserEvents.UserDisableEvent.class);
handle(disableEvent);
return;
case "UserDeletedEvent":
UserEvents.UserDeletedEvent deletedEvent = JSON.parseObject(body, UserEvents.UserDeletedEvent.class);
handle(deletedEvent);
return;
}
}
}
该方法有几个问题:
- tag 维护成本较高,RocketMQMessageListener 设置 selectorExpression 为 *,将拉取全部数据,增加通讯成本;如果使用 tag1 || tag2 方式,每次调整都需要对代码和配置进行更新,特别容易遗漏;
- 充斥大量模板代码,比如 case 分支,反序列化,调用业务方法等;
- API 具有侵入性,开发是需要关心 RocketMQ API,存在一定学习成本;
1.2. 目标
提供一种面向业务场景的,灵活进行业务扩展的模式,具有以下特征:
- Tag 和代码保持一致,不需要多处配置,新增逻辑自动完成 Tag 注册;
- 消除模板方法,类中只保留核心业务方法,框架完成 方法分发、消息反序列化等操作;
- 代码零侵入,仅使用注解,无需了解 RocketMQ API;
2. 快速入门
框架依赖 rocketmq-spring-boot-starter 完成消息发送和回收。
2.1. 环境准备
2.1.1. 增加依赖
首先,增加 rocketmq 相关依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
然后,增加 lego starter。
<dependency>
<groupId>com.geekhalo.lego</groupId>
<artifactId>lego-starter</artifactId>
<version>0.1.13-tag_based_dispatcher_message_consumer-SNAPSHOT</version>
</dependency>
2.1.2. 增加配置
在 application.yml 文件中增加 rocketmq 配置。
rocketmq:
name-server: http://127.0.0.1:9876
producer:
group: rocket-demo
2.2. 定义消费者
定义消费者,只需:
- 在 Bean 上增加 @TagBasedDispatcherMessageConsumer 注解,并指定 topic 和 consumer
- 在 Bean 的方法上添加 @HandleTag 注解,并指定监听的 tag
示例如下:
@TagBasedDispatcherMessageConsumer(
topic = "consumer-test-topic",
consumer = "user-message-consumer"
)
public class UserMessageConsumer {
private final Map<Long, List<UserEvents.UserEvent>> events = Maps.newHashMap();
public void clean(){
this.events.clear();;
}
public List<UserEvents.UserEvent> getUserEvents(Long userId){
return this.events.get(userId);
}
@HandleTag("UserCreatedEvent")
public void handle(UserEvents.UserCreatedEvent userCreatedEvent){
List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userCreatedEvent.getUserId(), userId -> new ArrayList<>());
userEvents.add(userCreatedEvent);
}
@HandleTag("UserEnableEvent")
public void handle(UserEvents.UserEnableEvent userEnableEvent){
List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userEnableEvent.getUserId(), userId -> new ArrayList<>());
userEvents.add(userEnableEvent);
}
@HandleTag("UserDisableEvent")
public void handle(UserEvents.UserDisableEvent userDisableEvent){
List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDisableEvent.getUserId(), userId -> new ArrayList<>());
userEvents.add(userDisableEvent);
}
@HandleTag("UserDeletedEvent")
public void handle(UserEvents.UserDeletedEvent userDeletedEvent){
List<UserEvents.UserEvent> userEvents = this.events.computeIfAbsent(userDeletedEvent.getUserId(), userId -> new ArrayList<>());
userEvents.add(userDeletedEvent);
}
}
2.3. 测试
编写测试用例如下:
@SpringBootTest(classes = DemoApplication.class)
@Slf4j
class UserMessageConsumerTest {
@Autowired
private UserMessageConsumer userMessageConsumer;
@Autowired
private RocketMQTemplate rocketMQTemplate;
private List<Long> userIds;
@BeforeEach
void setUp() throws InterruptedException {
this.userMessageConsumer.clean();
this.userIds = new ArrayList<>();
for (int i = 0; i< 100; i++){
userIds.add(10000L + i);
}
this.userIds.forEach(userId -> sendMessage(userId));
TimeUnit.SECONDS.sleep(3);
}
private void sendMessage(Long userId) {
String topic = "consumer-test-topic";
{
String tag = "UserCreatedEvent";
UserEvents.UserCreatedEvent userCreatedEvent = new UserEvents.UserCreatedEvent();
userCreatedEvent.setUserId(userId);
userCreatedEvent.setUserName("Name-" + userId);
sendOrderlyMessage(topic, tag, userCreatedEvent);
}
{
String tag = "UserEnableEvent";
UserEvents.UserEnableEvent userEnableEvent = new UserEvents.UserEnableEvent();
userEnableEvent.setUserId(userId);
userEnableEvent.setUserName("Name-" + userId);
sendOrderlyMessage(topic, tag, userEnableEvent);
}
{
String tag = "UserDisableEvent";
UserEvents.UserDisableEvent userDisableEvent = new UserEvents.UserDisableEvent();
userDisableEvent.setUserId(userId);
userDisableEvent.setUserName("Name-" + userId);
sendOrderlyMessage(topic, tag, userDisableEvent);
}
{
String tag = "UserDeletedEvent";
UserEvents.UserDeletedEvent userDeletedEvent = new UserEvents.UserDeletedEvent();
userDeletedEvent.setUserId(userId);
userDeletedEvent.setUserName("Name-" + userId);
sendOrderlyMessage(topic, tag, userDeletedEvent);
}
}
private void sendOrderlyMessage(String topic, String tag, UserEvents.UserEvent event) {
String shardingKey = String.valueOf(event.getUserId());
String json = JSON.toJSONString(event);
Message<String> msg = MessageBuilder
.withPayload(json)
.build();
String destination = createDestination(topic, tag);
SendResult sendResult = this.rocketMQTemplate.syncSendOrderly(destination, msg, shardingKey, 2000);
log.info("Send result is {} for msg", sendResult, msg);
}
protected String createDestination(String topic, String tag) {
if (org.apache.commons.lang3.StringUtils.isNotEmpty(tag)){
return topic + ":" + tag;
}else {
return topic;
}
}
@AfterEach
void tearDown() {
}
@Test
void getUserEvents() {
this.userIds.forEach(userId ->{
List<UserEvents.UserEvent> userEvents = this.userMessageConsumer.getUserEvents(userId);
Assertions.assertEquals(4, userEvents.size());
Assertions.assertTrue(userEvents.get(0) instanceof UserEvents.UserCreatedEvent);
Assertions.assertTrue(userEvents.get(1) instanceof UserEvents.UserEnableEvent);
Assertions.assertTrue(userEvents.get(2) instanceof UserEvents.UserDisableEvent);
Assertions.assertTrue(userEvents.get(3) instanceof UserEvents.UserDeletedEvent);
});
}
}
启动时,可以看到如下日志:
TagBasedDispatcherConsumerContainer : success to subscribe http://127.0.0.1:9876, topic consumer-test-topic, tag UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent, group user-message-consumer
从日志上可以看出,框架以组 group user-message-consumer 创建 Consumer,并订阅 consumer-test-topic 的 UserCreatedEvent||UserEnableEvent||UserDeletedEvent||UserDisableEvent 等 Tag,初始化流程符合预期。
测试逻辑比较简单,逻辑如下:
- 创建 100 个用户
- 每个用户创建并依次发布领域事件,UserCreatedEvent、UserEnableEvent、UserDisableEvent、UserDeletedEvent
- 消费发送完成后,停顿 3 秒
- 依次检测每个用户收到的消息,并对顺序进行检测
观察日志,可以看到发送和消费日志交替出现:
UserMessageConsumerTest : Send result is SendResult [sendStatus=SEND_OK, msgId=2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4900FD, offsetMsgId=C0A8010A00002A9F00000000056077FB, messageQueue=MessageQueue [topic=consumer-test-topic, brokerName=bogon, queueId=2], queueOffset=1121] for msg
TagBasedDispatcherConsumerContainer : consume 2408820718EADE005827F0B9E9D4D6D9B98158644D467D38DE4700FC cost: 0 ms
用例通过,运行结果符合预期。
3. 设计&扩展
3.1. 初始化流程
image
框架初始化流程如下:
- TagBasedDispatcherConsumerContainerRegistry 实现 Spring 的 BeanPostProcessor 接口,依次对托管 bean 进行处理;
- 如果 Bean 上存在 @TagBasedDispatcherMessageConsumer 注解,便会提取配置信息,构建 TagBasedDispatcherConsumerContainer 实例
- TagBasedDispatcherConsumerContainer 收集方法上的 @HandleTag 注解,结合 @TagBasedDispatcherMessageConsumer 上的 topic、consumer 等信息构建 DefaultMQPushConsumer 并完成 topic 和 tag 的订阅
- TagBasedDispatcherConsumerContainer 内部会构建 tag 与 method 的映射关系,以对指定tag进行处理;
3.2. 运行流程
image
运行流程如下:
- 消息发送者将消息发送至 MQ;
- MQ 将消息发送至 Consumer;
- Consumer 收到消息后,根据 tag 对消息进行分发;
- 处理器对消息进行反序列化,获取调用参数,然后调用方法执行业务逻辑;
4. 项目信息
项目仓库地址:https://gitee.com/litao851025/lego
项目文档地址:https://gitee.com/litao851025/lego/wikis/support/TagBasedDispatcherMessageConsumer
相关推荐
- Python 操作excel的坑__真实的行和列
-
大佬给的建议__如何快速处理excelopenpyxl库操作excel的时候,单个表的数据量大一些处理速度还能接受,如果涉及多个表甚至多个excel文件的时候速度会很慢,还是建议用pandas来处理,...
- Python os.path模块使用指南:轻松处理文件路径
-
前言在Python编程中,文件和目录的操作是非常重要的一部分。为了方便用户进行文件和目录的操作,Python标准库提供了os模块。其中,os.path子模块提供了一些处理文件路径的函数和方法。本文主要...
- Python常用内置模块介绍——文件与系统操作详解
-
Python提供了多个强大的内置模块用于文件和系统操作,下面我将详细介绍最常用的几个模块及其核心功能。1.os模块-操作系统交互...
- Python Flask 建站框架实操教程(flask框架网页)
-
下面我将带您从零开始构建一个完整的Flask网站,包含用户认证、数据库操作和前端模板等核心功能。##第一部分:基础项目搭建###1.创建项目环境```bash...
- 为你的python程序上锁:软件序列号生成器
-
序列号很多同学可能开发了非常多的程序了,并且进行了...
- PO设计模式全攻略,在 UI 自动化中的实践总结(以企业微信为例)
-
一、什么是PO设计模式?PO(PageObject)设计模式将某个页面的所有元素对象定位和对元素对象的操作封装成一个Page类,即一个py文件,并以页面为单位来写测试用例,实现页面对象和测试用例的...
- 这种小工具居然也能在某鱼卖钱?我用Python一天能写...
-
前两天在某鱼闲逛,本来想找个二手机械键盘,结果刷着刷着突然看到有人在卖——Word批量转PDF小工具...
- python打包成exe,程序有图标,但是任务栏和窗口都没有显示图标
-
代码中指定图标信息#设置应用ID,确保任务栏图标正确显示ifsys.platform=="win32":importctypesapp_id=...
- 使用Python构建电影推荐系统(用python做推荐系统)
-
在日常数据挖掘工作中,除了会涉及到使用Python处理分类或预测任务,有时候还会涉及推荐系统相关任务。...
- python爬取并分析淘宝商品信息(python爬取淘宝商品数据)
-
python爬取并分析淘宝商品信息背景介绍一、模拟登陆二、爬取商品信息1.定义相关参数2.分析并定义正则3.数据爬取三、简单数据分析1.导入库2.中文显示3.读取数据4.分析价格分布5.分析销售...
- OpenCV入门学习基础教程(从小白变大神)
-
Opencv是用于快速处理图像处理、计算机视觉问题的工具,支持多种语言进行开发如c++、python、java等,下面这篇文章主要给大家介绍了关于openCV入门学习基础教程的相关资料,需要的朋友可以...
- python图像处理-一行代码实现灰度图抠图
-
抠图是ps的最基本技能,利用python可以实现用一行代码实现灰度图抠图。基础算法是...
- 从头开始学python:如何用Matplotlib绘图表
-
Matplotlib是一个用于绘制图表的库。如果你有用过python处理数据,那Matplotlib可以更直观的帮你把数据展示出来。直接上代码看例子:importmatplotlib.pyplot...
- Python爬取爱奇艺腾讯视频 250,000 条数据分析为什么李诞不值得了
-
在《Python爬取爱奇艺52432条数据分析谁才是《奇葩说》的焦点人物?》这篇文章中,我们从爱奇艺爬取了5万多条评论数据,并对一些关键数据进行了分析,由此总结出了一些明面上看不到的数据,并...
- Python Matplotlib 库使用基本指南
-
简介Matplotlib是一个广泛使用的Python数据可视化库,它可以创建各种类型的图表、图形和可视化效果。无论是简单的折线图还是复杂的热力图,Matplotlib提供了丰富的功能来满足我们...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Python 操作excel的坑__真实的行和列
- Python os.path模块使用指南:轻松处理文件路径
- Python常用内置模块介绍——文件与系统操作详解
- Python Flask 建站框架实操教程(flask框架网页)
- 为你的python程序上锁:软件序列号生成器
- PO设计模式全攻略,在 UI 自动化中的实践总结(以企业微信为例)
- 这种小工具居然也能在某鱼卖钱?我用Python一天能写...
- python打包成exe,程序有图标,但是任务栏和窗口都没有显示图标
- 使用Python构建电影推荐系统(用python做推荐系统)
- python爬取并分析淘宝商品信息(python爬取淘宝商品数据)
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)