- StarRocks
- 产品简介
- 快速开始
- 表设计
- 导入数据
- 导出数据
- 查询数据源
- 查询加速
- 管理手册
- 部署集群
- 运维集群
- 数据恢复
- 管理用户权限及认证
- 性能调优
- 参考手册
- SQL参考
- 关键字
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR TABLE
- ADMIN CHECK TABLET
- ADMIN REPAIR TABLE
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER RESOURCE GROUP
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE RESOURCE GROUP
- CREATE FILE
- DROP FILE
- DROP RESOURCE GROUP
- INSTALL PLUGIN
- SHOW BACKENDS
- SHOW BROKER
- SHOW COMPUTE NODES
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW PROC
- SHOW PROCESSLIST
- SHOW RESOURCE GROUP
- SHOW TABLE STATUS
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER MATERIALIZED VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER RESOURCE
- ANALYZE TABLE
- BACKUP
- CANCEL ALTER TABLE
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE ANALYZE
- CREATE DATABASE
- CREATE EXTERNAL CATALOG
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP ANALYZE
- DROP STATS
- DROP CATALOG
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- KILL ANALYZE
- RECOVER
- REFRESH EXTERNAL TABLE
- RESTORE
- SHOW ANALYZE JOB
- SHOW ANALYZE STATUS
- SHOW META
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- USE
- DML
- ALTER LOAD
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL REFRESH MATERIALIZED VIEW
- CANCEL EXPORT
- CREATE ROUTINE LOAD
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- REFRESH MATERIALIZED VIEW
- RESUME ROUTINE LOAD
- SELECT
- SHOW ALTER TABLE
- SHOW ALTER MATERIALIZED VIEW
- SHOW BACKUP
- SHOW CATALOGS
- SHOW CREATE MATERIALIZED VIEW
- SHOW CREATE TABLE
- SHOW CREATE VIEW
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW MATERIALIZED VIEW
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- 辅助命令
- 数据类型
- 函数参考
- Java UDF
- 窗口函数
- Lambda 表达式
- 日期函数
- add_months
- adddate
- convert_tz
- current_date
- current_time
- current_timestamp
- date
- date_add
- date_format
- date_slice
- date_sub, subdate
- date_trunc
- datediff
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- days_add
- days_diff
- days_sub
- from_days
- from_unixtime
- hour
- hours_add
- hours_diff
- hours_sub
- microseconds_add
- microseconds_sub
- minute
- minutes_add
- minutes_diff
- minutes_sub
- month
- monthname
- months_add
- months_diff
- months_sub
- now
- quarter
- second
- seconds_add
- seconds_diff
- seconds_sub
- str_to_date
- str2date
- time_slice
- time_to_sec
- timediff
- timestamp
- timestampadd
- timestampdiff
- to_days
- to_date
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- weeks_add
- weeks_diff
- weeks_sub
- year
- years_add
- years_diff
- years_sub
- 加密函数
- 地理位置函数
- 字符串函数
- JSON 函数
- Map 函数
- 模糊/正则匹配函数
- 工具函数
- 聚合函数
- Bitmap 函数
- bitmap_and
- bitmap_andnot
- bitmap_contains
- bitmap_count
- bitmap_empty
- bitmap_from_string
- bitmap_hash
- bitmap_has_any
- bitmap_intersect
- bitmap_max
- bitmap_min
- bitmap_or
- bitmap_remove
- bitmap_to_array
- bitmap_to_base64
- base64_to_bitmap
- bitmap_to_string
- bitmap_union
- bitmap_union_count
- bitmap_union_int
- bitmap_xor
- intersect_count
- sub_bitmap
- to_bitmap
- 数组函数
- array_agg
- array_append
- array_avg
- array_concat
- array_contains
- array_contains_all
- array_cum_sum
- array_difference
- array_distinct
- array_filter
- array_intersect
- array_join
- array_length
- array_map
- array_max
- array_min
- arrays_overlap
- array_position
- array_remove
- array_slice
- array_sort
- array_sortby
- array_sum
- array_to_bitmap
- reverse
- unnest
- bit函数
- cast函数
- hash函数
- 条件函数
- 百分位函数
- 标量函数
- 数学函数
- 系统变量
- 用户自定义变量
- 错误码
- 系统限制
- SQL参考
- 常见问题解答
- 性能测试
使用 Stream Load 事务接口导入
为了支持和 Apache Flink®、Apache Kafka® 等其他系统之间实现跨系统的两阶段提交,并提升高并发 Stream Load 导入场景下的性能,StarRocks 提供了 Stream Load 事务接口。
本文介绍 Stream Load 事务接口、以及如何使用该事务接口把数据导入到 StarRocks 中。
接口说明
Stream Load 事务接口支持通过兼容 HTTP 协议的工具或语言发起接口操作请求。本文以 curl 工具为例介绍如何使用该接口。该接口提供事务管理、数据写入、事务预提交、事务去重和超时管理等功能。
事务管理
提供如下标准接口操作,用于管理事务:
/api/transaction/begin
:开启一个新事务。/api/transaction/commit
:提交当前事务,持久化变更。/api/transaction/rollback
:回滚当前事务,回滚变更。
事务预提交
提供 /api/transaction/prepare
接口操作,用于预提交当前事务,临时持久化变更。预提交一个事务后,您可以继续提交或者回滚该事务。这种机制下,如果在事务预提交成功以后 StarRocks 发生宕机,您仍然可以在系统恢复后继续执行提交。
说明
在事务预提交以后,请勿继续写入数据。继续写入数据的话,写入请求会报错。
数据写入
提供 /api/transaction/load
接口操作,用于写入数据。您可以在同一个事务中多次调用该接口来写入数据。
事务去重
复用 StarRocks 现有的标签机制,通过标签绑定事务,实现事务的 “至多一次 (At-Most-Once)” 语义。
超时管理
支持通过 FE 配置中的 stream_load_default_timeout_second
参数设置默认的事务超时时间。
开启事务时,可以通过 HTTP 请求头中的 timeout
字段来指定当前事务的超时时间。
开启事务时,还可以通过 HTTP 请求头中的 idle_transaction_timeout
字段来指定空闲事务超时时间。当事务超过 idle_transaction_timeout
所设置的超时时间而没有数据写入时,事务将自动回滚。
接口优势
Stream Load 事务接口具有如下优势:
精确一次语义
通过“预提交事务”、“提交事务”,方便实现跨系统的两阶段提交。例如配合在 Flink 实现“精确一次 (Exactly-Once)”语义的导入。
提升导入性能
在通过程序提交 Stream Load 作业的场景中,Stream Load 事务接口允许在一个导入作业中按需合并发送多次小批量的数据后“提交事务”,从而能减少数据导入的版本,提升导入性能。
使用限制
事务接口当前具有如下使用限制:
只支持单库单表事务,未来将会支持跨库多表事务。
只支持单客户端并发数据写入,未来将会支持多客户端并发数据写入。
支持在单个事务中多次调用数据写入接口
/api/transaction/load
来写入数据,但是要求所有/api/transaction/load
操作中的参数设置必须保持一致。导入 CSV 格式的数据时,需要确保每行数据结尾都有行分隔符。
基本操作
开始事务
语法
curl -H "label:<label_name>" -H "db:<database_name>" -H "table:<table_name>"
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
返回结果
如果事务开始成功,则返回如下结果:
{ "Status": "OK", "Message": "", "Label": "xxx", "TxnId": 9032, "BeginTxnTimeMs": 0 }
如果事务的标签重复,则返回如下结果:
{ "Status": "LABEL_ALREADY_EXISTS", "ExistingJobStatus": "RUNNING", "Message": "Label [xxx] has already been used." }
如果发生标签重复以外的其他错误,则返回如下结果:
{ "Status": "FAILED", "Message": "" }
写入数据
语法
curl -H "label:<label_name>" -H "db:<database_name>" -H "table:<table_name>"
-T /path/to/data.csv
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
返回结果
如果数据写入成功,则返回如下结果:
{ "TxnId": 1, "Seq": 0, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "OK", "Message": "", "NumberTotalRows": 5265644, "NumberLoadedRows": 5265644, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 10737418067, "LoadTimeMs": 418778, "StreamLoadPutTimeMs": 68, "ReceivedDataTimeMs": 38964, }
如果事务被判定为未知,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "TXN_NOT_EXISTS" }
如果事务状态无效,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "Transcation State Invalid" }
如果发生事务未知和状态无效以外的其他错误,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "" }
预提交事务
语法
curl -H "label:<label_name>" -H "db:<database_name>"
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
返回结果
如果事务预提交成功,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "OK", "Message": "", "NumberTotalRows": 5265644, "NumberLoadedRows": 5265644, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 10737418067, "LoadTimeMs": 418778, "StreamLoadPutTimeMs": 68, "ReceivedDataTimeMs": 38964, "WriteDataTimeMs": 417851 "CommitAndPublishTimeMs": 1393 }
如果事务被判定为不存在,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "Transcation Not Exist" }
如果事务预提交超时,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "commit timeout", }
如果发生事务不存在和预提交超时以外的其他错误,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "publish timeout" }
提交事务
语法
curl -H "label:<label_name>" -H "db:<database_name>"
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
返回结果
如果事务提交成功,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "OK", "Message": "", "NumberTotalRows": 5265644, "NumberLoadedRows": 5265644, "NumberFilteredRows": 0, "NumberUnselectedRows": 0, "LoadBytes": 10737418067, "LoadTimeMs": 418778, "StreamLoadPutTimeMs": 68, "ReceivedDataTimeMs": 38964, "WriteDataTimeMs": 417851 "CommitAndPublishTimeMs": 1393 }
如果事务已经提交过,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "OK", "Message": "Transaction already commited", }
如果事务被判定为不存在,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "Transcation Not Exist" }
如果事务提交超时,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "commit timeout", }
如果数据发布超时,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "publish timeout", "CommitAndPublishTimeMs": 1393 }
如果发生事务不存在和超时以外的其他错误,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "" }
回滚事务
语法
curl -H "label:<label_name>" -H "db:<database_name>"
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
返回结果
如果事务回滚成功,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "OK", "Message": "" }
如果事务被判定为不存在,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "Transcation Not Exist" }
如果发生事务不存在以外的其他错误,则返回如下结果:
{ "TxnId": 1, "Label": "a25eca8b-7b48-4c87-9ea7-0cbdd913e77d", "Status": "FAILED", "Message": "" }
相关文档
有关 Stream Load 适用的业务场景、支持的数据文件格式、基本原理等信息,参见通过 HTTP PUT 从本地文件系统或流式数据源导入数据。
有关创建 Stream Load 作业的语法和参数,参见STREAM LOAD。