Apache Flink
从 Apache Flink® 持续导入
StarRocks 提供 Apache Flink® 连接器 (以下简称 Flink connector),可以通过 Flink 导入数据至 StarRocks表。
基本原理是 Flink connector 在内存中积攒小批数据,再通过 Stream Load 一次性导入 StarRocks。
Flink Connector 支持 DataStream API,Table API & SQL 和 Python API。
StarRocks 提供的 Flink connector,相比于 Flink 提供的 flink-connector-jdbc,性能更优越和稳定。
注意
使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。
版本要求
Connector | Flink | StarRocks | Java | Scala |
---|---|---|---|---|
1.2.10 | 1.15,1.16,1.17,1.18,1.19 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.9 | 1.15,1.16,1.17,1.18 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1 及以上 | 8 | 2.11,2.12 |
1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 及以上 | 8 | 2.11,2.12 |
获取 Flink connector
您可以通过以下方式获取 Flink connector JAR 文件:
- 直接下载已经编译好的 JAR 文件。
- 在 Maven 项目的 pom 文件添加 Flink connector 为依赖项,作为依赖下载。
- 通过源码手动编译成 JAR 文件。
Flink connector JAR 文件的命名格式如下:
- 适用于 Flink 1.15 版本及以后的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar
。例如您安装了 Flink 1.15,并且想要使用 1.2.7 版本的 Flink connector,则您可以使用flink-connector-starrocks-1.2.7_flink-1.15.jar
。 - 适用于 Flink 1.15 版本之前的 Flink connector 命名格式为
flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar
。例如您安装了 Flink 1.14 和 Scala 2.12,并且您想要使用 1.2.7 版本的 Flink connector,您可以使用flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar
。
注意
一般情况下最新版本的 Flink connector 只维护最近 3 个版本的 Flink。
直接下载
可以在 Maven Central Repository 获取不同版本的 Flink connector JAR 文件。
Maven 依赖
在 Maven 项目的 pom.xml
文件中,根据以下格式将 Flink connector 添加为依赖项。将 flink_version
、scala_version
和 connector_version
分别替换为相应的版本。
-
适用于 Flink 1.15 版本及以后的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}</version>
</dependency> -
适用于 Flink 1.15 版本之前的 Flink connector
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
手动编译
-
执行以下命令将 Flink connector 的源代码编译成一个 JAR 文件。请注意,将
flink_version
替换为相应的Flink 版本。sh build.sh <flink_version>
例如,如果您的环境中的 Flink 版本为1.15,您需要执行以下命令:
sh build.sh 1.15
-
前往
target/
目录,找到编译完成的 Flink connector JAR 文件,例如flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar
,该文件在编译过程中生成。注意:
未正式发布的 Flink connector 的名称包含
SNAPSHOT
后缀。
参数说明
参数 | 是否必填 | 默认值 | 描述 |
---|---|---|---|
connector | Yes | NONE | 固定设置为 starrocks 。 |
jdbc-url | Yes | NONE | 用于访问 FE 节点上的 MySQL 服务器。多个地址用英文逗号(,)分隔。格式:jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2> 。 |
load-url | Yes | NONE | 用于访问 FE 节点上的 HTTP 服务器。多个地址用英文分号(;)分隔。格式:<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2> 。 |
database-name | Yes | NONE | StarRocks 数据库名。 |
table-name | Yes | NONE | StarRocks 表名。 |
username | Yes | NONE | StarRocks 集群的用户名。使用 Flink connector 导入数据至 StarRocks 需要目标表的 SELECT 和 INSERT 权限。如果您的用户账号没有这些权限,请参考 GRANT 给用户赋权。 |
password | Yes | NONE | StarRocks 集群的用户密码。 |
sink.semantic | No | at-least-once | sink 保证的语义。有效值:at-least-once 和 exactly-once。 |
sink.version | No | AUTO | 导入数据的接口。此参数自 Flink connector 1.2.4 开始支持。
|
sink.label-prefix | No | NONE | 指定 Stream Load 使用的 label 的前缀。 如果 Flink connector 版本为 1.2.8 及以上,并且 sink 保证 exactly-once 语义,则建议配置 label 前缀。详细信息,参见exactly once。 |
sink.buffer-flush.max-bytes | No | 94371840(90M) | 积 攒在内存的数据大小,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64MB, 10GB]。将此参数设置为较大的值可以提高导入性能,但可能会增加导入延迟。 该参数只在 sink.semantic 为at-least-once 才会生效。 sink.semantic 为 exactly-once ,则只有 Flink checkpoint 触发时 flush 内存的数据,因此该参数不生效。 |
sink.buffer-flush.max-rows | No | 500000 | 积攒在内存的数据条数,达到该阈值后数据通过 Stream Load 一次性导入 StarRocks。取值范围:[64000, 5000000]。该参数只在 sink.version 为 V1 ,sink.semantic 为 at-least-once 才会生效。 |
sink.buffer-flush.interval-ms | No | 300000 | 数据发送的间隔,用于控制数据写入 StarRocks 的延迟,取值范围:[1000, 3600000]。该参数只在 sink.semantic 为 at-least-once 才会生效。 |
sink.max-retries | No | 3 | Stream Load 失败后的重试次数。超过该数量上限,则数据导入任务报错。取值范围:[0, 10]。该参数只在 sink.version 为 V1 才会生效。 |
sink.connect.timeout-ms | No | 30000 | 与 FE 建立 HTTP 连接的超时时间。取值范围:[100, 60000]。 Flink connector v1.2.9 之前,默认值为 1000 。 |
sink.socket.timeout-ms | No | -1 | 此参数自 Flink connector 1.2.10 开始支持。HTTP 客户端等待数据的超时时间。单位:毫秒。默认值 -1 表示没有超时时间。 |
sink.wait-for-continue.timeout-ms | No | 10000 | 此参数自 Flink connector 1.2.7 开始支持。等待 FE HTTP 100-continue 应答的超时时间。取值范围:[3000, 60000]。 |
sink.ignore.update-before | No | TRUE | 此参数自 Flink connector 1.2.8 开始支持。将数据导入到主键表时,是否忽略来自 Flink 的 UPDATE_BEFORE 记录。如果将此参数设置为 false,则将该记录在主键表中视为 DELETE 操作。 |
sink.parallelism | No | NONE | 写入的并行度。仅适用于 Flink SQL。如果未设置, Flink planner 将决定并行度。在多并行度的场景中,用户需要确保数据按正确顺序写入。 |
sink.properties.* | No | NONE | Stream Load 的参数,控制 Stream Load 导入行为。例如 参数 sink.properties.format 表示 Stream Load 所导入的数据格式,如 CSV 或者 JSON。全部参数和解释,请参见 STREAM LOAD。 |
sink.properties.format | No | csv | Stream Load 导入时的数据格式。Flink connector 会将内存的数据转换为对应格式,然后通过 Stream Load 导入至 StarRocks。取值为 CSV 或者 JSON。 |
sink.properties.column_separator | No | \t | CSV 数据的列分隔符。 |
sink.properties.row_delimiter | No | \n | CSV 数据的行分隔符。 |
sink.properties.max_filter_ratio | No | 0 | 导入作业的最大容错率,即导入作业能够容忍的因数据质量不合格而过滤掉的数据行所占的最大比例。取值范围:0~1。默认值:0 。详细信息,请参见 STREAM LOAD。 |
sink.properties.partial_update | No | false | 是否使用部分更新。取值包括 TRUE 和 FALSE 。默认值:FALSE 。 |
sink.properties.partial_update_mode | No | row | 指定部分更新的模式,取 值包括 row 和 column 。
|
sink.properties.strict_mode | No | false | 是否为 Stream Load 启用严格模式。在导入数据中出现不合格行(如列值不一致)时,严格模式会影响导入行为。有效值: true 和 false 。具体参考 STREAM LOAD。 |
sink.properties.compression | No | NONE | 此参数自 Flink connector 1.2.10 开始支持。指定用于 Stream Load 的压缩算法。目前只支持 JSON 格式的压缩。有效值:lz4_frame 。仅 StarRocks v3.2.7 及更高版本支持 JSON 格式的压缩。 |
数据类型映射
Flink 数据类型 | StarRocks 数据类型 |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
使用说明
Exactly Once
-
如果您希望 sink 保证 exactly-once 语义,则建议升级 StarRocks 到 2.5 或更高版本,并将 Flink connector 升级到 1.2.4 或更高版本。
- 自 2.4 版本 StarRocks 开始支持 Stream Load 事务接口。自 Flink connector 1.2.4 版本起, Sink 基于 Stream Load 事务接口重新设计 exactly-once 的实现,相较于原来基于 Stream Load 非事务接口实现的 exactly-once,降低了内存使用和 checkpoint 耗时,提高了作业的实时性和稳定性。
- 自 Flink connector 1.2.4 版本起,如果 StarRocks 支持 Stream Load 事务接口,则 Sink 默认使用 Stream Load 事务接口,如果需要使用 Stream Load 非事务接口实现,则需要配置
sink.version
为V1
。
注意
如果只升级 StarRocks 或 Flink connector,sink 会自动选择 Stream Load 非事务接口实现。
-
sink 保证 exactly-once 语义相关配置
-
sink.semantic
的值必须为exactly-once
. -
如果 Flink connector 版本为 1.2.8 及更高,则建议指定
sink.label-prefix
的值。需要注意的是,label 前缀在 StarRocks 的所有类型的导入作业中必须是唯一的,包括 Flink job、Routine Load 和 Broker Load。- 如果指定了 label 前缀,Flink connector 将使用 label 前缀清理因为 Flink job 失败而生成的未完成事务,例如在checkpoint 进行过程中 Flink job 失败。如果使用
SHOW PROC '/transactions/<db_id>/running';
查看这些事务在 StarRock 的状态,则返回结果会显示事务通常处于PREPARED
状态。当 Flink job 从 checkpoint 恢复时,Flink connector 将根据 label 前缀和 checkpoint 中的信息找到这些未完成的事务,并中止事务。当 Flink job 因某种原因退出时,由于采用了两阶段提交机制来实现 exactly-once语义,Flink connector 无法中止事务。当 Flink 作业退出时,Flink connector 尚未收到来自 Flink checkpoint coordinator 的通知,说明这些事务是否应包含在成功的 checkpoint 中,如果中止这些事务,则可能导致数据丢失。您可以在这篇文章中了解如何在 Flink 中实现端到端的 exactly-once。 - 如果未指定 label 前缀,则未完成的事务将在超时后由 StarRocks 清理。然而,如果 Flink job 在事务超时之前频繁失败,则运行中的事务数量可能会达到 StarRocks 的
max_running_txn_num_per_db
限制。超时长度由 StarRocks FE 配置prepared_transaction_default_timeout_second
控制,默认值为86400
(1天)。如果未指定 label 前缀,您可以设置一个较小的值,使事务更快超时。
- 如果指定了 label 前缀,Flink connector 将使用 label 前缀清理因为 Flink job 失败而生成的未完成事务,例如在checkpoint 进行过程中 Flink job 失败。如果使用
-
-
如果您确定 Flink job 将在长时间停止后最终会使用 checkpoint 或 savepoint 恢复,则为避免数据丢失,请调整以下 StarRocks 配置:
-
prepared_transaction_default_timeout_second
:StarRocks FE 参数,默认值为86400
。此参数值需要大于 Flink job 的停止时间。否则,在重新启动 Flink job 之前,可能会因事务超时而中止未完成事务,这些事务可能包含在成功 checkpoint 中的,如果中止,则会导致数据丢失。请注意,当您设置一个较大的值时,则建议指定
sink.label-prefix
的值,则 Flink connector 可以根据 label 前缀和检查点中的一些信息来清理未完成的事务,而不是因事务超时后由 StarRocks 清理(这可能会导致数据丢失)。 -
label_keep_max_second
和label_keep_max_num
:StarRocks FE 参数,默认值分别为259200
和1000
。更多信息,参见FE 配置。label_keep_max_second
的值需要大于 Flink job 的停止时间。否则,Flink connector 无法使用保存在 Flink 的 savepoint 或 checkpoint 中的事务 label 来检查事务在 StarRocks 中的状态,并判断这些事务是否已提交,最终可能导致数据丢失。
您可以使用
ADMIN SET FRONTEND CONFIG
修改上述配置。ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000"); -
Flush 策略
Flink connector 先在内存中 buffer 数据,然后通过 Stream Load 将其一次性 flush 到 StarRocks。在 at-least-once 和 exactly-once 场景中使用不同的方式触发 flush 。
对于 at-least-once,在满足以下任何条件时触发 flush:
- buffer 数据的字节达到限制
sink.buffer-flush.max-bytes
- buffer 数据行数达到限制
sink.buffer-flush.max-rows
。(仅适用于版本 V1) - 自上次 flush 以来经过的时间达到限制
sink.buffer-flush.interval-ms
- 触发了 checkpoint
对于 exactly-once,仅在触发 checkpoint 时触发 flush。
监控导入指标
Flink connector 提供以下指标来监控导入情况。
指标名称 | 类型 | 描述 |
---|---|---|
totalFlushBytes | Counter | 成功 flush 的字节。 |
totalFlushRows | Counter | 成功 flush 的行数。 |
totalFlushSucceededTimes | Counter | flush 数据的成功次数。 |
totalFlushFailedTimes | Counter | flush 数据的失败次数。 |
totalFilteredRows | Counter | 已过滤的行数,这些行数也包含在 totalFlushRows 中。 |
使用示例
准备工作
创建 StarRocks 表
创建数据库 test
,并创建主键表 score_board
。
CREATE DATABASE test;
CREATE TABLE test.score_board(
id int(11) NOT NULL COMMENT "",
name varchar(65533) NULL DEFAULT "" COMMENT "",
score int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(id)
DISTRIBUTED BY HASH(id);
Flink 环境
-
下载 Flink 二进制文件 Flink 1.15.2,并解压到目录
flink-1.15.2
。 -
下载 Flink connector 1.2.7,并将其放置在目录
flink-1.15.2/lib
中。 -
运行以下命令启动 Flink 集群:
cd flink-1.15.2
./bin/start-cluster.sh
网络配置
确保 Flink 所在机器能够访问 StarRocks 集群中 FE 节点的 http_port
(默认 8030
) 和 query_port
端口(默认 9030
),以及 BE 节点的 be_http_port
端口(默认 8040
)。
使用 Flink SQL 写入数据
-
运行以下命令以启动 Flink SQL 客户端。
./bin/sql-client.sh
-
在 Flink SQL 客户端,创建一个表
score_board
,并且插入数据。 注意,如果您想将数据导入到 StarRocks 主键表中,您必须在 Flink 表的 DDL 中定义主键。对于其他类型的 StarRocks 表,这是可选的。CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'load-url' = '127.0.0.1:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'root',
'password' = ''
);
INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
使用 Flink DataStream 写入数据
根据 input records 的类型,编写对应 Flink DataStream 作业,例如 input records 为 CSV 格式的 Java String
、JSON 格式的 Java String
或自定义的 Java 对象。
-
如果 input records 为 CSV 格式的
String
,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见 LoadCsvRecords/**
* Generate CSV-format records. Each record has three values separated by "\t".
* These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
*/
String[] records = new String[]{
"1\tstarrocks-csv\t100",
"2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);
/**
* Configure the Flink connector with the required properties.
* You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
* to tell the Flink connector the input records are CSV-format, and the column separator is "\t".
* You can also use other column separators in the CSV-format records,
* but remember to modify the "sink.properties.column_separator" correspondingly.
*/
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "csv")
.withProperty("sink.properties.column_separator", "\t")
.build();
// Create the sink with the options.
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink); -
如果 input records 为 JSON 格式的
String
,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见LoadJsonRecords/**
* Generate JSON-format records.
* Each record has three key-value pairs corresponding to the columns id, name, and score in the StarRocks table.
*/
String[] records = new String[]{
"{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
"{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
};
DataStream<String> source = env.fromElements(records);
/**
* Configure the Flink connector with the required properties.
* You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array"
* to tell the Flink connector the input records are JSON-format and to strip the outermost array structure.
*/
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build();
// Create the sink with the options.
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink); -
如果 input records 为自定义的 Java 对象,对应的 Flink DataStream 作业的主要代码如下所示,完整代码请参见LoadCustomJavaRecords
-
本示例中,input record 是一个简单的 POJO
RowData
。public static class RowData {
public int id;
public String name;
public int score;
public RowData() {}
public RowData(int id, String name, int score) {
this.id = id;
this.name = name;
this.score = score;
}
} -
主要代码如下所示:
// Generate records which use RowData as the container.
RowData[] records = new RowData[]{
new RowData(1, "starrocks-rowdata", 100),
new RowData(2, "flink-rowdata", 100),
};
DataStream<RowData> source = env.fromElements(records);
// Configure the Flink connector with the required properties.
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
.withProperty("jdbc-url", jdbcUrl)
.withProperty("load-url", loadUrl)
.withProperty("database-name", "test")
.withProperty("table-name", "score_board")
.withProperty("username", "root")
.withProperty("password", "")
.build();
/**
* The Flink connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table,
* and each element is the value for a column.
* You need to define the schema of the Object[] which matches that of the StarRocks table.
*/
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT().notNull())
.field("name", DataTypes.STRING())
.field("score", DataTypes.INT())
// When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`.
.primaryKey("id")
.build();
// Transform the RowData to the Object[] according to the schema.
RowDataTransformer transformer = new RowDataTransformer();
// Create the sink with the schema, options, and transformer.
SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
source.addSink(starRockSink); -
其中
RowDataTransformer
定义如下:private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {
/**
* Set each element of the object array according to the input RowData.
* The schema of the array matches that of the StarRocks table.
*/
@Override
public void accept(Object[] internalRow, RowData rowData) {
internalRow[0] = rowData.id;
internalRow[1] = rowData.name;
internalRow[2] = rowData.score;
// When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation.
internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
}
}
-
使用 Flink CDC 3.0 同步数据(支持 schema change)
Flink CDC 3.0 框架可以轻松地从 CDC 数据源(如 MySQL、Kafka)到 StarRocks 构建流式 ELT 管道。该管道能够将整个数据库、分库分表以及来自源端的 schema change 同步到 StarRocks。
自 v1.2.9 起,StarRocks 提供的 Flink connector 已经集成至该框架中,并且被命名为 StarRocks Pipeline Connector。StarRocks Pipeline Connector 支持:
- 自动创建数据库/表
- 同步 schema change
- 同步全量和增量数据
快速上手教程可以参考从 MySQL 到 StarRocks 的流式 ELT 管道。
建议您使用 StarRocks v3.2.1 及以后的版本,以开启 fast_schema_evolution,来提高加减列的速度并降低资源使用。
最佳实践
导入至主键表
本节将展示如何将数据导入到 StarRocks 主键表中,以实现部分更新和条件更新。以下示例使用 Flink SQL。 部分更新和条件更新的更多介绍,请参见通过导入实现数据变更。