通过导入实现数据变更
StarRocks 的主键表支持通过 Stream Load、Broker Load 或 Routine Load 导入作业,对 StarRocks 表进行数据变更,包括插入、更新和删除数据。不支持通过 Spark Load 导入作业或 INSERT 语句对 StarRocks 表进行数据变更。
StarRocks 还支持部分更新 (Partial Update) 和条件更新 (Conditional Update)。
注意
导入操作需要目标表的 INSERT 权限。如果您的用户账号没有 INSERT 权限,请参考 GRANT 给用户赋权。
本文以 CSV 格式的数据文件为例介绍如何通过导入实现数据变更。具体支持的数据文件类型,跟您选择的导入方式有关。
说明
对于 CSV 格式的数据,StarRocks 支持设置长度最大不超过 50 个字节的 UTF-8 编码字符串作为列分隔符,包括常见的逗号 (,)、Tab 和 Pipe (|)。
内部实现
StarRocks 的主键表目前支持 UPSERT 和 DELETE 操作,不支持区分 INSERT 和 UPDATE 操作。
在创建导入作业时,StarRocks 支持在导入作业的创建语句或命令中添加 __op
字段,用于指定操作类型。
说明
不需要在创建 StarRocks 表时 添加
__op
列。
不同的导入方式,定义 __op
字段的方法也不相同:
-
如果使用 Stream Load 导入方式,需要通过
columns
参数来定义__op
字段。 -
如果使用 Broker Load 导入方式,需要通过 SET 子句来定义
__op
字段。 -
如果使用 Routine Load 导入方式,需要通过
COLUMNS
参数来定义__op
字段。
根据要做的数据变更操作,您可以选择添加或者不添加 __op
字段。不添加 __op
字段的话,默认为 UPSERT 操作。主要涉及的数据变更操作场景如下:
-
当数据文件只涉及 UPSERT 操作时,可以不添加
__op
字段。 -
当数据文件只涉及 DELETE 操作时,必须添加
__op
字段,并且指定操作类型为 DELETE。 -
当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加
__op
字段,并且确保数据文件中包含一个代表操作类型的列,取值为0
或1
。其中,取值为0
时代表 UPSERT 操作,取值为1
时代表 DELETE 操作。
使用说明
-
必须确保待导入的数据文件中每一行的列数都相同。
-
所更新的列必须包含主键列。
前提条件
Broker Load
Routine Load
如果使用 Routine Load 导入数据,必须确保您的 Apache Kafka® 集群已创建 Topic。本文假设您已部署四个 Topic,分别为 topic1
、topic2
、topic3
和 topic4
。
基本操作
下面通过几个示例来展示具体的导入操作。有关使用 Stream Load、Broker Load 和 Routine Load 导入数据的详细语法和参数介绍,请参见 STREAM LOAD、BROKER LOAD 和 CREATE ROUTINE LOAD。
UPSERT
当数据文件只涉及 UPSERT 操作时,可以不添加 __op
字段。
如果您添加 __op
字段:
-
可以指定
__op
为 UPSERT 操作。 -
也可以不做任何指定,StarRocks 默认导入为 UPSERT 操作。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example1.csv
。文件包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:101,Lily,100
102,Rose,100b. 把
example1.csv
文件中的数据上传到 Kafka 集群的topic1
中。 -
准备 StarRocks 表。
a. 在数据库
test_db
中创建一张名为table1
的主键表。表包含id
、name
和score
三列,分别代表用户 ID、用户名称和用户得分,主键为id
列,如下所示:CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table1
表中插入一条数据,如下所示:INSERT INTO table1 VALUES
(101, 'Lily',80);
导入数据
通过导入,把 example1.csv
文件中 id
为 101
的数据更新到 table1
表中,并且把 example1.csv
文件中 id
为 102
的数据插入到 table1
表中。
-
通过 Stream Load 导入:
-
不添加
__op
字段:curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label1" \
-H "column_separator:," \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load -
添加
__op
字段:curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator:," \
-H "columns:__op ='upsert'" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
-
-
通过 Broker Load 导入:
-
不添加
__op
字段:LOAD LABEL test_db.label1
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
)
WITH BROKER; -
添加
__op
字段:LOAD LABEL test_db.label2
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
set (__op = 'upsert')
)
WITH BROKER;
-
-
通过 Routine Load 导入:
-
不添加
__op
字段:CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
添加
__op
字段:CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score, __op ='upsert')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
-
查询数据
导入完成后,查询 table1
表的数据,如下所示:
SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
从查询结果可以看到,example1.csv
文件中 id
为 101
的数据已经更新到 table1
表中,并且 example1.csv
文件中 id
为 102
的数据已经插入到 table1
表中。
DELETE
当数据文件只涉及 DELETE 操作时,必须添加 __op
字段,并且指定操作类型为 DELETE。
数 据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example2.csv
。文件包含三列,分别代表用户 ID、用户姓名和用户得分,如下所示:101,Jack,100
b. 把
example2.csv
文件中的数据上传到 Kafka 集群的topic2
中。 -
准备 StarRocks 表。
a. 在数据库
test_db
中创建一张名为table2
的主键表。表包含id
、name
和score
三列,分别代表用户 ID、用户名称和用户得分,主键为id
列,如下所示:CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table2
表中插入数据,如下所示:INSERT INTO table2 VALUES
(101, 'Jack', 100),
(102, 'Bob', 90);
导入数据
通过导入,把 example2.csv
文件中 id
为 101
的数据从 table2
表中删除。
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label3" \
-H "column_separator:," \
-H "columns:__op='delete'" \
-T example2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load -
通过 Broker Load 导入:
LOAD LABEL test_db.label3
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
into table table2
columns terminated by ","
format as "csv"
set (__op = 'delete')
)
WITH BROKER; -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table2 ON table2
COLUMNS(id, name, score, __op = 'delete')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test2",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
查询数据
导入完成后,查询 table2
表的数据,如下所示:
SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)
从查询结果可以看到,example2.csv
文件中 id
为 101
的数据已经从 table2
表中删除。
UPSERT 和 DELETE
当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op
字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0
或 1
。其中,取值为 0
时代表 UPSERT 操作,取值为 1
时代表 DELETE 操作。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example3.csv
。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:101,Tom,100,1
102,Sam,70,0
103,Stan,80,0b. 把
example3.csv
文件中的数据上传到 Kafka 集群的topic3
中。 -
准备 StarRocks 表。
a. 在数据库
test_db
中创建一张名为table3
的主键表。表包含id
、name
和score
三列,分别代表用户 ID、用户名称和用户得分,主键为id
列,如下所示:CREATE TABLE `table3`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table3
表中插入数据,如下所示:INSERT INTO table3 VALUES
(101, 'Tom', 100),
(102, 'Sam', 90);
导入数据
通过导入,把 example3.csv
文件中 id
为 101
的数据从 table3
表中删除,把 example3.csv
文件中 id
为 102
的数据更新到 table3
表,并且把 example3.csv
文件中 id
为 103
的数据插入到 table3
表:
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label4" \
-H "column_separator:," \
-H "columns: id, name, score, temp, __op = temp" \
-T example3.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load说明
上述示例中,通过
columns
参数把example3.csv
文件中代表组别代码的第四列临时命名为temp
,然后定义__op
字段等于临时命名的temp
列。这样,StarRocks 可以根据example3.csv
文件中第四列的取值是0
还是1
来确定执行 UPSERT 还是 DELETE 操作。 -
通过 Broker Load 导入:
LOAD LABEL test_db.label4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
(id, name, score, temp)
set (__op=temp)
)
WITH BROKER; -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table3 ON table3
COLUMNS(id, name, score, temp, __op = temp)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test3",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
查询数据
导入完成后,查询 table3
表的数据,如下所示:
SELECT * FROM table3;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Sam | 70 |
| 103 | Stan | 80 |
+------+------+-------+
2 rows in set (0.01 sec)
从查询结果可以看到,example3.csv
文件中 id
为 101
的数据已经从 table3
表中删除,example3.csv
文件中 id
为 102
的数据已经更新到 table3
表中,并且 example3.csv
文件中 id
为 103
的数据已经插入到 table3
表中。
部分更新
主键表还支持部分列更新(Partial Updates),并且针对不同的数据更新场景,提供了行模式和列模式两种部分列更新,在不影响查询性能的同时,尽可能地降低部分更新的开销,从而能够保证更新的实时性。行模式比较适用于较多列且小批量的实时更新场景。列模式适用于少数列并且大量行的批处理更新场景。
注意
部分更新时,如果要更新的行不存在,那么 StarRocks 会插入新的一行,并自动对缺失的列填充默认值。如果没有定义默认值,则自动填充
0
。
如下以 CSV 格式的数据文件为例进行说明。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example4.csv
。文件包含两列,分别代表用户 ID 和用户姓名,如下所示:101,Lily
102,Rose
103,Aliceb. 把
example4.csv
文件中的数据上传到 Kafka 集群的topic4
中。 -
准备 StarRocks 表。
a. 在数据库
test_db
中创建一张名为table4
的主键表。表包含id
、name
和score
三列,分别代表用户 ID、用户名称和用户得分,主键为id
列,如下所示:CREATE TABLE `table4`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NOT NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table4
表中插入一条数据,如下所示:INSERT INTO table4 VALUES
(101, 'Tom',80);
导入数据
通过导入,把 example4.csv
里的两列数据更新到 table4
表的 id
和 name
两列。
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label7" -H "column_separator:," \
-H "partial_update:true" \
-H "columns:id,name" \
-T example4.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table4/_stream_load说明
使用 Stream Load 导入数据时,需要设置
partial_update
为true
,以开启部分更新特性,默认为行模式部分更新,如果需要使用列模式部分 更新,则需要设置partial_update_mode
为column
。另外,还需要在columns
中声明待更新数据的列的名称。 -
通过 Broker Load 导入:
LOAD LABEL test_db.table4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example4.csv")
into table table4
format as "csv"
(id, name)
)
WITH BROKER
PROPERTIES
(
"partial_update" = "true"
);说明
使用 Broker Load 导入数据时,需要设置
partial_update
为true
,以开启部分更新特性,默认为行模式部分更新,如果需要使用列模式部分更新,则需要设置partial_update_mode
为column
。另外,还需要在column_list
中声明待更新数据的列的名称。 -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table4 on table4
COLUMNS (id, name),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"partial_update" = "true"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test4",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);说明
- 使用 Routine Load 导入数据时,需要设置
partial_update
为true
,以开启部分更新特性。另外,还需要在COLUMNS
中声明待更新数据的列的名称。 - Routine Load 仅支持行模式部分更新,不支持列模式部分更新。
- 使用 Routine Load 导入数据时,需要设置
查询数据
导入完成后,查询 table4
表的数据,如下所示:
SELECT * FROM table4;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 102 | Rose | 0 |
| 101 | Lily | 80 |
| 103 | Alice | 0 |
+------+-------+-------+
3 rows in set (0.01 sec)
从查询结 果可以看到,example4.csv
文件中 id
为 101
的数据已经更新到 table4
表中,并且 example4.csv
文件中 id
为 102
和 103
的数据已经插入到 table4
表中。
条件更新
自 StarRocks v2.5 起,主键表支持条件更新 (Conditional Update)。您可以指定某一非主键列为更新条件,这样只有当导入的数据中该列的值大于等于当前值的时候,更新才会生效。
条件更新功能用于解决数据乱序的问题。如果上游数据发生乱序,可以使用条件更新功能保证新的数据不被老的数据覆盖。
说明
不支持给同一批导入的数据指定不同的条件。
不支持删除操作。
在 3.1.3 版本及以前,StarRocks 不支持条件更新同部分更新一并使用。自 3.1.3 版本起,StarRocks 才支持条件更新同部分更新一并使用。
数据样例
-
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件
example5.csv
。文件包含三列,分别代表用户 ID、版本号和用户得分,如下所示:101,1,100
102,3,100b. 把
example5.csv
文件中的数据上传到 Kafka 集群的topic5
中。 -
准备 StarRocks 表。
a. 在数据库
test_db
中创建一张名为table5
的主键表。表包含id
、version
和score
三列,分别代表用户 ID、版本号和用户得分,主键为id
列,如下所示:CREATE TABLE `table5`
(
`id` int(11) NOT NULL COMMENT "用户 ID",
`version` int NOT NULL COMMENT "版本号",
`score` int(11) NOT NULL COMMENT "用户得分"
)
ENGINE=OLAP
PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 设置分桶数量。
b. 向
table5
表中插入两条数据,如下所示:INSERT INTO table5 VALUES
(101, 2, 80),
(102, 2, 90);
导入数据
通过导入,把 example5.csv
文件中 id
为 101
、102
的数据更新到 table5
表中,指定 merge_condition
为 version
列,表示只有当导入的数据中 version
大于等于 table5
中对应行的version
值时,更新才会生效。
-
通过 Stream Load 导入:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label10" \
-H "column_separator:," \
-H "merge_condition:version" \
-T example5.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table5/_stream_load -
通过 Routine Load 导入:
CREATE ROUTINE LOAD test_db.table5 on table5
COLUMNS (id, version, score),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"merge_condition" = "version"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic5",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
通过 Broker Load 导入:
LOAD LABEL test_db.table5
( DATA INFILE ("s3://xxx.csv")
INTO TABLE table5 COLUMNS TERMINATED BY "," FORMAT AS "CSV"
)
WITH BROKER
PROPERTIES
(
"merge_condition" = "version"
);
查询数据
导入完成后,查询 table5
表的数据,如下所示:
SELECT * FROM table5;
+------+------+-------+
| id | version | score |
+------+------+-------+
| 101 | 2 | 80 |
| 102 | 3 | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
从查询结果可以看到,example5.csv
文件中 id
为 101
的数据并没有被更新,而 id
为 102
已经被更新。