ロードによるデータ変更
StarRocks が提供する主キーテーブルを使用すると、Stream Load、Broker Load、またはRoutine Loadジョブを実行して StarRocks テーブルにデータ変更を加えることができます。これらのデータ変更には、挿入、更新、削除が含まれます。ただし、主キーテーブルは、Spark LoadやINSERTを使用したデータ変更をサポートしていません。
StarRocks は部分更新と条件付き更新もサポートしています。
StarRocks のテーブルにデータを ロード するには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限がない場合は、GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。構文は GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>} です。
このトピックでは、CSV データを例にして、ロードを通じて StarRocks テーブルにデータ変更を加える方法を説明します。サポートされるデータファイル形式は、選択したロード方法によって異なります。
注意
CSV データの場合、UTF-8 文字列(カンマ(,)、タブ、パイプ(|)など)をテキスト区切り文字として使用できますが、その長さは 50 バイトを超えないようにしてください。
実装
StarRocks が提供する主キーテーブルは、UPSERT および DELETE 操作をサポートしており、INSERT 操作と UPDATE 操作を区別しません。
ロードジョブを作成する際に、StarRocks はジョブ作成ステートメントまたはコマンドに __op という名前のフィールドを追加することをサポートしています。__op フィールドは、実行したい操作の種類を指定するために使用されます。
注意
テーブルを作成する際に、そのテーブルに
__opという名前の列を追加する必要はありません。
__op フィールドの定義方法は、選択したロード方法によって異なります。
-
Stream Load を選択した場合、
columnsパラメータを使用して__opフィールドを定義します。 -
Broker Load を選択した場合、SET 句を使用して
__opフィールドを定義します。 -
Routine Load を選択した場合、
COLUMNS列を使用して__opフィールドを定義します。
データ変更に基づいて __op フィールドを追加するかどうかを決定できます。__op フィールドを追加しない場合、操作の種類はデフォルトで UPSERT になります。主なデータ変更シナリオは次のとおりです。
-
ロードしたいデータファイルが UPSERT 操作のみを含む場合、
__opフィールドを追加する必要はありません。 -
ロードしたいデータファイルが DELETE 操作のみを含む場合、
__opフィールドを追加し、操作の種類を DELETE として指定する必要があります。 -
ロードしたいデータファイルが UPSERT および DELETE 操作の両方を含む場合、
__opフィールドを追加し、データファイルに0または1の値を持つ列が含まれていることを確認する必要があります。0の値は UPSERT 操作を示し、1の値は DELETE 操作を示します。
使用上の注意
-
データファイルの各行が同じ数の列を持っていることを確認してください。
-
データ変更に関与する列には、主キー列が含まれている必要があります。
基本操作
このセクションでは、ロードを通じて StarRocks テーブルにデータ変 更を加える方法の例を示します。詳細な構文とパラメータの説明については、STREAM LOAD、BROKER LOAD、およびCREATE ROUTINE LOADを参照してください。
UPSERT
ロードしたいデータファイルが UPSERT 操作のみを含む場合、__op フィールドを追加する必要はありません。
注意
__opフィールドを追加する場合:
- 操作の種類を UPSERT として指定できます。
__opフィールドを空のままにしておくことができます。操作の種類はデフォルトで UPSERT になります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example1.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Lily,100
102,Rose,100b.
example1.csvのデータを Kafka クラスターのtopic1に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks データベース
test_dbにtable1という名前の主キーテーブルを作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の設定を参照してください。
b.
table1にレコードを挿入します。INSERT INTO table1 VALUES
(101, 'Lily',80);
データのロード
example1.csv の id が 101 のレコードを table1 に更新し、example1.csv の id が 102 のレコードを table1 に挿入するロードジョブを実行します。
-
Stream Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label1" \
-H "column_separator:," \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load -
__opフィールドを含めたい場合、次のコマンドを実行します。curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label2" \
-H "column_separator:," \
-H "columns:__op ='upsert'" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
-
-
Broker Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。LOAD LABEL test_db.label1
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
)
WITH BROKER; -
__opフィールドを含めたい場合、次のコマンドを実行します。LOAD LABEL test_db.label2
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
into table table1
columns terminated by ","
format as "csv"
set (__op = 'upsert')
)
WITH BROKER;
-
-
Routine Load ジョブを実行します。
-
__opフィールドを含めたくない場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score)
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
); -
__opフィールドを含めたい場合、次のコマンドを実行します。CREATE ROUTINE LOAD test_db.table1 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (id, name, score, __op ='upsert')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test1",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
-
データのクエリ
ロードが完了したら、table1 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)
上記のクエリ結果に示されているように、example1.csv の id が 101 のレコードは table1 に更新され、example1.csv の id が 102 のレコードは table1 に挿入されました。
DELETE
ロードしたいデータファイルが DELETE 操作のみを含む場合、__op フィールドを追加し、操作の種類を DELETE として指定する必要があります。
データ例
-
データファイルを準備します。
a. ローカルファイルシステムに
example2.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。101,Jack,100b.
example2.csvのデータを Kafka クラスターのtopic2に公開します。 -
StarRocks テーブルを準備します。
a. StarRocks テーブル
test_dbにtable2という名前の主キーテーブルを作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table2`
(
`id` int(11) NOT NULL COMMENT "user ID",
`name` varchar(65533) NOT NULL COMMENT "user name",
`score` int(11) NOT NULL COMMENT "user score"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);注意
バージョン v2.5.7 以降、StarRocks はテーブルを作成する際やパーティションを追加する際に、バケット数 (BUCKETS) を自動的に設定できます。バケット数を手動で設定する必要はありません。詳細情報については、バケット数の設定を参照してください。
b.
table2に 2 つのレコードを挿入します。INSERT INTO table2 VALUES
(101, 'Jack', 100),
(102, 'Bob', 90);
データのロード
example2.csv の id が 101 のレコードを table2 から削除するロードジョブを実行します。
-
Stream Load ジョブを実行します。
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "label:label3" \
-H "column_separator:," \
-H "columns:__op='delete'" \
-T example2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load -
Broker Load ジョブを実行します。
LOAD LABEL test_db.label3
(
data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
into table table2
columns terminated by ","
format as "csv"
set (__op = 'delete')
)
WITH BROKER; -
Routine Load ジョブを実行します。
CREATE ROUTINE LOAD test_db.table2 ON table2
COLUMNS(id, name, score, __op = 'delete')
PROPERTIES
(
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows"= "250000",
"max_error_number" = "1000"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "test2",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
データのクエリ
ロードが完了したら、table2 のデータをクエリして、ロードが成功したことを確認します。
SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)