メインコンテンツまでスキップ
バージョン: 3.1

AWS S3 からのデータロード

StarRocks は、AWS S3 からデータをロードするために次のオプションを提供しています。

これらのオプションにはそれぞれの利点があり、以下のセクションで詳しく説明します。

ほとんどの場合、使用が非常に簡単な INSERT+FILES() メソッドをお勧めします。

ただし、INSERT+FILES() メソッドは現在、Parquet と ORC ファイル形式のみをサポートしています。したがって、CSV などの他のファイル形式のデータをロードする必要がある場合や、データロード中に DELETE などのデータ変更を行う必要がある場合は、Broker Load を利用できます。

始める前に

ソースデータの準備

StarRocks にロードしたいソースデータが S3 バケットに適切に保存されていることを確認してください。また、データとデータベースの場所を考慮することもお勧めします。バケットと StarRocks クラスターが同じリージョンにある場合、データ転送コストは大幅に低くなります。

このトピックでは、S3 バケットにあるサンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet を提供します。このオブジェクトは、AWS 認証済みユーザーであれば誰でも読み取れるため、有効な資格情報でアクセスできます。

権限の確認

StarRocks テーブルにデータを ロード できるのは、これらの StarRocks テーブルに対して INSERT 権限を持つユーザーのみです。INSERT 権限を持っていない場合は、 GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。

認証情報の収集

このトピックの例では、IAM ユーザー認証を使用します。AWS S3 からデータを読み取る権限を持っていることを確認するために、IAM ユーザー認証の準備 を読み、適切な IAM ポリシー を設定した IAM ユーザーを作成する手順に従うことをお勧めします。

要するに、IAM ユーザー認証を実践する場合、次の AWS リソースに関する情報を収集する必要があります。

  • データを保存する S3 バケット
  • バケット内の特定のオブジェクトにアクセスする場合の S3 オブジェクトキー(オブジェクト名)。オブジェクトキーには、S3 オブジェクトがサブフォルダに保存されている場合、プレフィックスを含めることができます。
  • S3 バケットが属する AWS リージョン
  • アクセス資格情報として使用されるアクセスキーとシークレットキー

利用可能なすべての認証方法については、AWS リソースへの認証 を参照してください。

INSERT+FILES() の使用

この方法は v3.1 以降で利用可能で、現在は Parquet と ORC ファイル形式のみをサポートしています。

INSERT+FILES() の利点

FILES() は、指定したパス関連のプロパティに基づいてクラウドストレージに保存されたファイルを読み取り、ファイル内のデータのテーブルスキーマを推測し、ファイルからデータ行としてデータを返すことができます。

FILES() を使用すると、次のことが可能です。

  • SELECT を使用して S3 から直接データをクエリする。
  • CREATE TABLE AS SELECT (CTAS) を使用してテーブルを作成し、ロードする。
  • INSERT を使用して既存のテーブルにデータをロードする。

典型的な例

SELECT を使用して S3 から直接クエリする

SELECT+FILES() を使用して S3 から直接クエリすることで、テーブルを作成する前にデータセットの内容をプレビューできます。例えば:

  • データを保存せずにデータセットをプレビューする。
  • 最小値と最大値をクエリし、使用するデータ型を決定する。
  • NULL 値を確認する。

次の例は、サンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet をクエリします。

SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
LIMIT 3;

NOTE

上記のコマンドで AAABBB をあなたの資格情報に置き換えてください。このオブジェクトは、AWS 認証済みユーザーであれば有効な aws.s3.access_keyaws.s3.secret_key を使用して読み取ることができます。

システムは次のクエリ結果を返します。

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 543711 | 829192 | 2355072 | pv | 2017-11-27 08:22:37 |
| 543711 | 2056618 | 3645362 | pv | 2017-11-27 10:16:46 |
| 543711 | 1165492 | 3645362 | pv | 2017-11-27 10:17:00 |
+--------+---------+------------+--------------+---------------------+

NOTE

上記で返された列名は Parquet ファイルによって提供されています。

CTAS を使用してテーブルを作成しロードする

これは前の例の続きです。前のクエリは CREATE TABLE AS SELECT (CTAS) でラップされ、スキーマ推測を使用してテーブル作成を自動化します。これは、StarRocks がテーブルスキーマを推測し、必要なテーブルを作成し、データをテーブルにロードすることを意味します。Parquet ファイルを使用する場合、Parquet 形式には列名が含まれているため、FILES() テーブル関数を使用してテーブルを作成する際に列名と型を指定する必要はありません。

NOTE

スキーマ推測を使用する場合の CREATE TABLE の構文では、レプリカの数を設定することはできませんので、テーブルを作成する前に設定してください。以下の例は、3 つのレプリカを持つシステムの例です:

ADMIN SET FRONTEND CONFIG ('default_replication_num' = "3");

データベースを作成し、切り替えます:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

CTAS を使用してテーブルを作成し、サンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet のデータをテーブルにロードします:

CREATE TABLE user_behavior_inferred AS
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

NOTE

上記のコマンドで AAABBB をあなたの資格情報に置き換えてください。このオブジェクトは、AWS 認証済みユーザーであれば有効な aws.s3.access_keyaws.s3.secret_key を使用して読み取ることができます。

テーブルを作成した後、DESCRIBE を使用してそのスキーマを表示できます:

DESCRIBE user_behavior_inferred;

システムは次のクエリ結果を返します。

+--------------+------------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+------------------+------+-------+---------+-------+
| UserID | bigint | YES | true | NULL | |
| ItemID | bigint | YES | true | NULL | |
| CategoryID | bigint | YES | true | NULL | |
| BehaviorType | varchar(1048576) | YES | false | NULL | |
| Timestamp | varchar(1048576) | YES | false | NULL | |
+--------------+------------------+------+-------+---------+-------+

推測されたスキーマと手動で作成されたスキーマを比較します:

  • データ型
  • NULL 許可
  • キーフィールド

宛先テーブルのスキーマをより良く制御し、クエリパフォーマンスを向上させるために、本番環境では手動でテーブルスキーマを指定することをお勧めします。

テーブルをクエリして、データがロードされたことを確認します。例:

SELECT * from user_behavior_inferred LIMIT 3;

次のクエリ結果が返され、データが正常にロードされたことを示しています。

+--------+--------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+--------+------------+--------------+---------------------+
| 58 | 158350 | 2355072 | pv | 2017-11-27 13:06:51 |
| 58 | 158590 | 3194735 | pv | 2017-11-27 02:21:04 |
| 58 | 215073 | 3002561 | pv | 2017-11-30 10:55:42 |
+--------+--------+------------+--------------+---------------------+

INSERT を使用して既存のテーブルにロードする

挿入するテーブルをカスタマイズしたい場合があります。例えば:

  • 列のデータ型、NULL 許可設定、またはデフォルト値
  • キーの種類と列
  • データのパーティショニングとバケッティング

NOTE

最も効率的なテーブル構造を作成するには、データの使用方法と列の内容に関する知識が必要です。このトピックではテーブル設計については扱いません。テーブル設計に関する情報については、Table types を参照してください。

この例では、テーブルがどのようにクエリされるか、および Parquet ファイル内のデータに関する知識に基づいてテーブルを作成しています。Parquet ファイル内のデータに関する知識は、S3 でファイルを直接クエリすることで得られます。

  • S3 のデータセットをクエリした結果、Timestamp 列が datetime データ型に一致するデータを含んでいることが示されたため、以下の DDL で列型を指定しています。
  • S3 のデータをクエリすることで、データセットに NULL 値がないことがわかるため、DDL ではどの列も NULL 許可として設定していません。
  • 予想されるクエリタイプに基づいて、ソートキーとバケッティング列は UserID 列に設定されています。このデータに対するあなたのユースケースは異なるかもしれないので、ソートキーとして ItemID を使用することを決定するかもしれません。

データベースを作成し、切り替えます:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手動でテーブルを作成します(AWS S3 からロードしたい Parquet ファイルと同じスキーマを持つことをお勧めします):

CREATE TABLE user_behavior_declared
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

テーブルを作成した後、INSERT INTO SELECT FROM FILES() を使用してロードできます:

INSERT INTO user_behavior_declared
SELECT * FROM FILES
(
"path" = "s3://starrocks-examples/user_behavior_ten_million_rows.parquet",
"format" = "parquet",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
);

NOTE

上記のコマンドで AAABBB をあなたの資格情報に置き換えてください。このオブジェクトは、AWS 認証済みユーザーであれば有効な aws.s3.access_keyaws.s3.secret_key を使用して読み取ることができます。

ロードが完了した後、テーブルをクエリしてデータがロードされたことを確認します。例:

SELECT * from user_behavior_declared LIMIT 3;

次のクエリ結果が返され、データが正常にロードされたことを示しています。

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 58 | 4309692 | 1165503 | pv | 2017-11-25 14:06:52 |
| 58 | 181489 | 1165503 | pv | 2017-11-25 14:07:22 |
| 58 | 3722956 | 1165503 | pv | 2017-11-25 14:09:28 |
+--------+---------+------------+--------------+---------------------+

ロードの進捗を確認する

information_schema.loads ビューから INSERT ジョブの進捗をクエリできます。この機能は v3.1 以降でサポートされています。例:

SELECT * FROM information_schema.loads ORDER BY JOB_ID DESC;

複数のロードジョブを送信した場合、ジョブに関連付けられた LABEL でフィルタリングできます。例:

SELECT * FROM information_schema.loads WHERE LABEL = 'insert_e3b882f5-7eb3-11ee-ae77-00163e267b60' \G
*************************** 1. row ***************************
JOB_ID: 10243
LABEL: insert_e3b882f5-7eb3-11ee-ae77-00163e267b60
DATABASE_NAME: mydatabase
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: INSERT
PRIORITY: NORMAL
SCAN_ROWS: 10000000
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 10000000
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):300; max_filter_ratio:0.0
CREATE_TIME: 2023-11-09 11:56:01
ETL_START_TIME: 2023-11-09 11:56:01
ETL_FINISH_TIME: 2023-11-09 11:56:01
LOAD_START_TIME: 2023-11-09 11:56:01
LOAD_FINISH_TIME: 2023-11-09 11:56:44
JOB_DETAILS: {"All backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[10142]},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":311710786,"InternalTableLoadRows":10000000,"ScanBytes":581574034,"ScanRows":10000000,"TaskNumber":1,"Unfinished backends":{"e3b882f5-7eb3-11ee-ae77-00163e267b60":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL

loads ビューで提供されるフィールドの情報については、Information Schema を参照してください。

NOTE

INSERT は同期コマンドです。INSERT ジョブがまだ実行中の場合、その実行ステータスを確認するには別のセッションを開く必要があります。

Broker Load の使用

非同期の Broker Load プロセスは、S3 への接続を確立し、データを取得し、StarRocks にデータを保存する処理を行います。

この方法は、Parquet、ORC、および CSV ファイル形式をサポートしています。

Broker Load の利点

  • Broker Load はバックグラウンドで実行され、クライアントはジョブが続行するために接続を維持する必要がありません。
  • Broker Load は長時間実行されるジョブに適しており、デフォルトのタイムアウトは 4 時間です。
  • Parquet と ORC ファイル形式に加えて、Broker Load は CSV ファイルもサポートしています。

データフロー

Workflow of Broker Load

  1. ユーザーがロードジョブを作成します。
  2. フロントエンド (FE) がクエリプランを作成し、そのプランをバックエンドノード (BEs) またはコンピュートノード (CNs) に配布します。
  3. BEs または CNs がソースからデータを取得し、StarRocks にデータをロードします。

典型的な例

テーブルを作成し、S3 からサンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet を取得するロードプロセスを開始し、データロードの進捗と成功を確認します。

データベースとテーブルの作成

データベースを作成し、切り替えます:

CREATE DATABASE IF NOT EXISTS mydatabase;
USE mydatabase;

手動でテーブルを作成します(AWS S3 からロードしたい Parquet ファイルと同じスキーマを持つことをお勧めします):

CREATE TABLE user_behavior
(
UserID int(11),
ItemID int(11),
CategoryID int(11),
BehaviorType varchar(65533),
Timestamp datetime
)
ENGINE = OLAP
DUPLICATE KEY(UserID)
DISTRIBUTED BY HASH(UserID);

Broker Load の開始

次のコマンドを実行して、サンプルデータセット s3://starrocks-examples/user_behavior_ten_million_rows.parquet から user_behavior テーブルにデータをロードする Broker Load ジョブを開始します:

LOAD LABEL user_behavior
(
DATA INFILE("s3://starrocks-examples/user_behavior_ten_million_rows.parquet")
INTO TABLE user_behavior
FORMAT AS "parquet"
)
WITH BROKER
(
"aws.s3.enable_ssl" = "true",
"aws.s3.use_instance_profile" = "false",
"aws.s3.region" = "us-east-1",
"aws.s3.access_key" = "AAAAAAAAAAAAAAAAAAAA",
"aws.s3.secret_key" = "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
)
PROPERTIES
(
"timeout" = "72000"
);

NOTE

上記のコマンドで AAABBB をあなたの資格情報に置き換えてください。このオブジェクトは、AWS 認証済みユーザーであれば有効な aws.s3.access_keyaws.s3.secret_key を使用して読み取ることができます。

このジョブには 4 つの主要なセクションがあります:

  • LABEL: ロードジョブの状態をクエリする際に使用される文字列。
  • LOAD 宣言: ソース URI、ソースデータ形式、および宛先テーブル名。
  • BROKER: ソースの接続詳細。
  • PROPERTIES: タイムアウト値およびロードジョブに適用するその他のプロパティ。

詳細な構文とパラメータの説明については、BROKER LOAD を参照してください。

ロードの進捗を確認する

information_schema.loads ビューから Broker Load ジョブの進捗をクエリできます。この機能は v3.1 以降でサポートされています。

SELECT * FROM information_schema.loads;

loads ビューで提供されるフィールドの情報については、Information Schema を参照してください。

複数のロードジョブを送信した場合、ジョブに関連付けられた LABEL でフィルタリングできます。例:

SELECT * FROM information_schema.loads WHERE LABEL = 'user_behavior';

以下の出力では、ロードジョブ user_behavior に対して 2 つのエントリがあります:

  • 最初のレコードは CANCELLED 状態を示しています。ERROR_MSG までスクロールすると、listPath failed によりジョブが失敗したことがわかります。
  • 2 番目のレコードは FINISHED 状態を示しており、ジョブが成功したことを意味します。
JOB_ID|LABEL                                      |DATABASE_NAME|STATE    |PROGRESS           |TYPE  |PRIORITY|SCAN_ROWS|FILTERED_ROWS|UNSELECTED_ROWS|SINK_ROWS|ETL_INFO|TASK_INFO                                           |CREATE_TIME        |ETL_START_TIME     |ETL_FINISH_TIME    |LOAD_START_TIME    |LOAD_FINISH_TIME   |JOB_DETAILS                                                                                                                                                                                                                                                    |ERROR_MSG                             |TRACKING_URL|TRACKING_SQL|REJECTED_RECORD_PATH|
------+-------------------------------------------+-------------+---------+-------------------+------+--------+---------+-------------+---------------+---------+--------+----------------------------------------------------+-------------------+-------------------+-------------------+-------------------+-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+------------+------------+--------------------+
10121|user_behavior |mydatabase |CANCELLED|ETL:N/A; LOAD:N/A |BROKER|NORMAL | 0| 0| 0| 0| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:59:30| | | |2023-08-10 14:59:34|{"All backends":{},"FileNumber":0,"FileSize":0,"InternalTableLoadBytes":0,"InternalTableLoadRows":0,"ScanBytes":0,"ScanRows":0,"TaskNumber":0,"Unfinished backends":{}} |type:ETL_RUN_FAIL; msg:listPath failed| | | |
10106|user_behavior |mydatabase |FINISHED |ETL:100%; LOAD:100%|BROKER|NORMAL | 86953525| 0| 0| 86953525| |resource:N/A; timeout(s):72000; max_filter_ratio:0.0|2023-08-10 14:50:15|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:50:19|2023-08-10 14:55:10|{"All backends":{"a5fe5e1d-d7d0-4826-ba99-c7348f9a5f2f":[10004]},"FileNumber":1,"FileSize":1225637388,"InternalTableLoadBytes":2710603082,"InternalTableLoadRows":86953525,"ScanBytes":1225637388,"ScanRows":86953525,"TaskNumber":1,"Unfinished backends":{"a5| | | | |

ロードジョブが完了したことを確認した後、宛先テーブルのサブセットを確認してデータが正常にロードされたかどうかを確認できます。例:

SELECT * from user_behavior LIMIT 3;

次のクエリ結果が返され、データが正常にロードされたことを示しています。

+--------+---------+------------+--------------+---------------------+
| UserID | ItemID | CategoryID | BehaviorType | Timestamp |
+--------+---------+------------+--------------+---------------------+
| 142 | 2869980 | 2939262 | pv | 2017-11-25 03:43:22 |
| 142 | 2522236 | 1669167 | pv | 2017-11-25 15:14:12 |
| 142 | 3031639 | 3607361 | pv | 2017-11-25 15:19:25 |
+--------+---------+------------+--------------+---------------------+