Apache Spark Load
Spark Load を使用してデータを一括ロードする
このロードは、外部の Apache Spark™ リソースを使用してインポートデータを事前処理し、インポートのパフォーマンスを向上させ、計算リソースを節約します。主に 初期移行 や 大規模データインポート に使用され、StarRocks へのデータ量は TB レベルまで対応します。
Spark load は 非同期 のインポート方法であり、ユーザーは MySQL プロトコルを介して Spark タイプのインポートジョブを作成し、SHOW LOAD を使用してインポート結果を確認します。
注意
- StarRocks テーブルに対して INSERT 権限を持つユーザーのみがこのテーブルにデータをロードできます。GRANT に従って必要な権限を付与でき ます。
- Spark Load は、主キーテーブルにデータをロードするためには使用できません。
用語の説明
- Spark ETL: インポートプロセスでのデータの ETL を主に担当し、グローバル辞書の構築(BITMAP タイプ)、パーティショニング、ソート、集計などを含みます。
- Broker: Broker は独立したステートレスプロセスです。ファイルシステムインターフェースをカプセル化し、StarRocks にリモートストレージシステムからファイルを読み取る機能を提供します。
- Global Dictionary: 元の値からエンコードされた値へのデータ構造を保存します。元の値は任意のデータ型であり、エンコードされた値は整数です。グローバル辞書は、正確なカウントディスティンクトが事前計算されるシナリオで主に使用されます。
基本原理
ユーザーは MySQL クライアントを通じて Spark タイプのインポートジョブを提出し、FE がメタデータを記録し、提出結果を返します。
spark load タスクの実行は、以下の主要なフェーズに分かれます。
- ユーザーが spark load ジョブを FE に提出します。
- FE は ETL タスクを Apache Spark™ クラスターに提出して実行するようスケジュー ルします。
- Apache Spark™ クラスターは、グローバル辞書の構築(BITMAP タイプ)、パーティショニング、ソート、集計などを含む ETL タスクを実行します。
- ETL タスクが完了すると、FE は各事前処理済みスライスのデータパスを取得し、関連する BE に Push タスクを実行するようスケジュールします。
- BE は Broker プロセスを通じて HDFS からデータを読み取り、StarRocks ストレージ形式に変換します。
Broker プロセスを使用しない場合、BE は HDFS から直接データを読み取ります。
- FE は有効なバージョンをスケジュールし、インポートジョブを完了します。
以下の図は、spark load の主なフローを示しています。

グローバル辞書
適用シナリオ
現在、StarRocks の BITMAP カラムは Roaringbitmap を使用して実装されており、入力データ型は整数のみです。そのため、インポートプロセスで BITMAP カラムの事前計算を実装する場合、入力データ型を整数に変換する必要があります。
StarRocks の既存のインポートプロセスでは、グローバル辞書のデータ構造は Hive テーブルに基づいて実装されており、元の値からエンコードされた値へのマッピングを保存します。
構築プロセス
- 上流のデータソースからデータを読み取り、一時的な Hive テーブル
hive-tableを生成します。 hive-tableの強調されていないフィールドの値を抽出し、新しい Hive テーブルdistinct-value-tableを生成します。- 元の値とエンコードされた値の 1 列を持つ新しいグローバル辞書テーブル
dict-tableを作成します。 distinct-value-tableとdict-tableの間で左ジョインを行い、ウィンドウ関数を使用してこのセットをエンコードします。最終的に、重複除去されたカラムの元の値とエンコードされた値をdict-tableに書き戻します。dict-tableとhive-tableの間でジョインを行い、hive-tableの元の値を整数のエンコード値に置き換える作業を完了します。hive-tableは次回のデータ事前処理で読み取られ、計算後に StarRocks にインポートされます。
データ事前処理
データ事前処理の基本プロセスは次のとおりです。
- 上流のデータソース(HDFS ファイルまたは Hive テーブル)からデータを読み取ります。
- 読み取ったデータに対してフィールドマッピングと計算を完了し、パーティション情報に基づいて
bucket-idを生成します。 - StarRocks テーブルの Rollup メタデータに基づいて RollupTree を生成します。
- RollupTree を反復処理し、階層的な集計操作を実行します。次の階層の Rollup は、前の階層の Rollup から計算できます。
- 集計計算が完了するたびに、データは
bucket-idに基づいてバケット化され、その後 HDFS に書き込まれます。 - 後続の Broker プロセスは HDFS からファイルを取得し、StarRocks BE ノードにインポートします。
基本操作
ETL クラスターの設定
Apache Spark™ は StarRocks で ETL 作業を行うための外部計算リソースとして使用されます。StarRocks に追加される他の外部リソースとしては、クエリ用の Spark/GPU、外部ストレージ用の HDFS/S3、ETL 用の MapReduce などがあります。したがって、StarRocks で使用されるこれらの外部リソースを管理するために Resource Management を導入します。
Apache Spark™ インポートジョブを提出する前に、ETL タスクを実行するための Apache Spark™ クラスターを設定します。操作の構文は次のとおりです。
-- Apache Spark™ リソースを作成
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
type = spark,
spark_conf_key = spark_conf_value,
working_dir = path,
broker = broker_name,
broker.property_key = property_value
);
-- Apache Spark™ リソースを削除
DROP RESOURCE resource_name;
-- リソースを表示
SHOW RESOURCES
SHOW PROC "/resources";
-- 権限
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
- リソースの作成
例:
-- yarn クラスター モード
CREATE EXTERNAL RESOURCE "spark0"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.jars" = "xxx.jar,yyy.jar",
"spark.files" = "/tmp/aaa,/tmp/bbb",
"spark.executor.memory" = "1g",
"spark.yarn.queue" = "queue0",
"spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/starrocks",
"broker" = "broker0",
"broker.username" = "user0",
"broker.password" = "password0"
);
-- yarn HA クラスター モード
CREATE EXTERNAL RESOURCE "spark1"
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
"spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
"spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
"working_dir" = "hdfs://127.0.0.1:10000/tmp/starrocks",
"broker" = "broker1"
);
resource-name は StarRocks に設定された Apache Spark™ リソースの名前です。
PROPERTIES には Apache Spark™ リソースに関連するパラメータが含まれます。次のとおりです。
注意
Apache Spark™ リソースの詳細な説明については、CREATE RESOURCE を参照してください。
-
Spark 関連パラメータ:
type: リソースタイプ、必須、現在はsparkのみをサポート。spark.master: 必須、現在はyarnのみをサポート。spark.submit.deployMode: Apache Spark™ プログラムのデプロイメントモード、必須、現在はclusterとclientの両方をサポート。spark.hadoop.fs.defaultFS: マスターが yarn の場合に必須。- yarn リソースマネージャーに関連するパラメータ、必須。
- 単一ノードの ResourceManager
spark.hadoop.yarn.resourcemanager.address: 単一ポイントリソースマネージャーのアドレス。 - ResourceManager HA
ResourceManager のホスト名またはアドレスを指定できます。
spark.hadoop.yarn.resourcemanager.ha.enabled: リソースマネージャー HA を有効にし、trueに設定。spark.hadoop.yarn.resourcemanager.ha.rm-ids: リソースマネージャーの論理 ID のリスト。spark.hadoop.yarn.resourcemanager.hostname.rm-id: 各 rm-id に対して、リソースマネージャーに対応するホスト名を指定。spark.hadoop.yarn.resourcemanager.address.rm-id: 各 rm-id に対して、クライアントがジョブを送信するためのhost:portを指定。
- 単一ノードの ResourceManager
-
*working_dir: ETL に使用されるディレクトリ。Apache Spark™ が ETL リソースとして使用される場合に必須。例:hdfs://host:port/tmp/starrocks。 -
Broker 関連パラメータ:
broker: Broker の名前。Apache Spark™ が ETL リソースとして使用される場合に必須。ALTER SYSTEM ADD BROKERコマンドを使用して事前に設定を完了する必要があります。broker.property_key: Broker プロセスが ETL によって生成された中間ファイルを読み取る際に指定する情報(例: 認証情報)。
注意事項:
上記は Broker プロセスを介したロードのパラメータの説明です。Broker プロセスを使用せずにデータをロードする場合、以下に注意してください。
brokerを指定する必要はありません。- ユーザー認証や NameNode ノードの HA を設定する必要がある場合、HDFS クラスターの hdfs-site.xml ファイルでパラメータを設定する必要があります。パラメータの説明については broker_properties を参照してください。また、各 FE の $FE_HOME/conf および各 BE の $BE_HOME/conf に hdfs-site.xml ファイルを移動する必要があります。
注意
HDFS ファイルが特定のユーザーのみアクセス可能な場合、
broker.nameに HDFS ユーザー名を指定し、broker.passwordにユーザーパスワードを指定する必要があります。
- リソースの表示
通常のアカウントは、USAGE-PRIV アクセス権を持つリソースのみを表示できます。root および admin アカウントはすべてのリソースを表示できます。
- リソース権限
リソース権限は GRANT REVOKE を通じて管理され、現在は USAGE-PRIV 権限のみをサポートしています。ユーザーまたはロールに USAGE-PRIV 権限を付与できます。
-- user0 に spark0 リソースへのアクセスを許可
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
-- role0 に spark0 リソースへのアクセスを許可
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
-- user0 にすべてのリソースへのアクセスを許可
GRANT USAGE_PRIV ON RESOURCE* TO "user0"@"%";
-- role0 にすべてのリソースへのアクセスを許可
GRANT USAGE_PRIV ON RESOURCE* TO ROLE "role0";
-- user user0 から spark0 リソースの使用権限を取り消す
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";