从 MySQL 实时同步
StarRocks 支持多种方式将 MySQL 的数据实时同步至 StarRocks,支撑实时分析和处理海量数据的需求。
本文介绍如何将 MySQL 的数据通过 Apache Flink® 实时(秒级)同步至 StarRocks。
注意
导入操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权。
基本原理
从 MySQL 同步至 Flink 需要使用 Flink CDC,本文使用 Flink CDC 的版本小于 3.0,因此需要借助 SMT 同步表结构。 然而如果使用 Flink CDC 3.0,则无需借助 SMT,即可将表结构同步至 StarRocks,甚至可以同步整个 MySQL 数据库、分库分表的结构,同时也支持同步 schema change。具体的使用方式,参见从 MySQL 到 StarRocks 的流式 ELT 管道。
将 MySQL 的数据通过 Flink 同步至 StarRocks 分成同步库表结构、同步数据两个阶段进行。首先 StarRocks Migration Tool (数据迁移工具,以下简称 SMT) 将 MySQL 的库表结构转化成 StarRocks 的建库和建表语句。然后 Flink 集群运行 Flink job,同步 MySQL 全量及增量数据至 StarRocks。具体同步流程如下:
该同步流程能够保证端到端的 exactly-once 的语义一致性。
-
同步库表结构
SMT 根据其配置文件中源 MySQL 和目标 StarRocks 的信息,读取 MySQL 中待同步的库表结构,并生成 SQL 文件,用于在 StarRocks 内创建对应的目标库表。
-
同步数据
Flink SQL 客户端执行导入数据的 SQL 语句(
INSERT INTO SELECT
语句),向 Flink 集群提交一个或者多个长时间运行的 Flink job。Flink集群运行 Flink job ,Flink cdc connector 先读取数据库的历史全量数据,然后无缝切换到增量读取,并且发给 flink-connector-starrocks,最后 flink-connector-starrocks 攒微批数据同步至 StarRocks。信息仅支持同步 DML,不支持同步 DDL。
业务场景
以商品累计销量实时榜单为例,存储在 MySQL 中的原始订单表,通过 Flink 处理计算出产品销量的实时排行,并实时同步至 StarRocks 的主键表中。最终用户可以通过可视化工具连接StarRocks查看到实时刷新的榜单。
准备工作
下载并安装同步工具
同步时需要使用 SMT、 Flink、Flink CDC connector、flink-connector-starrocks,下载和安装步骤如下:
-
下载、安装并启动 Flink 集群。
说明:下载和安装方式也可以参考 Flink 官方文档。
-
您需要提前在操作系统中安装 Java 8 或者 Java 11,以正常运行 Flink。您可以通过以下命令 来检查已经安装的 Java 版本。
# 查看java版本
java -version
# 如下显示已经安装 java 8
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode) -
下载并解压 Flink。本示例使用 Flink 1.14.5。
说明:推荐使用 1.14 及以上版本,最低支持 1.11 版本。
# 下载 Flink
wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# 解压 Flink
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# 进入 Flink 目录
cd flink-1.14.5 -
启动 Flink 集群。
# 启动 Flink 集群
./bin/start-cluster.sh
# 返回如下信息,表示成功启动 flink 集群
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
-
-
下载 Flink CDC connector。本示例的数据源为 MySQL,因此下载 flink-sql-connector-mysql-cdc-x.x.x.jar。并且版本需支持对应的 Flink 版本,两者版本支持度,请参见 Supported Flink Versions。由于本文使用 Flink 1.14.5,因此可以使用 flink-sql-connector-mysql-cdc-2.2.0.jar。
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
-
下载 flink-connector-starrocks,并且其版本需要对应 Flink 的版本。
flink-connector-starrocks 的 JAR 包 (x.x.x_flink-y.yy_z.zz.jar) 会包含三个版本号:
-
第一个版本号 x.x.x 为 flink-connector-starrocks 的版本号。
-
第二个版本号 y.yy 为其支持的 Flink 版本号。
-
第三个版本号 z.zz 为 Flink 支持的 Scala 版本号。如果 Flink 为 1.14.x 以及之前版本,则需要下载带有 Scala 版本号的 flink-connector-starrocks。
由于本文使用 Flink 版本号 1.14.5,Scala 版本号 2.11,因此可以下载 flink-connector-starrocks JAR 包 1.2.3_flink-1.14_2.11.jar。
-
-
将 Flink CDC connector、Flink-connector-starrocks 的 JAR 包 flink-sql-connector-mysql-cdc-2.2.0.jar、1.2.3_flink-1.14_2.11.jar 移动至 Flink 的 lib 目录。
注意
如果 Flink 已经处于运行状态中,则需要先停止 Flink,然后重启 Flink ,以加载并生效 JAR 包。
./bin/stop-cluster.sh
./bin/start-cluster.sh -
下载并解压 SMT,并将其放在 flink-1.14.5 目录下。您可以根据操作系统和 CPU 架构选择对应的 SMT 安装包。
# 适用于 Linux x86
wget https://releases.starrocks.io/resources/smt.tar.gz
# 适用于 macOS ARM64
wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
开启 MySQL Binlog 日志
您需要确保已经开启 MySQL Binlog 日志,实时同步时需要读取 MySQL Binlog 日志数据,解析并同步至 StarRocks。
-
编辑 MySQL 配置文件 my.cnf(默认路径为 /etc/my.cnf),以开启 MySQL Binlog。
# 开启 Binlog 日志
log_bin = ON
# 设置 Binlog 的存储位置
log_bin =/var/lib/mysql/mysql-bin
# 设置 server_id
# 在 MySQL 5.7.3 及以后版本,如果没有 server_id,那么设置 binlog 后无法开启 MySQL 服务
server_id = 1
# 设置 Binlog 模式为 ROW
binlog_format = ROW
# binlog 日志的基本文件名,后面会追加标识来表示每一个 Binlog 文件
log_bin_basename =/var/lib/mysql/mysql-bin
# binlog 文件的索引文件,管理所有 Binlog 文件的目录
log_bin_index =/var/lib/mysql/mysql-bin.index -
执行如下命令,重启 MySQL,生效修改后的配置文件:
# 使用 service 启动
service mysqld restart
# 使用 mysqld 脚本启动
/etc/init.d/mysqld restart