ロードによるデータの変更
StarRocks が提供する Primary Key tables を使用すると、Stream Load、Broker Load、または Routine Load ジョブを実行して StarRocks テーブルにデータ変更を加えることができます。これらのデータ変更には、挿入、更新、削除が含まれます。ただし、Primary Key tables は Spark Load または INSERT を使用したデータ変更をサポートしていません。
StarRocks は部分更新と条件付き更新もサポートしています。
You can load data into StarRocks tables only as a user who has the INSERT privilege on those StarRocks tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster. The syntax is GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>}
.
このトピックでは、CSV データを例にして、ロードを通じて StarRocks テーブルにデータ変更を加える方法を説明します。サポートされるデータファイル形式は、選択したロード方法によって異なります。
注意
CSV データの場合、UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)をテキスト区切り文字として使用できますが、その長さは 50 バイトを超えないようにしてください。
実装
StarRocks が提供する Primary Key tables は UPSERT および DELETE 操作をサポートしており、INSERT 操作と UPDATE 操作を区別しません。
ロードジョブを作成する際、StarRocks はジョブ作成文またはコマンドに __op
という名前のフィールドを追加することをサポートしています。__op
フィールドは、実行したい操作の種類を指定するために使用されます。
注意
テーブルを作成する際、そのテーブルに
__op
という名前の列を追加する必要はありません。
__op
フィールドの定義方法は、選択したロード方法によって異なります。
-
Stream Load を選択した場合、
columns
パラメータを使用して__op
フィールドを定義します。 -
Broker Load を選択した場合、SET 句を使用して
__op
フィールドを定義します。 -
Routine Load を選択した場合、
COLUMNS
列を使用して__op
フィールドを定義します。
__op
フィールドを追加するかどうかは、行いたいデータ変更に基づいて決定できます。__op
フィールドを追加しない場合、操作の種類はデフォルトで UPSERT になります。主なデータ変更シナリオは次のとおりです。
-
ロードしたいデータファイルが UPSERT 操作のみを含む場合、
__op
フィールドを追加する必要はありません。 -
ロードしたいデータファイルが DELETE 操作のみを含む場合、
__op
フィールドを追加し、操作の種類を DELETE と指定する必要があります。 -
ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、
__op
フィールドを追加し、データファイルに0
または1
の値を持つ列が含まれていることを確認する必要があります。値が0
の場合は UPSERT 操作を示し、値が1
の場合は DELETE 操作を示します。
使用上の注意
-
データファイルの各行が同じ数の列を持っていることを確認してください。
-
データ変更に関与する列には、主キー列が含まれている必要があります。
基本操作
このセクションでは、ロードを通じて StarRocks テーブルにデータ変更を加える方法の例を示します。詳細な構文とパラメータの説明については、STREAM LOAD、BROKER LOAD、および CREATE ROUTINE LOAD を参照してください。
UPSERT
ロードしたいデータファイルが UPSERT 操作のみを含む場合、__op
フィールドを追加する必要はありません。
注意
__op
フィールドを追加する場合:
操作の種類を UPSERT と指定できます。
__op
フィールドを空のままにしておくことができます。操作の種類はデフォルトで UPSERT になります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example1.csv
という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Lily,100
102,Rose,100b.
example1.csv
のデータを Kafka クラスターのtopic1
に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_db
にtable1
という名前の Primary Key table を作成します。このテーブルは、id
、name
、score
の 3 つの列で構成されており、id
が主キーです。CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table1
にレコードを挿入します。INSERT INTO table1 VALUES
(101, 'Lily',80);
データのロード
example1.csv
の id
が 101
のレコードを table1
に更新し、id
が 102
のレコードを table1
に挿入するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
-
__op
フィールドを含めたくない場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label1" \
-H "column_separator:," \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load -
__op
フィールドを含めたい場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator:," \
-H "columns:__op ='upsert'" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
-
-
Broker Load ジョブを実行します。
-
__op
フィールドを含めたくない場合、次のコマンドを実行します。LOAD LABEL test_db.label1
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
)
WITH BROKER; -
__op
フィールドを含めたい場合、次のコマンドを実行します。LOAD LABEL test_db.label2
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
set (__op = 'upsert')
)
WITH BROKER;
-
-
Routine Load ジョブを実行します。
-
__op
フィールドを含めたくない場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
__op
フィールドを含めたい場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score, __op ='upsert')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
-
データのクエリ
ロードが完了したら、table1
のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
上記のクエリ結果に示されているように、example1.csv
の id
が 101
のレコードは table1
に更新され、id
が 102
のレコードは table1
に挿入されています。
DELETE
ロードしたいデータファイルが DELETE 操作のみを含む場合、__op
フィールドを追加し、操作の種類を DELETE と指定する必要があります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example2.csv
という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Jack,100
b.
example2.csv
のデータを Kafka クラスターのtopic2
に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks テーブル
test_db
にtable2
という名前の Primary Key table を作成します。このテーブルは、id
、name
、score
の 3 つの列で構成されており、id
が主キーです。CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table2
に 2 つのレコードを挿入します。INSERT INTO table2 VALUES
(101, 'Jack', 100),
(102, 'Bob', 90);
データのロード
example2.csv
の id
が 101
のレコードを table2
から削除するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label3" \
-H "column_separator:," \
-H "columns:__op='delete'" \
-T example2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load -
Broker Load ジョブを実行します。
LOAD LABEL test_db.label3
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
into table table2
columns terminated by ","
format as "csv"
set (__op = 'delete')
)
WITH BROKER; -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table2 ON table2
COLUMNS(id, name, score, __op = 'delete')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test2",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
データのクエリ
ロードが完了したら、table2
のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)
上記のクエリ結果に示されているように、example2.csv
の id
が 101
のレコードは table2
から削除されています。
UPSERT と DELETE
ロードしたいデータファイルが UPSERT と DELETE 操作の両方を含む場合、__op
フィールドを追加し、データファイルに 0
または 1
の値を持つ列が含まれていることを確認する必要があります。値が 0
の場合は UPSERT 操作を示し、値が 1
の場合は DELETE 操作を示します。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example3.csv
という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコア、操作の種類を順に表す 4 つの列で構成されています。101,Tom,100,1
102,Sam,70,0
103,Stan,80,0b.
example3.csv
のデータを Kafka クラスターのtopic3
に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_db
にtable3
という名前の Primary Key table を作成します。このテーブルは、id
、name
、score
の 3 つの列で構成されており、id
が主キーです。CREATE TABLE `table3`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);
注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b. table3
に 2 つのレコードを挿入します。
INSERT INTO table3 VALUES
(101, 'Tom', 100),
(102, 'Sam', 90);
データのロード
example3.csv
の id
が 101
のレコードを table3
から削除し、id
が 102
のレコードを table3
に更新し、id
が 103
のレコードを table3
に挿入するためにロードジョブを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label4" \
-H "column_separator:," \
-H "columns: id, name, score, temp, __op = temp" \
-T example3.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load注意
上記の例では、
example3.csv
の操作の種類を表す第 4 列が一時的にtemp
として命名され、columns
パラメータを使用して__op
フィールドがtemp
列にマッピングされています。このようにして、StarRocks はexample3.csv
の第 4 列の値が0
または1
であるかに応じて、UPSERT または DELETE 操作を実行するかどうかを決定できます。 -
Broker Load ジョブを実行します。
LOAD LABEL test_db.label4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
(id, name, score, temp)
set (__op=temp)
)
WITH BROKER; -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table3 ON table3
COLUMNS(id, name, score, temp, __op = temp)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test3",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
データのクエリ
ロードが完了したら、table3
のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table3;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Sam | 70 |
| 103 | Stan | 80 |
+------+------+-------+
2 rows in set (0.01 sec)
上記のクエリ結果に示されているように、example3.csv
の id
が 101
のレコードは table3
から削除され、id
が 102
のレコードは table3
に更新され、id
が 103
のレコードは table3
に挿入されています。
部分更新
Primary Key tables は部分更新もサポートしており、異なるデータ更新シナリオに対して行モードと列モードの 2 つの部分更新モードを提供します。これらの 2 つの部分更新モードは、クエリパフォーマンスを保証しながら、可能な限り部分更新のオーバーヘッドを最小限に抑え、リアルタイム更新を実現します。行モードは、多くの列と小さなバッチを含むリアルタイム更新シナリオに適しています。列モードは、少数の列と多数の行を含むバッチ処理更新シナリオに適しています。
注意
部分更新を実行する際、更新対象の行が存在しない場合、StarRocks は新しい行を挿入し、データ更新が挿入されていないフィールドにはデフォルト値を入力します。
このセクションでは、CSV を例にして部分更新を実行する方法を説明します。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example4.csv
という名前の CSV ファイルを作成します。このファイルは、ユーザー ID とユーザー名を順に表す 2 つの列で構成されています。101,Lily
102,Rose
103,Aliceb.
example4.csv
のデータを Kafka クラスターのtopic4
に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_db
にtable4
という名前の Primary Key table を作成します。このテーブルは、id
、name
、score
の 3 つの列で構成されており、id
が主キーです。CREATE TABLE `table4`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table4
にレコードを挿入します。INSERT INTO table4 VALUES
(101, 'Tom',80);
データのロード
example4.csv
の 2 つの列のデータを table4
の id
列と name
列に更新するためにロードを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label7" -H "column_separator:," \
-H "partial_update:true" \
-H "columns:id,name" \
-T example4.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table4/_stream_load注意
Stream Load を選択する場合、部分更新機能を有効にするために
partial_update
パラメータをtrue
に設定する必要があります。デフォルトは行モードでの部分更新です。列モードで部分更新を行う必要がある場合は、partial_update_mode
をcolumn
に設定する必要があります。さらに、更新したい列を指定するためにcolumns
パラメータを使用する必要があります。 -
Broker Load ジョブを実行します。
LOAD LABEL test_db.table4
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example4.csv")
into table table4
format as "csv"
(id, name)
)
WITH BROKER
PROPERTIES
(
"partial_update" = "true"
);注意
Broker Load を選択する場合、部分更新機能を有効にするために
partial_update
パラメータをtrue
に設定する必要があります。デフォルトは行モードでの部分更新です。列モードで部分更新を行う必要がある場合は、partial_update_mode
をcolumn
に設定する必要があります。さらに、更新したい列を指定するためにcolumn_list
パラメータを使用する必要があります。 -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table4 on table4
COLUMNS (id, name),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"partial_update" = "true"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test4",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);注意
- Routine Load を選択する場合、部分更新機能を有効にするために
partial_update
パラメータをtrue
に設定する必要があります。さらに、更新したい列を指定するためにCOLUMNS
パラメータを使用する必要があります。 - Routine Load は行モードでの部分更新のみをサポートし、列モードでの部分更新はサポートしていません。
- Routine Load を選択する場合、部分更新機能を有効にするために
データのクエリ
ロードが完了したら、table4
のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table4;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 102 | Rose | 0 |
| 101 | Lily | 80 |
| 103 | Alice | 0 |
+------+-------+-------+
3 rows in set (0.01 sec)
上記のクエリ結果に示されているように、example4.csv
の id
が 101
のレコードは table4
に更新され、id
が 102
と 103
のレコードは table4
に挿入されています。
条件付き更新
StarRocks v2.5 以降、Primary Key tables は条件付き更新をサポートしています。非主キー列を条件として指定し、更新が有効になるかどうかを決定できます。このようにして、ソースレコードから宛先レコードへの更新は、指定された列でソースデータレコードが宛先データレコードよりも大きいか等しい値を持つ場合にのみ有効になります。
条件付き更新機能はデータの順序の乱れを解決するために設計されています。ソースデータが順序が乱れている場合、この機能を使用して新しいデータが古いデータによって上書きされないようにすることができます。
注意
- 同じバッチのデータに対して異なる列を更新条件として指定することはできません。
- DELETE 操作は条件付き更新をサポートしていません。
- バージョン v3.1.3 より前のバージョンでは、部分更新と条件付き更新を同時に使用することはできません。v3.1.3 以降、StarRocks は部分更新と条件付き更新を同時に使用することをサポートしています。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example5.csv
という名前の CSV ファイルを作成します。このファイルは、ユーザー ID、バージョン、ユーザースコアを順に表す 3 つの列で構成されています。101,1,100
102,3,100b.
example5.csv
のデータを Kafka クラスターのtopic5
に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_db
にtable5
という名前の Primary Key table を作成します。このテーブルは、id
、version
、score
の 3 つの列で構成されており、id
が主キーです。CREATE TABLE `table5`
(
`id` int(11) NOT NULL COMMENT "user ID",
`version` int NOT NULL COMMENT "version",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際にバケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細については、set the number of buckets を参照してください。
b.
table5
にレコードを挿入します。INSERT INTO table5 VALUES
(101, 2, 80),
(102, 2, 90);
データのロード
example5.csv
から id
が 101
と 102
のレコードを table5
に更新し、それぞれのレコードの version
値が現在の version
値以上の場合にのみ更新が有効になるように指定します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label10" \
-H "column_separator:," \
-H "merge_condition:version" \
-T example5.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table5/_stream_load -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table5 on table5
COLUMNS (id, version, score),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"merge_condition" = "version"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic5",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
Broker Load ジョブを実行します。
LOAD LABEL test_db.table5
( DATA INFILE ("s3://xxx.csv")
INTO TABLE table5 COLUMNS TERMINATED BY "," FORMAT AS "CSV"
)
WITH BROKER
PROPERTIES
(
"merge_condition" = "version"
);
データのクエリ
ロードが完了したら、table5
のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table5;
+------+------+-------+
| id | version | score |
+------+------+-------+
| 101 | 2 | 80 |
| 102 | 3 | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
上記のクエリ結果に示されているように、example5.csv
の id
が 101
のレコードは table5
に更新されておらず、id
が 102
のレコードは table5
に挿入されています。