
Continuously load data from Apache Flink®

Overview

StarRocks provides flink-connector-starrocks for loading data into StarRocks. Compared with flink-connector-jdbc provided by Flink, it delivers better loading performance. Internally, flink-connector-starrocks caches the data and loads it into StarRocks in batches via Stream Load.

Supported data sources

  • CSV
  • JSON

Procedure

Step 1: Add the pom dependency

Source code

Add the following to pom.xml:

Click the version information link to view the Latest Version on that page, and replace x.x.x below with it.

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- Keep only the single <version> entry that matches your Flink (and Scala) version. -->
    <!-- for flink-1.15, connector 1.2.3+ -->
    <version>x.x.x_flink-1.15</version>
    <!-- for flink-1.14 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <!-- for flink-1.13 -->
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <!-- for flink-1.12 -->
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <!-- for flink-1.11 -->
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
</dependency>

Step 2: Invoke flink-connector-starrocks

  • If you use the Flink DataStream API, refer to the following code. A complete end-to-end job sketch is shown after these two examples.

    // -------- source data in JSON format --------
    // `env` is your StreamExecutionEnvironment
    env.fromElements(new String[]{
        "{\"score\": \"99\", \"name\": \"stephen\"}",
        "{\"score\": \"100\", \"name\": \"lebron\"}"
    }).addSink(
        StarRocksSink.sink(
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                // Since v2.4, partial updates of Primary Key tables are supported. Use the following two properties to specify the columns to update.
                // .withProperty("sink.properties.partial_update", "true")
                // .withProperty("sink.properties.columns", "k1,k2,k3")
                .withProperty("sink.properties.format", "json")
                .withProperty("sink.properties.strip_outer_array", "true")
                // Set the sink parallelism. With a parallelism greater than 1, consider how to keep the data ordered.
                .withProperty("sink.parallelism", "1")
                .build()
        )
    );
    
    // -------- source data in CSV format --------
    class RowData {
        public int score;
        public String name;
        public RowData(int score, String name) {
            this.score = score;
            this.name = name;
        }
    }
    env.fromElements(
        new RowData[]{
            new RowData(99, "stephen"),
            new RowData(100, "lebron")
        }
    ).addSink(
        StarRocksSink.sink(
            // the table structure
            TableSchema.builder()
                .field("score", DataTypes.INT())
                .field("name", DataTypes.VARCHAR(20))
                .build(),
            // the sink options
            StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
                .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                .withProperty("username", "xxx")
                .withProperty("password", "xxx")
                .withProperty("table-name", "xxx")
                .withProperty("database-name", "xxx")
                // Since v2.4, partial updates of Primary Key tables are supported. Use the following two properties to specify the columns to update.
                // .withProperty("sink.properties.partial_update", "true")
                // .withProperty("sink.properties.columns", "k1,k2,k3")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .build(),
            // set the slots with streamRowData
            (slots, streamRowData) -> {
                slots[0] = streamRowData.score;
                slots[1] = streamRowData.name;
            }
        )
    );
  • If you use the Flink Table API, refer to the following code. A sketch of writing data into the registered sink table is shown after this example.

    // -------- source data in CSV format --------
    // create a table with `structure` and `properties`
    // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory`
    //         to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
    // `tEnv` is your StreamTableEnvironment
    tEnv.executeSql(
        "CREATE TABLE USER_RESULT(" +
            "name VARCHAR," +
            "score BIGINT" +
        ") WITH ( " +
            "'connector' = 'starrocks'," +
            "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
            "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
            "'database-name' = 'xxx'," +
            "'table-name' = 'xxx'," +
            "'username' = 'xxx'," +
            "'password' = 'xxx'," +
            "'sink.buffer-flush.max-rows' = '1000000'," +
            "'sink.buffer-flush.max-bytes' = '300000000'," +
            "'sink.buffer-flush.interval-ms' = '5000'," +
            // Since v2.4, partial updates of Primary Key tables are supported. Use the following two properties to specify the columns to update.
            // "'sink.properties.partial_update' = 'true'," +
            // "'sink.properties.columns' = 'k1,k2,k3'," + 
            "'sink.properties.column_separator' = '\\x01'," +
            "'sink.properties.row_delimiter' = '\\x02'," +
            "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'`
            "'sink.max-retries' = '3'" +
        ")"
    );
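
The DataStream snippets above only show the sink call itself. The following is a minimal end-to-end sketch of a job using the JSON variant; the connection values are the same placeholders as above, and the import paths are an assumption based on recent 1.2.x connector releases, so verify them against the connector version you actually use.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    // Import paths below are an assumption for connector 1.2.x; adjust to your version.
    import com.starrocks.connector.flink.StarRocksSink;
    import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

    public class StarRocksSinkDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Demo source: two JSON records. In production this would be Kafka, files, etc.
            env.fromElements(
                    "{\"score\": \"99\", \"name\": \"stephen\"}",
                    "{\"score\": \"100\", \"name\": \"lebron\"}"
            ).addSink(
                    StarRocksSink.sink(
                            StarRocksSinkOptions.builder()
                                    .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
                                    .withProperty("load-url", "fe1_ip:http_port")
                                    .withProperty("database-name", "xxx")
                                    .withProperty("table-name", "xxx")
                                    .withProperty("username", "xxx")
                                    .withProperty("password", "xxx")
                                    .withProperty("sink.properties.format", "json")
                                    .withProperty("sink.properties.strip_outer_array", "true")
                                    .build()
                    )
            );

            env.execute("starrocks-sink-demo");
        }
    }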
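
For the Table API, once the sink table is registered, data is written with a regular INSERT INTO statement. A minimal sketch, assuming a hypothetical source table named source_table (define it with whatever connector your data comes from) whose columns are compatible with USER_RESULT:

    // `source_table` is a placeholder for any Flink source table (Kafka, datagen, files, ...).
    // Write its rows into the StarRocks sink table registered above.
    tEnv.executeSql("INSERT INTO USER_RESULT SELECT name, score FROM source_table");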

Parameter description

The sink options are as follows:

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | YES | NONE | String | starrocks |
| jdbc-url | YES | NONE | String | Used to execute queries in StarRocks. |
| load-url | YES | NONE | String | `fe_ip:http_port;fe_ip:http_port` separated with ';', used for batch sinking. |
| database-name | YES | NONE | String | StarRocks database name. |
| table-name | YES | NONE | String | StarRocks table name. |
| username | YES | NONE | String | StarRocks connection username. |
| password | YES | NONE | String | StarRocks connection password. |
| sink.semantic | NO | at-least-once | String | at-least-once or exactly-once (data is flushed at checkpoints only, and options such as sink.buffer-flush.* take no effect). |
| sink.version | NO | AUTO | String | The implementation of the exactly-once sink. Only available for connector 1.2.4+. V2 uses StarRocks' Stream Load transaction interface, which requires StarRocks 2.4+. V1 uses the non-transaction Stream Load interface. AUTO lets the connector choose the transaction interface automatically if StarRocks supports it, and otherwise fall back to the non-transaction interface. |
| sink.buffer-flush.max-bytes | NO | 94371840 (90M) | String | The maximum batch size of the serialized data. Range: [64MB, 10GB]. |
| sink.buffer-flush.max-rows | NO | 500000 | String | The maximum number of batched rows. Range: [64,000, 5,000,000]. |
| sink.buffer-flush.interval-ms | NO | 300000 | String | The flush interval. Range: [1000ms, 3600000ms]. |
| sink.max-retries | NO | 3 | String | The maximum number of retries of a Stream Load request. Range: [0, 10]. |
| sink.connect.timeout-ms | NO | 1000 | String | Timeout in milliseconds for connecting to the load-url. Range: [100, 60000]. |
| sink.properties.format | NO | CSV | String | The file format of data loaded into StarRocks. Valid values: CSV and JSON. Default value: CSV. |
| sink.properties.* | NO | NONE | String | Stream Load properties, such as 'sink.properties.columns' = 'k1, k2, k3'. See STREAM LOAD for details. Since v2.4, the flink-connector-starrocks supports partial updates for Primary Key tables. |
| sink.properties.ignore_json_size | NO | false | String | Ignore the batching size limit (100MB) for JSON data. |
| sink.properties.timeout | NO | 600 | String | Timeout of a Stream Load transaction when the exactly-once sink is used. A new transaction begins after a Flink checkpoint is triggered and is committed when the next checkpoint is triggered, so set this value larger than the Flink checkpoint interval; otherwise the Flink job fails because of transaction timeout. |
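
For reference, the flush-related options in the table above are set the same way as any other property. A minimal sketch of an at-least-once sink tuned with explicit flush thresholds (all values and connection parameters are illustrative placeholders, not recommendations):

    StarRocksSinkOptions bufferedOptions = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
            .withProperty("load-url", "fe1_ip:http_port")
            .withProperty("database-name", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            // A flush is triggered when any one of the three thresholds below is reached.
            .withProperty("sink.buffer-flush.max-rows", "500000")
            .withProperty("sink.buffer-flush.max-bytes", "94371840")
            .withProperty("sink.buffer-flush.interval-ms", "10000")
            .withProperty("sink.max-retries", "3")
            .build();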

The mapping between Flink data types and StarRocks data types is as follows:

| Flink type | StarRocks type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INTEGER | INTEGER |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| BINARY | INT |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| DATE | DATE |
| TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
| TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
| ARRAY\<T> | ARRAY\<T> |
| MAP\<KT,VT> | JSON STRING |
| ROW\<arg T...> | JSON STRING |

Note: Flink's BYTES, VARBINARY, TIME, INTERVAL, MULTISET, and RAW types are currently not supported. For details, see Flink data types.
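
If your data contains one of the unsupported types, one workaround is to cast it to a supported type before it reaches the sink. A minimal Table API sketch, assuming a hypothetical source table src with a TIME column event_time and a sink table sink_table whose corresponding column is STRING:

    // TIME is not supported by the connector, so cast it to STRING (or another supported type).
    // `src`, `sink_table`, and the column names are placeholders for illustration only.
    tEnv.executeSql(
        "INSERT INTO sink_table " +
        "SELECT name, score, CAST(event_time AS STRING) FROM src"
    );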

Usage notes

  • Since v2.4, StarRocks supports the Stream Load transaction interface. Since Flink connector 1.2.4, the sink has been redesigned to implement exactly-once on top of the transaction interface. Compared with the previous implementation based on the non-transaction interface, it reduces memory usage and checkpoint time, and improves the real-time performance and stability of jobs. Since Flink connector 1.2.4, the sink uses the transaction-interface implementation by default. To use the non-transaction implementation instead, set sink.version to V1. A configuration sketch is shown after this list.

    Note

    If you upgrade only StarRocks or only the Flink connector, the sink automatically falls back to the non-transaction implementation.

  • The exactly-once implementation based on the non-transaction Stream Load interface relies on Flink's checkpoint interval: at each checkpoint, the batched data and its label are saved in state, and in the first invoke after the checkpoint completes, all data cached in state is flushed in a blocking manner, which achieves exactly-once. However, if StarRocks goes down, the Flink sink operator can block for a long time, triggering Flink monitoring alerts or a forced kill.

  • Data is loaded in CSV format by default. You can customize the row delimiter and column separator with 'sink.properties.row_delimiter' = '\x02' (supported since StarRocks 1.15.0) and 'sink.properties.column_separator' = '\x01'.

  • If loading stalls, try increasing the memory of the Flink job.

  • If the code runs normally and receives data but writes still fail, confirm that the current machine can access the BE's http_port, that is, it can reach the ip:port listed by show backends of the cluster. For example, suppose a machine has both a public IP and a private IP, the http_port of the FE/BE is reachable through the public ip:port, and the IP bound in the cluster is the private IP. If the load-url in the job uses the FE's public ip:http_port, the FE forwards the write to the BE's private ip:port; if the client machine cannot reach that private IP, the write fails.
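
As mentioned in the first note above, the following is a minimal sketch of enabling the exactly-once sink, assuming Flink connector 1.2.4+ and StarRocks 2.4+ (all connection values are placeholders):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // exactly-once flushes data only at checkpoints, so checkpointing must be enabled.
    env.enableCheckpointing(10000); // 10-second checkpoint interval (illustrative)

    StarRocksSinkOptions exactlyOnceOptions = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
            .withProperty("load-url", "fe1_ip:http_port")
            .withProperty("database-name", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("sink.semantic", "exactly-once")
            // Keep the transaction timeout (seconds) larger than the checkpoint interval.
            .withProperty("sink.properties.timeout", "600")
            // Uncomment to force the non-transaction implementation:
            // .withProperty("sink.version", "V1")
            .build();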

Observable metrics for data loading

| Name | Type | Description |
| --- | --- | --- |
| totalFlushBytes | counter | Bytes successfully flushed. |
| totalFlushRows | counter | Rows successfully flushed. |
| totalFlushSucceededTimes | counter | Number of times a data batch was successfully flushed. |
| totalFlushFailedTimes | counter | Number of times flushing failed. |

flink-connector-starrocks loads data by calling Stream Load under the hood, so you can check the load status in the Flink logs:

  • If the log contains http://$fe:${http_port}/api/$db/$tbl/_stream_load, a Stream Load job has been triggered successfully. The job result is also printed in the Flink log; for details about the response, see Stream Load return values.

  • If the log does not contain the above information, ask a question on the StarRocks forum and we will follow up promptly.

FAQ

See Flink Connector FAQ.