- StarRocks
- 产品简介
- 快速开始
- 部署 StarRocks
- 表设计
- 导入数据
- 导出数据
- 查询数据源
- 查询加速
- 管理手册
- 参考手册
- SQL参考
- 用户账户管理
- 集群管理
- ADD SQLBLACKLIST
- ADMIN CANCEL REPAIR TABLE
- ADMIN CHECK TABLET
- ADMIN REPAIR TABLE
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER RESOURCE GROUP
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE RESOURCE GROUP
- CREATE FILE
- DELETE SQLBLACKLIST
- DROP FILE
- DROP RESOURCE GROUP
- EXPLAIN
- INSTALL PLUGIN
- KILL
- SET
- SHOW BACKENDS
- SHOW BROKER
- SHOW COMPUTE NODES
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW PROC
- SHOW PROCESSLIST
- SHOW RESOURCE GROUP
- SHOW SQLBLACKLIST
- SHOW TABLE STATUS
- SHOW VARIABLES
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER MATERIALIZED VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER RESOURCE
- ANALYZE TABLE
- BACKUP
- CANCEL ALTER TABLE
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE ANALYZE
- CREATE DATABASE
- CREATE EXTERNAL CATALOG
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP ANALYZE
- DROP STATS
- DROP CATALOG
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- KILL ANALYZE
- RECOVER
- REFRESH EXTERNAL TABLE
- RESTORE
- SET CATALOG
- SHOW ANALYZE JOB
- SHOW ANALYZE STATUS
- SHOW META
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- USE
- DML
- ALTER LOAD
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL REFRESH MATERIALIZED VIEW
- CANCEL EXPORT
- CREATE ROUTINE LOAD
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- REFRESH MATERIALIZED VIEW
- RESUME ROUTINE LOAD
- SELECT
- SHOW ALTER TABLE
- SHOW ALTER MATERIALIZED VIEW
- SHOW BACKUP
- SHOW CATALOGS
- SHOW CREATE CATALOG
- SHOW CREATE MATERIALIZED VIEW
- SHOW CREATE TABLE
- SHOW CREATE VIEW
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW MATERIALIZED VIEWS
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- SUBMIT TASK
- UPDATE
- 辅助命令
- 数据类型
- 关键字
- AUTO_INCREMENT
- 函数参考
- Java UDF
- 窗口函数
- Lambda 表达式
- 日期函数
- add_months
- adddate
- convert_tz
- current_date
- current_time
- current_timestamp
- date
- date_add
- date_format
- date_slice
- date_sub, subdate
- date_trunc
- datediff
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- days_add
- days_diff
- days_sub
- from_days
- from_unixtime
- hour
- hours_add
- hours_diff
- hours_sub
- microseconds_add
- microseconds_sub
- minute
- minutes_add
- minutes_diff
- minutes_sub
- month
- monthname
- months_add
- months_diff
- months_sub
- now
- quarter
- second
- seconds_add
- seconds_diff
- seconds_sub
- str_to_date
- str2date
- time_slice
- time_to_sec
- timediff
- timestamp
- timestampadd
- timestampdiff
- to_days
- to_date
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- weeks_add
- weeks_diff
- weeks_sub
- year
- years_add
- years_diff
- years_sub
- 字符串函数
- append_trailing_char_if_absent
- ascii
- char
- char_length
- character_length
- concat
- concat_ws
- ends_with
- find_in_set
- group_concat
- hex_decode_binary
- hex_decode_string
- hex
- instr
- lcase
- left
- length
- locate
- lower
- lpad
- ltrim
- money_format
- null_or_empty
- parse_url
- repeat
- replace
- reverse
- right
- rpad
- rtrim
- space
- split
- split_part
- starts_with
- strleft
- strright
- substring
- trim
- ucase
- unhex
- upper
- 聚合函数
- array_agg
- avg
- approx_count_distinct
- any_value
- bitmap
- bitmap_agg
- count
- grouping
- grouping_id
- hll_empty
- hll_hash
- hll_raw_agg
- hll_union
- hll_union_agg
- max
- max_by
- min
- multi_distinct_count
- multi_distinct_sum
- percentile_approx
- percentile_cont
- percentile_disc
- retention
- sum
- std
- stddev
- stddev_samp
- variance, variance_pop, var_pop
- var_samp
- window_funnel
- Array 函数
- array_agg
- array_append
- array_avg
- array_concat
- array_contains
- array_contains_all
- array_cum_sum
- array_difference
- array_distinct
- array_filter
- array_intersect
- array_join
- array_length
- array_map
- array_max
- array_min
- arrays_overlap
- array_position
- array_remove
- array_slice
- array_sort
- array_sortby
- array_sum
- array_to_bitmap
- reverse
- unnest
- Bitmap 函数
- bitmap_agg
- bitmap_and
- bitmap_andnot
- bitmap_contains
- bitmap_count
- bitmap_empty
- bitmap_from_string
- bitmap_hash
- bitmap_has_any
- bitmap_intersect
- bitmap_max
- bitmap_min
- bitmap_or
- bitmap_remove
- bitmap_to_array
- bitmap_to_base64
- base64_to_bitmap
- bitmap_to_string
- bitmap_union
- bitmap_union_count
- bitmap_union_int
- bitmap_xor
- intersect_count
- sub_bitmap
- to_bitmap
- JSON 函数
- Map 函数
- Bit 函数
- Binary 函数
- 加密函数
- 模糊/正则匹配函数
- 条件函数
- 百分位函数
- 标量函数
- 工具函数
- 地理位置函数
- cast 函数
- hash 函数
- 数学函数
- 系统变量
- 用户自定义变量
- 错误码
- 系统限制
- SQL参考
- 常见问题解答
- 性能测试
通过 HTTP PUT 从本地文件系统或流式数据源导入数据
StarRocks 提供基于 HTTP 协议的 Stream Load 导入方式,帮助您从本地文件系统或流式数据源导入数据。
Stream Load 是一种同步的导入方式。您提交导入作业以后,StarRocks 会同步地执行导入作业,并返回导入作业的结果信息。您可以通过返回的结果信息来判断导入作业是否成功。
Stream Load 适用于以下业务场景:
导入本地数据文件。
一般可采用 curl 命令直接提交一个导入作业,将本地数据文件的数据导入到 StarRocks 中。
导入实时产生的数据流。
一般可采用 Apache Flink® 等程序提交一个导入作业,持续生成一系列导入任务,将实时产生的数据流持续不断地导入到 StarRocks 中。
Stream Load 支持在导入过程中做数据转换、以及通过 UPSERT 和 DELETE 操作实现数据变更。请参见导入过程中实现数据转换和通过导入实现数据变更。
注意:Stream Load 操作会同时更新和 StarRocks 原始表相关的物化视图的数据。
支持的数据文件格式
Stream Load 支持如下数据文件格式:
- CSV
- JSON
您可以通过 streaming_load_max_mb
参数来设置单个源数据文件的大小上限,但一般不建议调大此参数。具体请参见本文档“参数配置”章节。
说明
对于 CSV 格式的数据,需要注意以下两点:
- StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
- 空值 (null) 用
\N
表示。比如,数据文件一共有三列,其中某行数据的第一列、第三列数据分别为a
和b
,第二列没有数据,则第二列需要用\N
来表示空值,写作a,\N,b
,而不是a,,b
。a,,b
表示第二列是一个空字符串。
使用限制
Stream Load 当前不支持导入某一列为 JSON 的 CSV 文件的数据。
基本原理
您需要在客户端上通过 HTTP 发送导入作业请求给 FE,FE 会通过 HTTP 重定向 (Redirect) 指令将请求转发给某一个 BE。或者,您也可以直接发送导入作业请求给某一个 BE。
说明
如果把导入作业请求发送给 FE,FE 会通过轮询机制选定由哪一个 BE 来接收请求,从而实现 StarRocks 集群内的负载均衡。因此,推荐您把导入作业请求发送给 FE。
接收导入作业请求的 BE 作为 Coordinator BE,将数据按表结构划分、并分发数据到其他各相关的 BE。导入作业的结果信息由 Coordinator BE 返回给客户端。需要注意的是,如果您在导入过程中停止 Coordinator BE,会导致导入作业失败。
下图展示了 Stream Load 的主要流程:
导入本地文件
创建导入作业
本文以 curl 工具为例,介绍如何使用 Stream Load 从本地文件系统导入 CSV 或 JSON 格式的数据。有关创建导入作业的详细语法和参数说明,请参见 STREAM LOAD。
注意在 StarRocks 中,部分文字是 SQL 语言的保留关键字,不能直接用于 SQL 语句。如果想在 SQL 语句中使用这些保留关键字,必须用反引号 (`) 包含起来。参见关键字。
导入 CSV 格式的数据
数据样例
在数据库
test_db
中创建一张名为table1
的主键模型表。表包含id
、name
和score
三列,主键为id
列,如下所示:MySQL [test_db]> CREATE TABLE `table1` ( `id` int(11) NOT NULL COMMENT "用户 ID", `name` varchar(65533) NULL COMMENT "用户姓名", `score` int(11) NOT NULL COMMENT "用户得分" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10;
在本地文件系统中创建一个 CSV 格式的数据文件
example1.csv
。文件一共包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:1,Lily,23 2,Rose,23 3,Alice,24 4,Julia,25
命令示例
通过如下命令,把 example1.csv
文件中的数据导入到 table1
表中:
curl --location-trusted -u <username>:<password> -H "label:123" \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: id, name, score" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
example1.csv
文件中包含三列,跟 table1
表的 id
、name
、score
三列一一对应,并用逗号 (,) 作为列分隔符。因此,需要通过 column_separator
参数指定列分隔符为逗号 (,),并且在 columns
参数中按顺序把 example1.csv
文件中的三列临时命名为 id
、name
、score
。columns
参数中声明的三列,按名称对应 table1
表中的三列。
查询数据
导入完成后,查询 table1
表的数据,如下所示:
MySQL [test_db]> SELECT * FROM table1;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Lily | 23 |
| 2 | Rose | 23 |
| 3 | Alice | 24 |
| 4 | Julia | 25 |
+------+-------+-------+
4 rows in set (0.00 sec)
导入 JSON 格式的数据
数据样例
在数据库
test_db
中创建一张名为table2
的主键模型表。表包含id
和city
两列,主键为id
列,如下所示:MySQL [test_db]> CREATE TABLE `table2` ( `id` int(11) NOT NULL COMMENT "城市 ID", `city` varchar(65533) NULL COMMENT "城市名称" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10;
在本地文件系统中创建一个 JSON 格式的数据文件
example2.json
。文件一共包含两个字段,分别代表城市名称和城市 ID,如下所示:{"name": "北京", "code": 2}
命令示例
通过如下语句把 example2.json
文件中的数据导入到 table2
表中:
curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
-H "Expect:100-continue" \
-H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
-H "columns: city,tmp_id, id = tmp_id * 100" \
-T example2.json -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load
example2.json
文件中包含 name
和 code
两个键,跟 table2
表中的列之间的对应关系如下图所示。
上图所示的对应关系描述如下:
- 提取
example2.json
文件中包含的name
和code
两个字段,按顺序依次映射到jsonpaths
参数中声明的name
和code
两个字段。 - 提取
jsonpaths
参数中声明的name
和code
两个字段,按顺序映射到columns
参数中声明的city
和tmp_id
两列。 - 提取
columns
参数声明中的city
和id
两列,按名称映射到table2
表中的city
和id
两列。
说明
上述示例中,在导入过程中先将
example2.json
文件中code
字段对应的值乘以 100,然后再落入到table2
表的id
中。
有关导入 JSON 数据时 jsonpaths
、columns
和 StarRocks 表中的字段之间的对应关系,请参见 STREAM LOAD 文档中“列映射”章节。
查询数据
导入完成后,查询 table2
表的数据,如下所示:
MySQL [test_db]> SELECT * FROM table2;
+------+--------+
| id | city |
+------+--------+
| 200 | 北京 |
+------+--------+
4 rows in set (0.01 sec)
查看导入作业
导入作业结束后,StarRocks 会以 JSON 格式返回本次导入作业的结果信息,具体请参见 STREAM LOAD 文档中“返回值”章节。
Stream Load 不支持通过 SHOW LOAD 语句查看导入作业执行情况。
取消导入作业
Stream Load 不支持手动取消导入作业。如果导入作业发生超时或者导入错误,StarRocks 会自动取消该作业。
导入数据流
Stream Load 支持通过程序导入数据流,具体操作方法,请参见如下文档:
- Flink 集成 Stream Load,请参见使用 flink-connector-starrocks 导入至 StarRocks。
- Java 集成 Stream Load,请参见 https://github.com/StarRocks/demo/MiscDemo/stream_load。
- Apache Spark™ 集成 Stream Load,请参见 01_sparkStreaming2StarRocks。
参数配置
这里介绍使用 Stream Load 导入方式需要注意的一些系统参数配置。这些参数作用于所有 Stream Load 导入作业。
streaming_load_max_mb
:单个源数据文件的大小上限。默认文件大小上限为 10 GB。具体请参见 BE 配置项。建议一次导入的数据量不要超过 10 GB。如果数据文件的大小超过 10 GB,建议您拆分成若干小于 10 GB 的文件分次导入。如果由于业务场景需要,无法拆分数据文件,可以适当调大该参数的取值,从而提高数据文件的大小上限。
需要注意的是,如果您调大该参数的取值,需要重启 BE 才能生效,并且系统性能有可能会受影响,并且也会增加失败重试时的代价。
说明
导入 JSON 格式的数据时,需要注意以下两点:
- 单个 JSON 对象的大小不能超过 4 GB。如果 JSON 文件中单个 JSON 对象的大小超过 4 GB,会提示 "This parser can't support a document that big." 错误。
- HTTP 请求中 JSON Body 的大小默认不能超过 100 MB。如果 JSON Body 的大小超过 100 MB,会提示 "The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead huge memory consuming." 错误。为避免该报错,可以在 HTTP 请求头中添加
"ignore_json_size:true"
设置,忽略对 JSON Body 大小的检查。
stream_load_default_timeout_second
:导入作业的超时时间。默认超时时间为 600 秒。具体请参见 FE 动态参数。如果您创建的导入作业经常发生超时,可以通过该参数适当地调大超时时间。您可以通过如下公式计算导入作业的超时时间:
导入作业的超时时间 > 待导入数据量/平均导入速度
例如,如果源数据文件的大小为 10 GB,并且当前 StarRocks 集群的平均导入速度为 100 MB/s,则超时时间应该设置为大于 100 秒。
说明
“平均导入速度”是指目前 StarRocks 集群的平均导入速度。导入速度主要受限于集群的磁盘 I/O 及 BE 个数。
Stream Load 还提供
timeout
参数来设置当前导入作业的超时时间。具体请参见 STREAM LOAD。
常见问题
请参见 Stream Load 常见问题。