编辑

Flink Connector

本文介绍 Flink 如何通过 flink-connector-starrocks 的 source 功能读取 StarRocks 数据。

如果 Flink 需要通过 flink-connector-starrocks 的 sink 功能,将数据写入至 StarRocks,请参见数据导入章节的 Flink connector

功能简介

Flink 可以通过 flink-connector-starrocks 的 source 功能读取 StarRocks 的数据。相较于 Flink 官方提供的 Flink JDBC connector,flink-connector-starrocks 的 source 功能具备并行读取 StarRocks 的 BE 节点数据的能力,大大提高了数据读取效率。以下是两种连接器的实现方案对比。

  • flink-connector-starrocks 的实现方案:Flink 先从 FE 节点获取查询计划(Query Plan),Flink 再将获取到的查询计划作为参数,下发至 BE 节点,然后获取 BE 节点返回的数据。

    asset

  • Flink JDBC connector 的实现方案:Flink JDBC connector 仅能从 FE 单点上串行读取数据,数据读取效率较低。

    asset

操作步骤

  1. 下载源码 flink-connector-starrocks

  2. 根据 Flink 的版本,选择对应的分支。

  3. 运行如下脚本,生成与 BE 节点 Thrift 接口交互的 Java class 文件,用于 flink-connector-starrocks 直接调用 BE 节点 Thrift 接口。

     # 如使用 Linux 操作系统,则需要执行如下命令。
    ./build-thrift.sh   
     # 如使用 Windows 操作系统,则需要执行如下命令。
    ./build-thrift.bat
  4. 将源码编译成 JAR 包,并将 JAR 包放在 Flink 的 lib 目录中。

  5. 重启 Flink。

flink-connector-starrocks 的 source 功能暂时无法保证 exactly-once 语义。如果读取任务失败,您需要重复本步骤,再次创建读取任务。

  • 如您使用 Flink SQL 客户端(推荐),则需要参考如下命令,调用 flink-connector-starrocks,读取 StarRocks 的数据。相关参数说明,请参见参数说明

    -- 根据 StarRocks 的表,创建表和配置属性(包括 flink-connector-starrocks 和库表的信息)。
    CREATE TABLE flink_test (
        date_1 DATE,
        datetime_1 TIMESTAMP(6),
        char_1 CHAR(20),
        varchar_1 VARCHAR,
        boolean_1 BOOLEAN,
        tinyint_1 TINYINT,
        smallint_1 SMALLINT,
        int_1 INT,
        bigint_1 BIGINT,
        largeint_1 STRING,
        float_1 FLOAT,
        double_1 DOUBLE,FLI
        decimal_1 DECIMAL(27,9)
    ) WITH (
       'connector'='starrocks',
       'scan-url'='192.168.xxx.xxx:8030,192.168.xxx.xxx:8030',
       'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
       'username'='root',
       'password'='xxxxxx',
       'database-name'='flink_test',
       'table-name'='flink_test'
    );
    -- 使用 SQL 语句读取 StarRocks 数据。
    select date_1, smallint_1 from flink_test where char_1 <> 'A' and int_1 = -126;
    • 仅支持使用部分 SQL 语句读取 StarRocks 数据,如select ... from table_name where ...。暂不支持除 COUNT 外的聚合函数。
    • 支持谓词下推。使用 SQL 语句时,支持自动进行谓词下推,比如上述例子中的过滤条件 char_1 <> 'A' and int_1 = -126,会直接发送到 BE 节点的存储层进行过滤,不需要额外配置。
  • 如您使用 Flink DataStream ,则需要先添加依赖,然后调用 flink-connector-starrocks,读取 StarRocks 的数据。

  1. 在 pom.xml 文件中添加如下依赖。

    x.x.x需要替换为 flink-connector-starrocks 的最新版本号,您可以单击版本信息获取。

    <dependency>    
        <groupId>com.starrocks</groupId>
        <artifactId>flink-connector-starrocks</artifactId>
        <!-- for flink-1.11 -->
        <version>x.x.x_flink-1.11_2.11</version>
        <version>x.x.x_flink-1.11_2.12</version>
        <!-- for flink-1.12 -->
        <version>x.x.x_flink-1.12_2.11</version>
        <version>x.x.x_flink-1.12_2.12</version>
        <!-- for flink-1.13 -->
        <version>x.x.x_flink-1.13_2.11</version>
        <version>x.x.x_flink-1.13_2.12</version>
    </dependency>
  2. 参考如下示例代码,调用 flink-connector-starrocks,读取 StarRocks 的数据。相关参数说明,请参见参数说明

    StarRocksSourceOptions options = StarRocksSourceOptions.builder()
            .withProperty("scan-url", "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030")
            .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
            .withProperty("username", "root")
            .withProperty("password", "xxxxxx")
            .withProperty("table-name", "flink_test")
            .withProperty("database-name", "test")
            .withProperty("cloumns", "char_1, date_1")        
            .withProperty("filters", "int_1 = 10")
            .build();
    TableSchema tableSchema = TableSchema.builder()
            .field("date_1", DataTypes.DATE())
            .field("datetime_1", DataTypes.TIMESTAMP(6))
            .field("char_1", DataTypes.CHAR(20))
            .field("varchar_1", DataTypes.STRING())
            .field("boolean_1", DataTypes.BOOLEAN())
            .field("tinyint_1", DataTypes.TINYINT())
            .field("smallint_1", DataTypes.SMALLINT())
            .field("int_1", DataTypes.INT())
            .field("bigint_1", DataTypes.BIGINT())
            .field("largeint_1", DataTypes.STRING())
            .field("float_1", DataTypes.FLOAT())
            .field("double_1", DataTypes.DOUBLE())
            .field("decimal_1", DataTypes.DECIMAL(27, 9))
            .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print();
    env.execute("StarRocks flink source");

参数说明

参数是否必填数据类型描述
connectorString固定为 starrocks。
scan-urlStringFE 节点的连接地址,用于通过 Web 服务器访问 FE 节点。 具体格式为\<FE 节点的 IP 地址>:\<FE 的 http_port>,端口号默认为8030。多个地址之间用英文半角逗号分隔。例如192.168.xxx.xxx:8030,192.168.xxx.xxx:8030。
jdbc-urlStringFE 节点的连接地址,用于访问 FE 节点上的 MySQL 客户端。具体格式为 jdbc:mysql://\<FE 节点的 IP 地址>:\<FE 的 query_port>,端口号默认为9030。
usernameStringStarRocks 中的用户名称。需具备目标数据库表的读权限。用户权限说明,请参见用户权限
passwordStringStarRocks 的用户密码。
database-nameStringStarRocks 数据库的名称。
table-nameStringStarRocks 数据表的名称。
scan.connect.timeout-msStringflink-connector-starrocks 连接 StarRocks 的时间上限,单位为毫秒,默认值为1000。超过该时间上限,则将报错。
scan.params.keep-alive-minString查询任务的保活时间,单位为分钟。默认值为10,建议取值大于等于5。
scan.params.query-timeout-sString查询任务的超时时间,单位为秒,默认值为600。如果超过该时间,仍未返回查询结果,则停止查询任务。
scan.params.mem-limit-byteStringBE 节点中单个查询的内存上限,单位为字节,默认值为1073741824(1G)。
scan.max-retriesString查询失败时的最大重试次数,默认值为1。超过该数量上限,则将报错。

Streaming中特有参数

参数是否必填数据类型描述
scan.columnsString选择特定的columns。不同columns之间以逗号分隔。
scan.filterString在SQL中设置过滤方式。ps:"tinyint_1 = 100"

该数据类型映射关系仅适用于 Flink 读取 StarRocks 数据。如需要查看 Flink 将数据写入至 StarRocks 的数据类型映射关系,请参见数据导入章节的 Flink connector

StarRocksFlink
NULLNULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
LARGEINTSTRING
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
DATETIMETIMESTAMP
DECIMALDECIMAL
DECIMALV2DECIMAL
DECIMAL32DECIMAL
DECIMAL64DECIMAL
DECIMAL128DECIMAL
CHARCHAR
VARCHARSTRING

后续步骤

Flink 成功读取 StarRocks 数据后,您可以使用 Flink 官方的 Flink WEBUI 界面观察读取任务。比如 Flink WEBUI 的 Metrics 页面会显示成功读取的数据行数(totalScannedRows)。