Spring Boot + MySQL + Canal 数据同步方案(Spring Boot Starter 实现)
ztj100 2025-04-02 00:36 15 浏览 0 评论
在微服务架构下,数据同步是一个常见的需求,尤其是当需要将一个 MySQL 数据库中的数据同步到其他数据库或系统时。今天,我们将一起学习如何通过 Spring Boot Starter 集成 Canal 实现数据同步,从而简化配置和开发。
为什么选择 Canal?
Canal 是阿里巴巴开源的一款基于 MySQL Binlog 的增量订阅与消费组件。它可以实时捕获 MySQL 的数据变化,并将变动的数据推送到消费者端,适用于数据同步、增量备份等场景。相比传统的定时任务拉取数据,Canal 提供了更高效、实时的数据同步方案。
架构设计
整个数据同步流程大致可以分为三个部分:MySQL、Canal、Spring Boot。MySQL 用于存储数据,Canal 监听 MySQL 的 Binlog,将数据变动捕捉并传输到 Spring Boot 服务,Spring Boot 服务处理同步逻辑。
架构图
配置与实现
MySQL 配置
首先,确保你的 MySQL 启用了 binlog(二进制日志)。打开 MySQL 配置文件 my.cnf,添加或修改如下配置:
[mysqld]
log-bin=mysql-bin
binlog-format=row
server-id=1
这将使 MySQL 记录所有数据变更,并且 Canal 将从这些日志中获取增量数据。
Canal 配置
接下来,下载并配置 Canal。在 Canal GitHub 上有相关的安装文档。假设你已经成功启动 Canal 服务,配置文件 instance.properties 中需要设置 MySQL 的连接信息:
canal.instance.master.address=localhost:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=root_password
canal.instance.destination=example
这配置了 Canal 连接到 MySQL 的信息。
Spring Boot Starter 实现
在 Spring Boot 项目中,我们可以通过自定义 Spring Boot Starter 来封装 Canal 客户端,使得使用起来更简单。
创建 Canal Starter
首先,创建一个新的 Maven 工程作为 Starter:
com.alibaba
canal.client
1.1.4
org.springframework.boot
spring-boot-starter
编写 Canal 客户端配置类
创建 CanalClientConfig 类,来封装 Canal 客户端的配置和初始化:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.impl.CanalConnectorImpl;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CanalClientConfig {
@Bean
public CanalConnector canalConnector() {
CanalConnector connector = CanalConnectorImpl.newCanalConnector("localhost", 11111, "example", "canal_client");
connector.connect();
connector.subscribe(".*\\..*"); // 订阅所有数据库的所有表
return connector;
}
}
编写数据同步服务
接着,创建一个服务类 CanalSyncService 来处理从 Canal 获取的数据,并同步到目标数据库:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class CanalSyncService {
@Autowired
private CanalConnector canalConnector;
public void startSync() {
while (true) {
// 获取 binlog 数据
Message message = canalConnector.getWithoutAck(100);
if (message != null && !message.getEntries().isEmpty()) {
for (Entry entry : message.getEntries()) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
// 处理变动的数据
handleRowChange(rowChange);
}
}
}
}
}
private void handleRowChange(RowChange rowChange) {
// 处理数据逻辑,比如同步到另一个数据库
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.INSERT) {
// 插入数据
insertDataToTarget(rowData);
} else if (rowChange.getEventType() == EventType.UPDATE) {
// 更新数据
updateDataToTarget(rowData);
} else if (rowChange.getEventType() == EventType.DELETE) {
// 删除数据
deleteDataToTarget(rowData);
}
}
}
}
启动应用
在 Spring Boot 启动类中,调用 CanalSyncService 启动数据同步:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application implements CommandLineRunner {
@Autowired
private CanalSyncService canalSyncService;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
canalSyncService.startSync();
}
}
数据同步
通过 Canal 捕获到的增量数据(如插入、更新、删除)会进入 handleRowChange 方法。在此方法中,你可以根据需要将这些数据同步到目标系统或数据库。
总结与优化建议
通过 Spring Boot Starter 集成 Canal,我们可以实现更加简洁、灵活的数据同步方案。整个过程将 Canal 的连接、数据处理等封装为一个启动器,减少了配置和开发的复杂度。
在实际应用中,我们还可以根据业务需求进行更多优化:
- 数据过滤:根据需要过滤不必要的数据变动,减少同步负担。
- 错误处理:处理 Canal 客户端连接失败、数据格式错误等异常。
- 数据重试机制:对于同步失败的数据,可以设计重试机制,确保数据一致性。
相关推荐
- Linux日志相关命令—查看\关键词查询\截取\日志压缩备份
-
一、查看1、动态日志查看。说明:程序启动可以动态查看运行日志。...
- Dify+微信智能生态:手把手教你搭建私有化客服解决方案
-
一、Dify简介1.为什么要使用Dify?零代码门槛:通过直观界面和预设模板,非技术人员也能快速创建智能助手1。模型生态丰富...
- 小白入门必知必会-RocketMQ安装(rocketmq下载安装)
-
一RocketMQ基础1.1介绍MQ是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。RocketMQ是使用Java语言开发的一款MQ产品,...
- Linux搭建Weblogic集群(linux weblogic安装与配置图文详解)
-
一、Java环境配置拿到新申请的两台Linux机器后,首先需要安装JDK,下载地址请戳这里。1、执行命令:rpm-qa|grepjava,查询是否存在系统自带的openjdk。如果为空,...
- 从0开始,让你的Spring Boot项目跑在Linux服务器
-
1搭建Linux服务器1.1购买阿里云服务器或安装虚拟机这里建议是CentOS7.X或CentOS8.X,当然其他的Linux如deepin、Ubuntu也可以,只是软件环境的安装包和安装方式...
- 搞定这8个Kafka生产级容量评估,每日10亿+请求轻松拿捏
-
本篇文章通过场景驱动的方式来深度剖析Kafka生产级容量评估方案如何分析,申请和实施。...
- 还在用nohup启动java jar服务?试试强大的systemctl吧
-
nohup直译过来就是不挂断,要运行后台中的nohup命令,添加&(表示“and”的符号)到命令的尾部,使用nohup启动的例子:nohup/usr/bin/java-jar/dat...
- prometheus、exporter和grafana的简单使用
-
一、基本介绍1、prometheusPrometheus(普罗米修斯)是一套开源的监控&报警&时间序列数据库的组合,由SoundCloud公司开发。...
- Mock工具之Moco使用教程(mock 工具)
-
目录一、什么是Moco二、安装&配置...
- SpringBoot入门系列(三十)Spring Boot项目打包、发布与部署
-
今天介绍SpringBoot项目是如何打包、发布的。SpringBoot使用了内嵌容器,因此它的部署方式也变得非常简单灵活,一方面可以将SpringBoot项目打包成独立的jar或者war包来运...
- chatgpt-on-wechat:智能对话的全新可能与开源魅力
-
简介chatgpt-on-wechat(简称CoW)项目是基于大模型的智能对话机器人,支持微信公众号、企业微信应用、飞书、钉钉接入,可选择GPT3.5/GPT4.0/Claude/Gemini/Lin...
- 免费快速实现内网穿透:windows远程桌面连接实战
-
本次介绍使用frp实现内网穿透,文章是之前写的,写了很多篇,后续会一一整理出来,希望帮助到有需要的朋友。frp简介...
- 手摸手教你 CentOS 入门必备基础知识(建议收藏)
-
这里记录一下我的CentOS学习过程,相当于自己记个笔记,同时分享出来,如果有同学刚好有需要而这个文章帮助到了你的话,在下也会十分开心。文章最后推介了几个免费视频,B站和慕课上的免费学习视频挺多...
- 分布式数据库基础性能测试(分布式数据库实验)
-
最近对原生HTAP(cockroachdb和tidb)数据库同时以数仓为起点做HTAP数据库的greenplum进行了相关场景性能测试,场景分为OLTP的TPC-C和TPC-B测试以及简单的OL...
-
- 使用X11VNC远程连接统信UOS(vnc 远程连接)
-
原文链接:使用X11VNC远程连接统信UOS...
-
2025-05-02 14:51 ztj100
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- Linux日志相关命令—查看\关键词查询\截取\日志压缩备份
- Dify+微信智能生态:手把手教你搭建私有化客服解决方案
- 小白入门必知必会-RocketMQ安装(rocketmq下载安装)
- Linux搭建Weblogic集群(linux weblogic安装与配置图文详解)
- 从0开始,让你的Spring Boot项目跑在Linux服务器
- 搞定这8个Kafka生产级容量评估,每日10亿+请求轻松拿捏
- 还在用nohup启动java jar服务?试试强大的systemctl吧
- prometheus、exporter和grafana的简单使用
- Mock工具之Moco使用教程(mock 工具)
- SpringBoot入门系列(三十)Spring Boot项目打包、发布与部署
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)