- StarRocks介绍
- 快速开始
- 表设计
- 导入数据
- 导出数据
- 使用StarRocks
- 参考手册
- SQL参考
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE FILE
- DROP FILE
- INSTALL PLUGIN
- SHOW BACKENDS
- SHOW BROKER
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW TABLE STATUS
- SHOW FILE
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER TABLE
- ALTER VIEW
- BACKUP
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE DATABASE
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- RECOVER
- RESTORE
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- DML
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL EXPORT
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- CREATE ROUTINE LOAD
- SELECT
- SHOW ALTER
- SHOW BACKUP
- SHOW CREATE TABLE
- SHOW CRAETE VIEW
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- 数据类型
- 辅助命令
- 函数参考
- 系统变量
- 错误码
- 系统限制
- SQL参考
- 管理手册
- 常见问题解答
- 性能测试
- 开发指南
- 外部系统集成
Flink Connector
本文介绍 Flink 如何通过 flink-connector-starrocks 的 source 功能读取 StarRocks 数据。
如果 Flink 需要通过 flink-connector-starrocks 的 sink 功能,将数据写入至 StarRocks,请参见数据导入章节的 Flink connector。
功能简介
Flink 可以通过 flink-connector-starrocks 的 source 功能读取 StarRocks 的数据。相较于 Flink 官方提供的 Flink JDBC connector,flink-connector-starrocks 的 source 功能具备并行读取 StarRocks 的 BE 节点数据的能力,大大提高了数据读取效率。以下是两种连接器的实现方案对比。
flink-connector-starrocks 的实现方案:Flink 先从 FE 节点获取查询计划(Query Plan),Flink 再将获取到的查询计划作为参数,下发至 BE 节点,然后获取 BE 节点返回的数据。
Flink JDBC connector 的实现方案:Flink JDBC connector 仅能从 FE 单点上串行读取数据,数据读取效率较低。
操作步骤
步骤一:准备flink-connector-starrocks
根据 Flink 的版本,选择对应的分支。
运行如下脚本,生成与 BE 节点 Thrift 接口交互的 Java class 文件,用于 flink-connector-starrocks 直接调用 BE 节点 Thrift 接口。
# 如使用 Linux 操作系统,则需要执行如下命令。 ./build-thrift.sh # 如使用 Windows 操作系统,则需要执行如下命令。 ./build-thrift.bat
将源码编译成 JAR 包,并将 JAR 包放在 Flink 的 lib 目录中。
重启 Flink。
步骤二:调用 flink-connector-starrocks ,读取 StarRocks 数据
flink-connector-starrocks 的 source 功能暂时无法保证 exactly-once 语义。如果读取任务失败,您需要重复本步骤,再次创建读取任务。
如您使用 Flink SQL 客户端(推荐),则需要参考如下命令,调用 flink-connector-starrocks,读取 StarRocks 的数据。相关参数说明,请参见参数说明。
-- 根据 StarRocks 的表,创建表和配置属性(包括 flink-connector-starrocks 和库表的信息)。 CREATE TABLE flink_test ( date_1 DATE, datetime_1 TIMESTAMP(6), char_1 CHAR(20), varchar_1 VARCHAR, boolean_1 BOOLEAN, tinyint_1 TINYINT, smallint_1 SMALLINT, int_1 INT, bigint_1 BIGINT, largeint_1 STRING, float_1 FLOAT, double_1 DOUBLE,FLI decimal_1 DECIMAL(27,9) ) WITH ( 'connector'='starrocks', 'scan-url'='192.168.xxx.xxx:8030,192.168.xxx.xxx:8030', 'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030', 'username'='root', 'password'='xxxxxx', 'database-name'='flink_test', 'table-name'='flink_test' ); -- 使用 SQL 语句读取 StarRocks 数据。 select date_1, smallint_1 from flink_test where char_1 <> 'A' and int_1 = -126;
- 仅支持使用部分 SQL 语句读取 StarRocks 数据,如
select ... from table_name where ...
。暂不支持除 COUNT 外的聚合函数。 - 支持谓词下推。使用 SQL 语句时,支持自动进行谓词下推,比如上述例子中的过滤条件
char_1 <> 'A' and int_1 = -126
,会直接发送到 BE 节点的存储层进行过滤,不需要额外配置。
- 仅支持使用部分 SQL 语句读取 StarRocks 数据,如
如您使用 Flink DataStream ,则需要先添加依赖,然后调用 flink-connector-starrocks,读取 StarRocks 的数据。
在 pom.xml 文件中添加如下依赖。
x.x.x需要替换为 flink-connector-starrocks 的最新版本号,您可以单击版本信息获取。
<dependency> <groupId>com.starrocks</groupId> <artifactId>flink-connector-starrocks</artifactId> <!-- for flink-1.11 --> <version>x.x.x_flink-1.11_2.11</version> <version>x.x.x_flink-1.11_2.12</version> <!-- for flink-1.12 --> <version>x.x.x_flink-1.12_2.11</version> <version>x.x.x_flink-1.12_2.12</version> <!-- for flink-1.13 --> <version>x.x.x_flink-1.13_2.11</version> <version>x.x.x_flink-1.13_2.12</version> </dependency>
参考如下示例代码,调用 flink-connector-starrocks,读取 StarRocks 的数据。相关参数说明,请参见参数说明。
StarRocksSourceOptions options = StarRocksSourceOptions.builder() .withProperty("scan-url", "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030") .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030") .withProperty("username", "root") .withProperty("password", "xxxxxx") .withProperty("table-name", "flink_test") .withProperty("database-name", "test") .withProperty("cloumns", "char_1, date_1") .withProperty("filters", "int_1 = 10") .build(); TableSchema tableSchema = TableSchema.builder() .field("date_1", DataTypes.DATE()) .field("datetime_1", DataTypes.TIMESTAMP(6)) .field("char_1", DataTypes.CHAR(20)) .field("varchar_1", DataTypes.STRING()) .field("boolean_1", DataTypes.BOOLEAN()) .field("tinyint_1", DataTypes.TINYINT()) .field("smallint_1", DataTypes.SMALLINT()) .field("int_1", DataTypes.INT()) .field("bigint_1", DataTypes.BIGINT()) .field("largeint_1", DataTypes.STRING()) .field("float_1", DataTypes.FLOAT()) .field("double_1", DataTypes.DOUBLE()) .field("decimal_1", DataTypes.DECIMAL(27, 9)) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print(); env.execute("StarRocks flink source");
参数说明
参数 | 是否必填 | 数据类型 | 描述 |
---|---|---|---|
connector | 是 | String | 固定为 starrocks。 |
scan-url | 是 | String | FE 节点的连接地址,用于通过 Web 服务器访问 FE 节点。 具体格式为\<FE 节点的 IP 地址>:\<FE 的 http_port>,端口号默认为8030。多个地址之间用英文半角逗号分隔。例如192.168.xxx.xxx:8030,192.168.xxx.xxx:8030。 |
jdbc-url | 是 | String | FE 节点的连接地址,用于访问 FE 节点上的 MySQL 客户端。具体格式为 jdbc:mysql://\<FE 节点的 IP 地址>:\<FE 的 query_port>,端口号默认为9030。 |
username | 是 | String | StarRocks 中的用户名称。需具备目标数据库表的读权限。用户权限说明,请参见用户权限。 |
password | 是 | String | StarRocks 的用户密码。 |
database-name | 是 | String | StarRocks 数据库的名称。 |
table-name | 是 | String | StarRocks 数据表的名称。 |
scan.connect.timeout-ms | 否 | String | flink-connector-starrocks 连接 StarRocks 的时间上限,单位为毫秒,默认值为1000。超过该时间上限,则将报错。 |
scan.params.keep-alive-min | 否 | String | 查询任务的保活时间,单位为分钟。默认值为10,建议取值大于等于5。 |
scan.params.query-timeout-s | 否 | String | 查询任务的超时时间,单位为秒,默认值为600。如果超过该时间,仍未返回查询结果,则停止查询任务。 |
scan.params.mem-limit-byte | 否 | String | BE 节点中单个查询的内存上限,单位为字节,默认值为1073741824(1G)。 |
scan.max-retries | 否 | String | 查询失败时的最大重试次数,默认值为1。超过该数量上限,则将报错。 |
Streaming中特有参数
参数 | 是否必填 | 数据类型 | 描述 |
---|---|---|---|
scan.columns | 否 | String | 选择特定的columns。不同columns之间以逗号分隔。 |
scan.filter | 否 | String | 在SQL中设置过滤方式。ps:"tinyint_1 = 100" |
Flink 与 StarRocks 的数据类型映射关系
该数据类型映射关系仅适用于 Flink 读取 StarRocks 数据。如需要查看 Flink 将数据写入至 StarRocks 的数据类型映射关系,请参见数据导入章节的 Flink connector。
StarRocks | Flink |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
后续步骤
Flink 成功读取 StarRocks 数据后,您可以使用 Flink 官方的 Flink WEBUI 界面观察读取任务。比如 Flink WEBUI 的 Metrics 页面会显示成功读取的数据行数(totalScannedRows)。