百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术分类 > 正文

SpringBoot 整合 Flink 实时同步 MySQL

ztj100 2025-04-02 00:36 61 浏览 0 评论

1、需求

在 Flink 发布SpringBoot 打包的 jar 包能够实时同步 MySQL 表,做到原表进行新增、修改、删除的时候目标表都能对应同步。

2、设计

  1. 在 SpringBoot 用 Java 做业务代码的开发;
  2. 基于Flink CDC 用 FlinkSQL 做 Mysql 实时同步处理;
  3. 打包成 jar 包后用 Flink 运行并管理。

3、环境要求


MySQL

8.*

Flink

1.16.2

Flink CDC

2.3.0

Java

8

SpringBoot

2.7.12

3、代码实现

3.1、pom 文件

pom 文件可以整个复制过来,自己打包运行可能会遇到各种各样的错,可以直接全部复制。

pom 中的 mainClass 一定要替换成自己的



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.12
         
    
    com.usoten
    processminingreport
    0.0.1-SNAPSHOT
    ProcessMiningReport
    ProcessMiningReport
    
        1.8
        1.16.2
        2.3.0
    
    
        
            org.springframework.boot
            spring-boot-starter-web
           
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

        
            com.mysql
            mysql-connector-j
        

        
        
            org.apache.flink
            flink-java
            ${flink.version}
            
                
                    org.slf4j
                    slf4j-api
                
            
        
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
        
            org.apache.flink
            flink-streaming-java
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-loader
            ${flink.version}
            provided
        
        
            org.apache.flink
            flink-table-runtime
            ${flink.version}
        
        
        
            org.apache.flink
            flink-runtime-web
            ${flink.version}
            test
        
        
            org.apache.flink
            flink-connector-base
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-jdbc
            ${flink.version}
        

        
            org.apache.flink
            flink-connector-files
            ${flink.version}
        

        
        
            com.ververica
            flink-sql-connector-mysql-cdc
            ${flink-cdc.version}
        

    

    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                
                    true
                    true
                    
                        
                            *:*
                            
                                META-INF/*.SF
                                META-INF/*.DSA
                                META-INF/*.RSA
                            
                        
                    
                
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    META-INF/spring.handlers
                                
                                
                                    META-INF/spring.schemas
                                
                                
                                    META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
                                
                                
                                    META-INF/spring/org.springframework.boot.actuate.autoconfigure.web.ManagementContextConfiguration.imports
                                
                                
                                    META-INF/spring.factories
                                
                                
                                
                                    
                                    com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication
                                
                            
                        
                    
                
            
        
    

3.2、代码实现

需要将原始表同步到目标表,这里我们需要执行三个 SQL,所以入参用 List。具体 SQL 见后面 postman 调用时的例子。

启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProcessMiningCdcManagerApplication {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ProcessMiningCdcManagerApplication.class, args);

        while (true){
            Thread.sleep(30000);
        }
    }

}

controller层

import com.usoten.processminingcdcmanager.service.CdcBaseService;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;

@RestController

@RequestMapping("/datasource")

public class CdcBaseController {

    private CdcBaseService cdcBaseService;

    public CdcBaseController(CdcBaseService cdcBaseService) {
        this.cdcBaseService = cdcBaseService;
    }

    @PostMapping("/cdc/executeSql")
    public void getColumnMetadata(@RequestBody List sqlList) {

        cdcBaseService.executeSql(sqlList);
    }
}

service层

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.RequestBody;

import java.util.List;

@Service
public class CdcBaseService {
    public void executeSql(@RequestBody List sqlList) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        sqlList.forEach(sql -> {
                tEnv.executeSql(sql);
        });

    }
}

4、打包

执行 mvn clean package 打包,并将包放进 Flink 的 lib 目录下

5、运行 jar

nohup ./bin/flink run --class com.usoten.processminingcdcmanager.ProcessMiningCdcManagerApplication ./lib/processminingreport-0.0.1-SNAPSHOT.jar &
  • 报错1
20:30:59.960 [http-nio-8100-exec-1] ERROR o.a.c.c.C.[.[.[.[dispatcherServlet] - [log,175] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.IllegalStateException: Unable to instantiate java compiler] with root cause
java.lang.ClassCastException: org.codehaus.janino.CompilerFactory cannot be cast to org.codehaus.commons.compiler.ICompilerFactory
	at org.codehaus.commons.compiler.CompilerFactoryFactory.getCompilerFactory(CompilerFactoryFactory.java:129)
	at org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory(CompilerFactoryFactory.java:79)
	at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile(JaninoRelMetadataProvider.java:426)

这是因为Flink 把客户端的ClassLoader解析顺序调整为了Child优先,这就导致用户的Jar包不能包含Flink框架的classes,比如常见的Calcite、Flink-Planner依赖、Hive依赖等等。用户需要把有冲突classes的jar放到flink-home/lib下,或者调整策略为Parent优先,这里直接调整为 Parent 优先,$
FLINK_HOME/conf/flink-conf.yaml中 添加

classloader.resolve-order: parent-first
  • 报错2

如果是报下面的错,是因为main方法停止了,解决的话可以看前面的启动类加一个死循环

重新启动 Flink 再次运行 jar,出现下面日志即可

6、准备数据

  • 建一个 test 数据库
  • 建两张表 原表:test 目标表:test1
  • 先在 test 表中造几条数据
  • CREATE TABLE `test` (
    `id` int NOT NULL,
    `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
    `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    CREATE TABLE `test` (
    `id` int NOT NULL,
    `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
    `password` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
  • mysql 需要开启 binlog,并且表有主键

7、发送请求

localhost:8100/datasource/cdc/executeSql

body

[
    "CREATE TABLE mysql_source ( id INT, username STRING,  password STRING,PRIMARY KEY(id) NOT ENFORCED) WITH ( 'connector' = 'mysql-cdc', 'table-name' = 'test', 'hostname' = 'localhost', 'database-name' = 'test', 'port' = '3306', 'username' = 'root', 'password' = '12345678', 'scan.startup.mode' = 'initial')",
    "CREATE TABLE oceanbase_sink ( id INT NOT NULL, username STRING,  password STRING, PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/test', 'table-name' = 'test1', 'username' = 'root', 'password' = '12345678')",
    "insert into oceanbase_sink select id,username,password from mysql_source"
]

请求响应成功过后,我们进入 Flink 页面就可以看到运行的任务,此任务会一直运行,监听并同步 MySQL。

验证

此时我们会发现test表中的数据会同步test1表中,然后u对test表做新增、修改、删除操作时,test1表都会做相应变化


Flink 入门系列入口:

第1章 Flink 基础概念

第3章 Flink的运行架构

第4章 Flink 基础API 三:转换算子(Transformation)

.....

更多内容持续更新,点关注、不迷路。。。

相关推荐

Linux集群自动化监控系统Zabbix集群搭建到实战

自动化监控系统...

systemd是什么如何使用_systemd/system

systemd是什么如何使用简介Systemd是一个在现代Linux发行版中广泛使用的系统和服务管理器。它负责启动系统并管理系统中运行的服务和进程。使用管理服务systemd可以用来启动、停止、...

Linux服务器日常巡检脚本分享_linux服务器监控脚本

Linux系统日常巡检脚本,巡检内容包含了,磁盘,...

7,MySQL管理员用户管理_mysql 管理员用户

一、首次设置密码1.初始化时设置(推荐)mysqld--initialize--user=mysql--datadir=/data/3306/data--basedir=/usr/local...

Python数据库编程教程:第 1 章 数据库基础与 Python 连接入门

1.1数据库的核心概念在开始Python数据库编程之前,我们需要先理解几个核心概念。数据库(Database)是按照数据结构来组织、存储和管理数据的仓库,它就像一个电子化的文件柜,能让我们高效...

Linux自定义开机自启动服务脚本_linux添加开机自启动脚本

设置WGCloud开机自动启动服务init.d目录下新建脚本在/etc/rc.d/init.d新建启动脚本wgcloudstart.sh,内容如下...

linux系统启动流程和服务管理,带你进去系统的世界

Linux启动流程Rhel6启动过程:开机自检bios-->MBR引导-->GRUB菜单-->加载内核-->init进程初始化Rhel7启动过程:开机自检BIOS-->M...

CentOS7系统如何修改主机名_centos更改主机名称

请关注本头条号,每天坚持更新原创干货技术文章。如需学习视频,请在微信搜索公众号“智传网优”直接开始自助视频学习1.前言本文将讲解CentOS7系统如何修改主机名。...

前端工程师需要熟悉的Linux服务器(SSH 终端操作)指令

在Linux服务器管理中,SSH(SecureShell)是远程操作的核心工具。以下是SSH终端操作的常用命令和技巧,涵盖连接、文件操作、系统管理等场景:一、SSH连接服务器1.基本连接...

Linux开机自启服务完全指南:3步搞定系统服务管理器配置

为什么需要配置开机自启?想象一下:电商服务器重启后,MySQL和Nginx没自动启动,整个网站瘫痪!这就是为什么开机自启是Linux运维的必备技能。自启服务能确保核心程序在系统启动时自动运行,避免人工...

Kubernetes 高可用(HA)集群部署指南

Kubernetes高可用(HA)集群部署指南本指南涵盖从概念理解、架构选择,到kubeadm高可用部署、生产优化、监控备份和运维的全流程,适用于希望搭建稳定、生产级Kubernetes集群...

Linux项目开发,你必须了解Systemd服务!

1.Systemd简介...

Linux系统systemd服务管理工具使用技巧

简介:在Linux系统里,systemd就像是所有进程的“源头”,它可是系统中PID值为1的进程哟。systemd其实是一堆工具的组合,它的作用可不止是启动操作系统这么简单,像后台服务...

Red Hat Enterprise Linux 10 安装 Kubernetes (K8s) 集群及高级管理

一、前言...

Linux下NetworkManager和network的和平共处

简介我们在使用CentoOS系统时偶尔会遇到配置都正确但network启动不了的问题,这问题经常是由NetworkManager引起的,关闭NetworkManage并取消开机启动network就能正...

取消回复欢迎 发表评论: