Continuously load data from Apache Flink®
Overview
StarRocks provides flink-connector-starrocks for loading data into StarRocks. Compared with flink-connector-jdbc provided by Flink, it delivers better loading performance. Internally, flink-connector-starrocks buffers data and loads it into StarRocks in batches via Stream Load.
Supported data formats
- CSV
- JSON
Procedure
Step 1: Add the pom dependency
Add the following dependency to pom.xml. Check the Latest Version on the version information page and replace x.x.x below with that version. Keep only the <version> entry that matches your Flink (and Scala) version.
```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.15, connector 1.2.3+ -->
    <version>x.x.x_flink-1.15</version>
    <!-- for flink-1.14 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <!-- for flink-1.13 -->
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <!-- for flink-1.12 -->
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <!-- for flink-1.11 -->
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
</dependency>
```
Step 2: Invoke flink-connector-starrocks
If you use the Flink DataStream API, refer to the following example.
```java
// -------- Source data is in JSON format --------
fromElements(new String[]{
    "{\"score\": \"99\", \"name\": \"stephen\"}",
    "{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
    StarRocksSink.sink(
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            // Since v2.4, partial updates of Primary Key tables are supported.
            // Use the following two properties to specify the columns to update.
            // .withProperty("sink.properties.partial_update", "true")
            // .withProperty("sink.properties.columns", "k1,k2,k3")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            // Set the parallelism. With a parallelism greater than 1, consider how to guarantee data ordering.
            .withProperty("sink.parallelism", "1")
            .build()
    )
);

// -------- Source data is in CSV format --------
class RowData {
    public int score;
    public String name;
    public RowData(int score, String name) {
        ......
    }
}

fromElements(
    new RowData[]{
        new RowData(99, "stephen"),
        new RowData(100, "lebron")
    }
).addSink(
    StarRocksSink.sink(
        // the table structure
        TableSchema.builder()
            .field("score", DataTypes.INT())
            .field("name", DataTypes.VARCHAR(20))
            .build(),
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            // Since v2.4, partial updates of Primary Key tables are supported.
            // Use the following two properties to specify the columns to update.
            // .withProperty("sink.properties.partial_update", "true")
            // .withProperty("sink.properties.columns", "k1,k2,k3")
            .withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x02")
            .build(),
        // set the slots with streamRowData
        (slots, streamRowData) -> {
            slots[0] = streamRowData.score;
            slots[1] = streamRowData.name;
        }
    )
);
```
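The snippets above are fragments of a larger DataStream pipeline. For context, below is a minimal, hedged sketch of a complete job skeleton for the JSON case. The class name, job name, and single-FE addresses are illustrative assumptions, and the connector package paths below may differ slightly between connector versions; only the sink construction follows the API shown above.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Connector classes live under com.starrocks.connector.flink; exact subpackages
// may vary by connector version (assumption for this sketch).
import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;

public class StarRocksSinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                "{\"score\": \"99\", \"name\": \"stephen\"}",
                "{\"score\": \"100\", \"name\": \"lebron\"}")
           .addSink(StarRocksSink.sink(
                StarRocksSinkOptions.builder()
                    .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
                    .withProperty("load-url", "fe1_ip:http_port")
                    .withProperty("username", "xxx")
                    .withProperty("password", "xxx")
                    .withProperty("database-name", "xxx")
                    .withProperty("table-name", "xxx")
                    .withProperty("sink.properties.format", "json")
                    .withProperty("sink.properties.strip_outer_array", "true")
                    .build()));

        env.execute("starrocks-sink-demo"); // job name is illustrative
    }
}
```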
If you use the Flink Table API, refer to the following example.
```java
// -------- Source data is in CSV format --------
// create a table with `structure` and `properties`
// Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory`
// to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
tEnv.executeSql(
    "CREATE TABLE USER_RESULT(" +
        "name VARCHAR," +
        "score BIGINT" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
        "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
        "'database-name' = 'xxx'," +
        "'table-name' = 'xxx'," +
        "'username' = 'xxx'," +
        "'password' = 'xxx'," +
        "'sink.buffer-flush.max-rows' = '1000000'," +
        "'sink.buffer-flush.max-bytes' = '300000000'," +
        "'sink.buffer-flush.interval-ms' = '5000'," +
        // Since v2.4, partial updates of Primary Key tables are supported.
        // Use the following two properties to specify the columns to update.
        // "'sink.properties.partial_update' = 'true'," +
        // "'sink.properties.columns' = 'k1,k2,k3'," +
        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +
        // Any Stream Load property can be passed with the 'sink.properties.' prefix,
        // e.g. "'sink.properties.columns' = 'k1, v1'," +
        "'sink.max-retries' = '3'" +
    ")"
);
```
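Once the sink table is registered, writing to it is a regular INSERT statement. A minimal, hedged sketch follows; the source table `source_table` and its columns are hypothetical and only illustrate the usage, `tEnv` is the same TableEnvironment as above.

```java
// Assumes a previously registered source table named `source_table`
// with columns (name VARCHAR, score BIGINT); the name is illustrative.
tEnv.executeSql(
    "INSERT INTO USER_RESULT " +
    "SELECT name, score FROM source_table"
);
```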
Parameter description
The sink options are as follows:
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | YES | NONE | String | Set to starrocks. |
jdbc-url | YES | NONE | String | The JDBC URL used to execute queries against StarRocks. |
load-url | YES | NONE | String | The FE HTTP addresses used for batch sinking, in the form fe_ip:http_port;fe_ip:http_port, separated by ';'. |
database-name | YES | NONE | String | StarRocks database name. |
table-name | YES | NONE | String | StarRocks table name. |
username | YES | NONE | String | StarRocks username used for the connection. |
password | YES | NONE | String | StarRocks password used for the connection. |
sink.semantic | NO | at-least-once | String | at-least-once or exactly-once (with exactly-once, data is flushed only at checkpoints, and options such as sink.buffer-flush.* do not take effect). |
sink.version | NO | AUTO | String | The implementation used for the exactly-once sink. Available only for connector 1.2.4+. If V2, the Stream Load transaction interface is used, which requires StarRocks 2.4+. If V1, the non-transactional Stream Load interface is used. If AUTO, the connector automatically chooses the transaction interface when StarRocks supports it, and otherwise falls back to the non-transactional interface. |
sink.buffer-flush.max-bytes | NO | 94371840 (90 MB) | String | The maximum size of serialized data per batch, range: [64 MB, 10 GB]. |
sink.buffer-flush.max-rows | NO | 500000 | String | The maximum number of rows per batch, range: [64,000, 5,000,000]. |
sink.buffer-flush.interval-ms | NO | 300000 | String | The flush interval, range: [1000, 3600000] ms. |
sink.max-retries | NO | 3 | String | The maximum number of retries for a Stream Load request, range: [0, 10]. |
sink.connect.timeout-ms | NO | 1000 | String | Timeout in milliseconds for connecting to the load-url, range: [100, 60000]. |
sink.properties.format | NO | CSV | String | The format of the data loaded into StarRocks. Valid values: CSV and JSON. |
sink.properties.* | NO | NONE | String | Stream Load properties, such as 'sink.properties.columns' = 'k1, k2, k3'; see STREAM LOAD for details. Since 2.4, flink-connector-starrocks supports partial updates for Primary Key tables. |
sink.properties.ignore_json_size | NO | false | String | Whether to ignore the 100 MB batch size limit for JSON data. |
sink.properties.timeout | NO | 600 | String | Timeout for the Stream Load transaction when the exactly-once sink is used. A new transaction begins after a Flink checkpoint is triggered and is committed when the next checkpoint is triggered, so set this value larger than the Flink checkpoint interval; otherwise the Flink job will fail because the transaction times out. |
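To make these options concrete, here is a minimal, hedged sketch of a DataStream sink configured for exactly-once delivery. The checkpoint interval and timeout values are illustrative assumptions; the only requirement, as noted in the table, is that sink.properties.timeout (in seconds) exceeds the Flink checkpoint interval.

```java
// Minimal sketch (values are illustrative, not recommendations).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000); // exactly-once flushes at checkpoints; 60 s interval here

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
        .withProperty("load-url", "fe1_ip:http_port")
        .withProperty("database-name", "xxx")
        .withProperty("table-name", "xxx")
        .withProperty("username", "xxx")
        .withProperty("password", "xxx")
        .withProperty("sink.semantic", "exactly-once")
        // AUTO picks the Stream Load transaction interface when StarRocks is 2.4+.
        .withProperty("sink.version", "AUTO")
        // Must be larger than the checkpoint interval: 300 s > 60 s.
        .withProperty("sink.properties.timeout", "300")
        .build();
```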
Data type mapping between Flink and StarRocks
Flink type | StarRocks type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY\<T> | ARRAY\<T> |
MAP\<KT,VT> | JSON STRING |
ROW\<arg T...> | JSON STRING |
Note: Flink's BYTES, VARBINARY, TIME, INTERVAL, MULTISET, and RAW types are currently not supported. For details, see Flink data types.
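As an illustration of this mapping, the hedged sketch below declares a Flink sink table whose column types appear on the left side of the table above. The table and column names are made up for the example, `tEnv` is a TableEnvironment as in the earlier example, and each comment notes the StarRocks type the column is expected to map to.

```java
// Hypothetical sink table illustrating the type mapping above
// (table and column names are illustrative, not from the StarRocks docs).
tEnv.executeSql(
    "CREATE TABLE type_mapping_demo (" +
    "  id       INT,                " + // -> StarRocks INTEGER
    "  flag     BOOLEAN,            " + // -> StarRocks BOOLEAN
    "  amount   DECIMAL(10, 2),     " + // -> StarRocks DECIMAL
    "  name     STRING,             " + // -> StarRocks STRING
    "  birthday DATE,               " + // -> StarRocks DATE
    "  updated  TIMESTAMP(3),       " + // -> StarRocks DATETIME
    "  tags     ARRAY<STRING>,      " + // -> StarRocks ARRAY<STRING>
    "  extra    MAP<STRING, STRING> " + // -> written as a JSON string
    ") WITH (" +
    "  'connector' = 'starrocks'," +
    "  'jdbc-url' = 'jdbc:mysql://fe1_ip:query_port'," +
    "  'load-url' = 'fe1_ip:http_port'," +
    "  'database-name' = 'xxx'," +
    "  'table-name' = 'xxx'," +
    "  'username' = 'xxx'," +
    "  'password' = 'xxx'" +
    ")"
);
```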
Usage notes
- Since v2.4, StarRocks supports the Stream Load transaction interface. Since Flink connector 1.2.4, the sink re-implements exactly-once on top of this transaction interface. Compared with the previous implementation based on the non-transactional interface, it reduces memory usage and checkpoint time, and improves the real-time performance and stability of the job. Since Flink connector 1.2.4, the sink uses the transaction interface by default; to use the non-transactional implementation instead, set sink.version to V1. Note: if you upgrade only StarRocks or only the Flink connector, the sink automatically falls back to the non-transactional implementation.
- Exactly-once based on the non-transactional Stream Load interface relies on Flink's checkpoint interval: at each checkpoint it saves the current batch of data together with its label, and in the first invoke after the checkpoint completes it blocks to flush all data cached in state, thereby achieving exactly-once. However, if StarRocks goes down, the Flink sink operator will block for a long time, which can trigger Flink monitoring alarms or a forced kill.
- Data is loaded in CSV format by default. You can customize the row delimiter and column separator with 'sink.properties.row_delimiter' = '\\x02' (supported since StarRocks 1.15.0) and 'sink.properties.column_separator' = '\\x01'.
- If loading stalls, try increasing the memory of the Flink job.
- If the code runs normally and data is received but writes still fail, check that the current machine can reach the http_port of the BEs, i.e. it can reach the ip:port shown by SHOW BACKENDS. For example, suppose a machine has both a public and a private IP, the http_port of the FEs/BEs is reachable via the public IP, but the IP bound in the cluster is the private IP. If the job's load-url uses the FE's public ip:http_port, the FE forwards the write to a BE's private ip:port; if the client machine cannot reach that private BE IP, the write fails. A minimal connectivity check is sketched below.
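The following is a minimal, hedged sketch of such a connectivity check in Java. The BE address and port are placeholders; in practice you would test every BE address reported by SHOW BACKENDS.

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class BeConnectivityCheck {
    public static void main(String[] args) {
        // Placeholder address: replace with a BE ip and http_port from SHOW BACKENDS.
        String beHost = "be_ip";
        int beHttpPort = 8040;

        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(beHost, beHttpPort), 3000); // 3 s timeout
            System.out.println("BE " + beHost + ":" + beHttpPort + " is reachable");
        } catch (Exception e) {
            System.out.println("Cannot reach BE " + beHost + ":" + beHttpPort + ": " + e);
        }
    }
}
```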
Observable metrics for data loading
Name | Type | Description |
---|---|---|
totalFlushBytes | counter | successfully flushed bytes. |
totalFlushRows | counter | successfully flushed rows. |
totalFlushSucceededTimes | counter | number of times a data batch was successfully flushed. |
totalFlushFailedTimes | counter | number of times a flush failed. |
flink-connector-starrocks loads data via Stream Load under the hood, so you can check the loading status in the Flink logs.
If the logs contain a URL of the form http://$fe:${http_port}/api/$db/$tbl/_stream_load, a Stream Load job was triggered successfully; its result is also printed in the Flink logs, and the return values are described in Stream Load return values. If the logs contain no such information, ask a question on the StarRocks forum and we will follow up promptly.
FAQ
See Flink Connector FAQ.