メインコンテンツまでスキップ
バージョン: 3.1

BROKER LOAD

説明

StarRocksは、MySQLベースのロード方法であるBroker Loadを提供します。ロードジョブを送信すると、StarRocksは非同期でジョブを実行します。SELECT * FROM information_schema.loadsを使用してジョブの結果をクエリできます。この機能はv3.1以降でサポートされています。背景情報、原則、サポートされているデータファイル形式、単一テーブルロードと複数テーブルロードの実行方法、ジョブ結果の表示方法については、loading overviewを参照してください。

StarRocks テーブルにデータを ロード できるのは、これらの StarRocks テーブルに対して INSERT 権限を持つユーザーのみです。INSERT 権限を持っていない場合は、 GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。

構文

LOAD LABEL [<database_name>.]<label_name>
(
data_desc[, data_desc ...]
)
WITH BROKER
(
StorageCredentialParams
)
[PROPERTIES
(
opt_properties
)
]

StarRocksでは、いくつかのリテラルがSQL言語によって予約キーワードとして使用されます。これらのキーワードをSQL文で直接使用しないでください。SQL文でそのようなキーワードを使用したい場合は、バックティック (`) で囲んでください。Keywordsを参照してください。

パラメータ

database_name と label_name

label_nameはロードジョブのラベルを指定します。

database_nameは、宛先テーブルが属するデータベースの名前をオプションで指定します。

各ロードジョブには、データベース全体で一意のラベルがあります。ロードジョブのラベルを使用して、ロードジョブの実行ステータスを表示し、同じデータを繰り返しロードするのを防ぐことができます。ロードジョブがFINISHED状態に入ると、そのラベルは再利用できません。CANCELLED状態に入ったロードジョブのラベルのみが再利用可能です。ほとんどの場合、ロードジョブのラベルは、そのロードジョブを再試行して同じデータをロードするために再利用され、Exactly-Onceセマンティクスを実装します。

ラベルの命名規則については、System limitsを参照してください。

data_desc

ロードするデータのバッチの説明です。各data_descディスクリプタは、データソース、ETL関数、宛先StarRocksテーブル、および宛先パーティションなどの情報を宣言します。

Broker Loadは、一度に複数のデータファイルをロードすることをサポートしています。1つのロードジョブで、複数のdata_descディスクリプタを使用してロードしたい複数のデータファイルを宣言するか、1つのdata_descディスクリプタを使用して、すべてのデータファイルをロードしたいファイルパスを宣言することができます。Broker Loadは、複数のデータファイルをロードする各ロードジョブのトランザクションの原子性も保証できます。原子性とは、1つのロードジョブで複数のデータファイルをロードする際に、すべて成功するか失敗するかのいずれかであることを意味します。いくつかのデータファイルのロードが成功し、他のファイルのロードが失敗することはありません。

data_descは次の構文をサポートしています:

DATA INFILE ("<file_path>"[, "<file_path>" ...])
[NEGATIVE]
INTO TABLE <table_name>
[PARTITION (<partition1_name>[, <partition2_name> ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])]
[COLUMNS TERMINATED BY "<column_separator>"]
[ROWS TERMINATED BY "<row_separator>"]
[FORMAT AS "CSV | Parquet | ORC"]
[(format_type_options)]
[(column_list)]
[COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
[SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
[WHERE predicate]

data_descには次のパラメータが含まれている必要があります:

  • file_path

    ロードしたい1つ以上のデータファイルの保存パスを指定します。

    このパラメータを1つのデータファイルの保存パスとして指定できます。たとえば、HDFSサーバー上のパス/user/data/tablenameから20210411という名前のデータファイルをロードするために、このパラメータを"hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/20210411"として指定できます。

    また、ワイルドカード?*[]{}、または^を使用して、複数のデータファイルの保存パスとしてこのパラメータを指定することもできます。Wildcard referenceを参照してください。たとえば、このパラメータを"hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/*/*"または"hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/dt=202104*/*"として指定し、HDFSサーバー上のパス/user/data/tablenameのすべてのパーティションまたは202104パーティションのみからデータファイルをロードすることができます。

    注意

    ワイルドカードは中間パスを指定するためにも使用できます。

    前述の例では、hdfs_hostおよびhdfs_portパラメータは次のように説明されています:

    • hdfs_host: HDFSクラスター内のNameNodeホストのIPアドレス。

    • hdfs_port: HDFSクラスター内のNameNodeホストのFSポート。デフォルトのポート番号は9000です。

    注意

    • Broker Loadは、S3またはS3Aプロトコルに従ってAWS S3へのアクセスをサポートしています。したがって、AWS S3からデータをロードする場合、ファイルパスとして渡すS3 URIのプレフィックスにs3://またはs3a://を含めることができます。
    • Broker Loadは、gsプロトコルに従ってのみGoogle GCSへのアクセスをサポートしています。したがって、Google GCSからデータをロードする場合、ファイルパスとして渡すGCS URIのプレフィックスにgs://を含める必要があります。
    • Blob Storageからデータをロードする場合、wasbまたはwasbsプロトコルを使用してデータにアクセスする必要があります:
      • ストレージアカウントがHTTP経由でのアクセスを許可する場合、wasbプロトコルを使用し、ファイルパスをwasb://<container_name>@<storage_account_name>.blob.core.windows.net/<path>/<file_name>/*として記述します。
      • ストレージアカウントがHTTPS経由でのアクセスを許可する場合、wasbsプロトコルを使用し、ファイルパスをwasbs://<container_name>@<storage_account_name>.blob.core.windows.net/<path>/<file_name>/*として記述します。
    • Data Lake Storage Gen2からデータをロードする場合、abfsまたはabfssプロトコルを使用してデータにアクセスする必要があります:
      • ストレージアカウントがHTTP経由でのアクセスを許可する場合、abfsプロトコルを使用し、ファイルパスをabfs://<container_name>@<storage_account_name>.dfs.core.windows.net/<file_name>として記述します。
      • ストレージアカウントがHTTPS経由でのアクセスを許可する場合、abfssプロトコルを使用し、ファイルパスをabfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<file_name>として記述します。
    • Data Lake Storage Gen1からデータをロードする場合、adlプロトコルを使用してデータにアクセスし、ファイルパスをadl://<data_lake_storage_gen1_name>.azuredatalakestore.net/<path>/<file_name>として記述します。
  • INTO TABLE

    宛先StarRocksテーブルの名前を指定します。

data_descには次のパラメータもオプションで含めることができます:

  • NEGATIVE

    特定のデータバッチのロードを取り消します。これを達成するには、NEGATIVEキーワードを指定して同じデータバッチをロードする必要があります。

    注意

    このパラメータは、StarRocksテーブルが集計テーブルであり、そのすべての値列がsum関数によって計算される場合にのみ有効です。

  • PARTITION

    データをロードしたいパーティションを指定します。デフォルトでは、このパラメータを指定しない場合、ソースデータはStarRocksテーブルのすべてのパーティションにロードされます。

  • TEMPORARY PARTITION

    データをロードしたい一時パーティションの名前を指定します。複数の一時パーティションを指定することができ、カンマ(,)で区切る必要があります。

  • COLUMNS TERMINATED BY

    データファイルで使用される列区切り文字を指定します。デフォルトでは、このパラメータを指定しない場合、このパラメータは\t(タブ)にデフォルト設定されます。このパラメータを使用して指定した列区切り文字は、実際にデータファイルで使用されている列区切り文字と同じである必要があります。そうでない場合、データ品質が不十分なためロードジョブが失敗し、そのStateCANCELLEDになります。

    Broker LoadジョブはMySQLプロトコルに従って送信されます。StarRocksとMySQLはどちらもロードリクエストで文字をエスケープします。したがって、列区切り文字がタブのような不可視文字である場合、列区切り文字の前にバックスラッシュ(\)を追加する必要があります。たとえば、列区切り文字が\tの場合は\\tと入力する必要があり、列区切り文字が\nの場合は\\nと入力する必要があります。Apache Hive™ファイルは\x01を列区切り文字として使用するため、データファイルがHiveからのものである場合は\\x01と入力する必要があります。

    注意

    • CSVデータの場合、カンマ(,)、タブ、またはパイプ(|)のようなUTF-8文字列をテキストデリミタとして使用できますが、その長さは50バイトを超えてはなりません。
    • Null値は\Nを使用して示されます。たとえば、データファイルが3つの列で構成され、そのデータファイルのレコードが最初と3番目の列にデータを保持し、2番目の列にデータがない場合、この状況では2番目の列にNull値を示すために\Nを使用する必要があります。これは、レコードをa,\N,bとしてコンパイルする必要があることを意味し、a,,bではありません。a,,bは、レコードの2番目の列が空の文字列を保持していることを示します。
  • ROWS TERMINATED BY

    データファイルで使用される行区切り文字を指定します。デフォルトでは、このパラメータを指定しない場合、このパラメータは\n(改行)にデフォルト設定されます。このパラメータを使用して指定した行区切り文字は、実際にデータファイルで使用されている行区切り文字と同じである必要があります。そうでない場合、データ品質が不十分なためロードジョブが失敗し、そのStateCANCELLEDになります。このパラメータはv2.5.4以降でサポートされています。

    行区切り文字の使用に関する注意事項については、前述のCOLUMNS TERMINATED BYパラメータの使用に関する注意事項を参照してください。

  • FORMAT AS

    データファイルの形式を指定します。有効な値:CSVParquet、およびORC。デフォルトでは、このパラメータを指定しない場合、StarRocksはfile_pathパラメータで指定されたファイル名拡張子**.csv**、.parquet、または**.orc**に基づいてデータファイル形式を決定します。

  • format_type_options

    FORMAT ASCSVに設定されている場合のCSV形式オプションを指定します。構文:

    (
    key = value
    key = value
    ...
    )

    注意

    format_type_optionsはv3.0以降でサポートされています。

    次の表はオプションを説明しています。

ParameterDescription
skip_headerCSV形式のデータファイルの最初の行をスキップするかどうかを指定します。タイプ: INTEGER。デフォルト値: 0
一部のCSV形式のデータファイルでは、最初の行はメタデータ(列名や列データ型など)を定義するために使用されます。skip_headerパラメータを設定することで、StarRocksがデータロード中にデータファイルの最初の行をスキップできるようになります。たとえば、このパラメータを1に設定すると、StarRocksはデータロード中にデータファイルの最初の行をスキップします。
データファイルの最初の行は、ロード文で指定した行区切り文字を使用して区切られている必要があります。
trim_spaceCSV形式のデータファイルから列区切り文字の前後のスペースを削除するかどうかを指定します。タイプ: BOOLEAN。デフォルト値: false
一部のデータベースでは、データをCSV形式のデータファイルとしてエクスポートする際に列区切り文字にスペースが追加されます。これらのスペースは、先行スペースまたは後続スペースと呼ばれます。trim_spaceパラメータを設定することで、StarRocksがデータロード中にこれらの不要なスペースを削除できるようになります。
StarRocksは、encloseで指定された文字で囲まれたフィールド内のスペース(先行スペースおよび後続スペースを含む)を削除しません。たとえば、次のフィールド値は、パイプ(|)を列区切り文字として使用し、二重引用符(")をencloseで指定された文字として使用しています:
|"Love StarRocks"|
|" Love StarRocks "|
| "Love StarRocks" |
trim_spacetrueに設定すると、StarRocksは前述のフィールド値を次のように処理します:
|"Love StarRocks"|
|" Love StarRocks "|
|"Love StarRocks"|
encloseCSV形式のデータファイルのフィールド値をRFC4180に従ってラップするために使用される文字を指定します。タイプ: 単一バイト文字。デフォルト値: NONE。最も一般的な文字は単一引用符(')および二重引用符(")です。
encloseで指定された文字で囲まれたすべての特殊文字(行区切り文字や列区切り文字を含む)は通常の記号と見なされます。StarRocksは、encloseで指定された文字として任意の単一バイト文字を指定できるため、RFC4180よりも多くのことができます。
フィールド値にencloseで指定された文字が含まれている場合、同じ文字を使用してそのencloseで指定された文字をエスケープできます。たとえば、enclose"に設定し、フィールド値がa "quoted" cの場合、このフィールド値をデータファイルに"a ""quoted"" c"として入力できます。
escape行区切り文字、列区切り文字、エスケープ文字、encloseで指定された文字などのさまざまな特殊文字をエスケープするために使用される文字を指定します。これらの文字は、StarRocksによって通常の文字と見なされ、フィールド値の一部として解析されます。タイプ: 単一バイト文字。デフォルト値: NONE。最も一般的な文字はスラッシュ(\)であり、SQL文では二重スラッシュ(\\)として記述する必要があります。
注意
escapeで指定された文字は、各ペアのencloseで指定された文字の内側と外側の両方に適用されます。
次の2つの例があります:
  • enclose"に設定し、escape\に設定すると、StarRocksは"say \"Hello world\""say "Hello world"に解析します。
  • 列区切り文字がカンマ(,)であると仮定します。escape\に設定すると、StarRocksはa, b\, cを2つの別々のフィールド値に解析します:ab, c
  • column_list

    データファイルとStarRocksテーブル間の列マッピングを指定します。構文: (<column_name>[, <column_name> ...])column_listで宣言された列は、名前によってStarRocksテーブルの列にマッピングされます。

    注意

    データファイルの列がStarRocksテーブルの列に順番にマッピングされている場合、column_listを指定する必要はありません。

    データファイルの特定の列をスキップしたい場合、その列を一時的にStarRocksテーブルの列名とは異なる名前にするだけで済みます。詳細については、loading overviewを参照してください。

  • COLUMNS FROM PATH AS

    指定したファイルパスから1つ以上のパーティションフィールドに関する情報を抽出します。このパラメータは、ファイルパスにパーティションフィールドが含まれている場合にのみ有効です。

    たとえば、データファイルが/path/col_name=col_value/file1というパスに保存されており、col_nameがパーティションフィールドであり、StarRocksテーブルの列にマッピングできる場合、このパラメータをcol_nameとして指定できます。このようにして、StarRocksはパスからcol_value値を抽出し、それらをcol_nameがマッピングされているStarRocksテーブルの列にロードします。

    注意

    このパラメータは、HDFSからデータをロードする場合にのみ利用可能です。

  • SET

    データファイルの列を変換するために使用したい1つ以上の関数を指定します。例:

    • StarRocksテーブルは、順番にcol1col2col3の3つの列で構成されています。データファイルは4つの列で構成されており、そのうち最初の2つの列はStarRocksテーブルのcol1col2に順番にマッピングされ、最後の2つの列の合計はStarRocksテーブルのcol3にマッピングされます。この場合、column_list(col1,col2,tmp_col3,tmp_col4)として指定し、SET句で(col3=tmp_col3+tmp_col4)を指定してデータ変換を実装する必要があります。
    • StarRocksテーブルは、順番にyearmonthdayの3つの列で構成されています。データファイルは、yyyy-mm-dd hh:mm:ss形式の日付と時刻の値を含む1つの列のみで構成されています。この場合、column_list(tmp_time)として指定し、SET句で(year = year(tmp_time), month=month(tmp_time), day=day(tmp_time))を指定してデータ変換を実装する必要があります。
  • WHERE

    ソースデータをフィルタリングするための条件を指定します。StarRocksは、WHERE句で指定されたフィルタ条件を満たすソースデータのみをロードします。

WITH BROKER

v2.3以前では、使用したいブローカーを指定するためにWITH BROKER "<broker_name>"を入力します。v2.5以降では、ブローカーを指定する必要はありませんが、WITH BROKERキーワードを保持する必要があります。

注意

v2.4以前では、StarRocksはBroker Loadジョブを実行する際に、StarRocksクラスターと外部ストレージシステム間の接続を確立するためにブローカーに依存していました。これは「ブローカーを使用したロード」と呼ばれます。ブローカーは、ファイルシステムインターフェースと統合された独立したステートレスサービスです。ブローカーを使用すると、StarRocksは外部ストレージシステムに保存されているデータファイルにアクセスして読み取ることができ、独自のコンピューティングリソースを使用してこれらのデータファイルのデータを事前処理してロードできます。

v2.5以降、StarRocksはブローカーへの依存を排除し、「ブローカーを使用しないロード」を実装しました。

StarRocksクラスターにデプロイされたブローカーを確認するには、SHOW BROKER文を使用できます。ブローカーがデプロイされていない場合は、Deploy a brokerに記載された手順に従ってブローカーをデプロイできます。

StorageCredentialParams

StarRocksがストレージシステムにアクセスするために使用する認証情報。

HDFS

オープンソースのHDFSは、シンプル認証とKerberos認証の2つの認証方法をサポートしています。Broker Loadはデフォルトでシンプル認証を使用します。オープンソースのHDFSは、NameNodeのHAメカニズムの設定もサポートしています。ストレージシステムとしてオープンソースのHDFSを選択した場合、認証設定とHA設定を次のように指定できます:

  • 認証設定

    • シンプル認証を使用する場合、StorageCredentialParamsを次のように設定します:

      "hadoop.security.authentication" = "simple",
      "username" = "<hdfs_username>",
      "password" = "<hdfs_password>"

      StorageCredentialParamsのパラメータは次のように説明されています。

      パラメータ説明
      hadoop.security.authentication認証方法。有効な値:simpleおよびkerberos。デフォルト値:simplesimpleはシンプル認証を表し、認証なしを意味し、kerberosはKerberos認証を表します。
      usernameHDFSクラスターのNameNodeにアクセスするために使用するアカウントのユーザー名。
      passwordHDFSクラスターのNameNodeにアクセスするために使用するアカウントのパスワード。
    • Kerberos認証を使用する場合、StorageCredentialParamsを次のように設定します:

      "hadoop.security.authentication" = "kerberos",
      "kerberos_principal" = "nn/zelda1@ZELDA.COM",
      "kerberos_keytab" = "/keytab/hive.keytab",
      "kerberos_keytab_content" = "YWFhYWFh"

      StorageCredentialParamsのパラメータは次のように説明されています。

      パラメータ説明
      hadoop.security.authentication認証方法。有効な値:simpleおよびkerberos。デフォルト値:simplesimpleはシンプル認証を表し、認証なしを意味し、kerberosはKerberos認証を表します。
      kerberos_principal認証されるKerberosプリンシパル。各プリンシパルは、HDFSクラスター全体で一意であることを保証するために、次の3つの部分で構成されます:
      • usernameまたはservicename: プリンシパルの名前。
      • instance: HDFSクラスター内で認証されるノードをホストするサーバーの名前。サーバー名は、HDFSクラスターが独立して認証される複数のDataNodeで構成されている場合に、プリンシパルが一意であることを保証するのに役立ちます。
      • realm: レルムの名前。レルム名は大文字でなければなりません。
      例:nn/zelda1@ZELDA.COM
      kerberos_keytabKerberosキータブファイルの保存パス。
      kerberos_keytab_contentKerberosキータブファイルのBase64エンコードされた内容。kerberos_keytabまたはkerberos_keytab_contentのいずれかを指定できます。

      複数のKerberosユーザーを設定した場合、少なくとも1つの独立したbroker groupがデプロイされていることを確認し、ロード文でWITH BROKER "<broker_name>"を入力して使用したいブローカーグループを指定する必要があります。さらに、ブローカーの起動スクリプトファイルstart_broker.shを開き、ファイルの42行目を変更して、ブローカーがkrb5.confファイルを読み取れるようにする必要があります。例:

      export JAVA_OPTS="-Dlog4j2.formatMsgNoLookups=true -Xmx1024m -Dfile.encoding=UTF-8 -Djava.security.krb5.conf=/etc/krb5.conf"

      注意

      • 前述の例では、/etc/krb5.confは実際のkrb5.confファイルの保存パスに置き換えることができます。ブローカーがそのファイルを読み取る権限を持っていることを確認してください。ブローカーグループが複数のブローカーで構成されている場合、各ブローカーノードでstart_broker.shファイルを変更し、変更を有効にするためにブローカーノードを再起動する必要があります。
      • StarRocksクラスターにデプロイされたブローカーを確認するには、SHOW BROKER文を使用できます。
  • HA設定

    HDFSクラスターのNameNodeにHAメカニズムを設定できます。これにより、NameNodeが別のノードに切り替えられた場合でも、StarRocksは自動的に新しいNameNodeを識別できます。これには次のシナリオが含まれます:

    • 1つのKerberosユーザーが設定された単一のHDFSクラスターからデータをロードする場合、ブローカーを使用したロードとブローカーを使用しないロードの両方がサポートされています。

      • ブローカーを使用したロードを実行するには、少なくとも1つの独立したbroker groupがデプロイされていることを確認し、HDFSクラスターを提供するブローカーノードの{deploy}/confパスにhdfs-site.xmlファイルを配置します。StarRocksはブローカーの起動時に{deploy}/confパスを環境変数CLASSPATHに追加し、ブローカーがHDFSクラスターのノード情報を読み取れるようにします。

      • ブローカーを使用しないロードを実行するには、クラスター内のすべてのFE、BE、およびCNノードのデプロイメントディレクトリのconf/core-site.xmlhadoop.security.authentication = kerberosを設定し、kinitコマンドを使用してKerberosアカウントを設定するだけです。

    • 複数のKerberosユーザーが設定された単一のHDFSクラスターからデータをロードする場合、ブローカーを使用したロードのみがサポートされています。少なくとも1つの独立したbroker groupがデプロイされていることを確認し、HDFSクラスターを提供するブローカーノードの{deploy}/confパスにhdfs-site.xmlファイルを配置します。StarRocksはブローカーの起動時に{deploy}/confパスを環境変数CLASSPATHに追加し、ブローカーがHDFSクラスターのノード情報を読み取れるようにします。

    • 複数のHDFSクラスターからデータをロードする場合(1つまたは複数のKerberosユーザーが設定されているかどうかに関係なく)、ブローカーを使用したロードのみがサポートされています。これらのHDFSクラスターのそれぞれに対して少なくとも1つの独立したbroker groupがデプロイされていることを確認し、ブローカーがHDFSクラスターのノード情報を読み取れるようにするために次のいずれかのアクションを実行します:

      • HDFSクラスターを提供するブローカーノードの{deploy}/confパスにhdfs-site.xmlファイルを配置します。StarRocksはブローカーの起動時に{deploy}/confパスを環境変数CLASSPATHに追加し、そのHDFSクラスターのノード情報をブローカーが読み取れるようにします。

      • ジョブ作成時に次のHA設定を追加します:

        "dfs.nameservices" = "ha_cluster",
        "dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
        "dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
        "dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
        "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

        HA設定のパラメータは次のように説明されています。

        パラメータ説明
        dfs.nameservicesHDFSクラスターの名前。
        dfs.ha.namenodes.XXXHDFSクラスター内のNameNodeの名前。複数のNameNode名を指定する場合は、カンマ(,)で区切ります。xxxdfs.nameservicesで指定したHDFSクラスター名です。
        dfs.namenode.rpc-address.XXX.NNHDFSクラスター内のNameNodeのRPCアドレス。NNdfs.ha.namenodes.XXXで指定したNameNode名です。
        dfs.client.failover.proxy.providerクライアントが接続するNameNodeのプロバイダー。デフォルト値:org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

    注意

    StarRocksクラスターにデプロイされたブローカーを確認するには、SHOW BROKER文を使用できます。

AWS S3

AWS S3をストレージシステムとして選択した場合、次のいずれかのアクションを実行します:

  • インスタンスプロファイルベースの認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "aws.s3.use_instance_profile" = "true",
    "aws.s3.region" = "<aws_s3_region>"
  • アサインされたロールベースの認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "aws.s3.use_instance_profile" = "true",
    "aws.s3.iam_role_arn" = "<iam_role_arn>",
    "aws.s3.region" = "<aws_s3_region>"
  • IAMユーザーベースの認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "<iam_user_access_key>",
    "aws.s3.secret_key" = "<iam_user_secret_key>",
    "aws.s3.region" = "<aws_s3_region>"

StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

パラメータ必須説明
aws.s3.use_instance_profileはい資格情報メソッドのインスタンスプロファイルとアサインされたロールを有効にするかどうかを指定します。有効な値:trueおよびfalse。デフォルト値:false
aws.s3.iam_role_arnいいえAWS S3バケットに対する権限を持つIAMロールのARN。AWS S3へのアクセスの資格情報メソッドとしてアサインされたロールを選択する場合、このパラメータを指定する必要があります。
aws.s3.regionはいAWS S3バケットが存在するリージョン。例:us-west-1
aws.s3.access_keyいいえIAMユーザーのアクセスキー。AWS S3へのアクセスの資格情報メソッドとしてIAMユーザーを選択する場合、このパラメータを指定する必要があります。
aws.s3.secret_keyいいえIAMユーザーのシークレットキー。AWS S3へのアクセスの資格情報メソッドとしてIAMユーザーを選択する場合、このパラメータを指定する必要があります。

AWS S3へのアクセスのための認証方法の選択方法とAWS IAMコンソールでのアクセス制御ポリシーの設定方法については、Authentication parameters for accessing AWS S3を参照してください。

Google GCS

Google GCSをストレージシステムとして選択した場合、次のいずれかのアクションを実行します:

  • VMベースの認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "gcp.gcs.use_compute_engine_service_account" = "true"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータデフォルト値値の例説明
    gcp.gcs.use_compute_engine_service_accountfalsetrueCompute Engineにバインドされたサービスアカウントを直接使用するかどうかを指定します。
  • サービスアカウントベースの認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "gcp.gcs.service_account_email" = "<google_service_account_email>",
    "gcp.gcs.service_account_private_key_id" = "<google_service_private_key_id>",
    "gcp.gcs.service_account_private_key" = "<google_service_private_key>"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータデフォルト値値の例説明
    gcp.gcs.service_account_email"""user@hello.iam.gserviceaccount.com"サービスアカウントの作成時に生成されたJSONファイルのメールアドレス。
    gcp.gcs.service_account_private_key_id"""61d257bd8479547cb3e04f0b9b6b9ca07af3b7ea"サービスアカウントの作成時に生成されたJSONファイルのプライベートキーID。
    gcp.gcs.service_account_private_key"""-----BEGIN PRIVATE KEY----xxxx-----END PRIVATE KEY-----\n"サービスアカウントの作成時に生成されたJSONファイルのプライベートキー。
  • インパーソネーションベースの認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    • VMインスタンスにサービスアカウントをインパーソネートさせる:

      "gcp.gcs.use_compute_engine_service_account" = "true",
      "gcp.gcs.impersonation_service_account" = "<assumed_google_service_account_email>"

      StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

      パラメータデフォルト値値の例説明
      gcp.gcs.use_compute_engine_service_accountfalsetrueCompute Engineにバインドされたサービスアカウントを直接使用するかどうかを指定します。
      gcp.gcs.impersonation_service_account"""hello"インパーソネートしたいサービスアカウント。
    • サービスアカウント(メタサービスアカウントと呼ばれる)に別のサービスアカウント(データサービスアカウントと呼ばれる)をインパーソネートさせる:

      "gcp.gcs.service_account_email" = "<google_service_account_email>",
      "gcp.gcs.service_account_private_key_id" = "<meta_google_service_account_email>",
      "gcp.gcs.service_account_private_key" = "<meta_google_service_account_email>",
      "gcp.gcs.impersonation_service_account" = "<data_google_service_account_email>"

      StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

      パラメータデフォルト値値の例説明
      gcp.gcs.service_account_email"""user@hello.iam.gserviceaccount.com"メタサービスアカウントの作成時に生成されたJSONファイルのメールアドレス。
      gcp.gcs.service_account_private_key_id"""61d257bd8479547cb3e04f0b9b6b9ca07af3b7ea"メタサービスアカウントの作成時に生成されたJSONファイルのプライベートキーID。
      gcp.gcs.service_account_private_key"""-----BEGIN PRIVATE KEY----xxxx-----END PRIVATE KEY-----\n"メタサービスアカウントの作成時に生成されたJSONファイルのプライベートキー。
      gcp.gcs.impersonation_service_account"""hello"インパーソネートしたいデータサービスアカウント。

その他のS3互換ストレージシステム

他のS3互換ストレージシステム(例:MinIO)を選択する場合、StorageCredentialParamsを次のように設定します:

"aws.s3.enable_ssl" = "false",
"aws.s3.enable_path_style_access" = "true",
"aws.s3.endpoint" = "<s3_endpoint>",
"aws.s3.access_key" = "<iam_user_access_key>",
"aws.s3.secret_key" = "<iam_user_secret_key>"

StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

パラメータ必須説明
aws.s3.enable_sslはいSSL接続を有効にするかどうかを指定します。有効な値:trueおよびfalse。デフォルト値:true
aws.s3.enable_path_style_accessはいパススタイルのURLアクセスを有効にするかどうかを指定します。有効な値:trueおよびfalse。デフォルト値:false。MinIOの場合、値をtrueに設定する必要があります。
aws.s3.endpointはいAWS S3の代わりにS3互換ストレージシステムに接続するために使用されるエンドポイント。
aws.s3.access_keyはいIAMユーザーのアクセスキー。
aws.s3.secret_keyはいIAMユーザーのシークレットキー。

Microsoft Azure Storage

Azure Blob Storage

Blob Storageをストレージシステムとして選択する場合、次のいずれかのアクションを実行します:

  • 共有キー認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.blob.storage_account" = "<storage_account_name>",
    "azure.blob.shared_key" = "<storage_account_shared_key>"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータ必須説明
    azure.blob.storage_accountはいBlob Storageアカウントのユーザー名。
    azure.blob.shared_keyはいBlob Storageアカウントの共有キー。
  • SASトークン認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.blob.storage_account" = "<storage_account_name>",
    "azure.blob.container" = "<container_name>",
    "azure.blob.sas_token" = "<storage_account_SAS_token>"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータ必須説明
    azure.blob.storage_accountはいBlob Storageアカウントのユーザー名。
    azure.blob.containerはいデータを保存するBlobコンテナの名前。
    azure.blob.sas_tokenはいBlob Storageアカウントにアクセスするために使用されるSASトークン。
Azure Data Lake Storage Gen2

Data Lake Storage Gen2をストレージシステムとして選択する場合、次のいずれかのアクションを実行します:

  • マネージドID認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.adls2.oauth2_use_managed_identity" = "true",
    "azure.adls2.oauth2_tenant_id" = "<service_principal_tenant_id>",
    "azure.adls2.oauth2_client_id" = "<service_client_id>"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータ必須説明
    azure.adls2.oauth2_use_managed_identityはいマネージドID認証方法を有効にするかどうかを指定します。値をtrueに設定します。
    azure.adls2.oauth2_tenant_idはいアクセスしたいテナントのID。
    azure.adls2.oauth2_client_idはいマネージドIDのクライアント(アプリケーション)ID。
  • 共有キー認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.adls2.storage_account" = "<storage_account_name>",
    "azure.adls2.shared_key" = "<storage_account_shared_key>"

StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

パラメータ必須説明
azure.adls2.storage_accountはいData Lake Storage Gen2ストレージアカウントのユーザー名。
azure.adls2.shared_keyはいData Lake Storage Gen2ストレージアカウントの共有キー。
  • サービスプリンシパル認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.adls2.oauth2_client_id" = "<service_client_id>",
    "azure.adls2.oauth2_client_secret" = "<service_principal_client_secret>",
    "azure.adls2.oauth2_client_endpoint" = "<service_principal_client_endpoint>"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータ必須説明
    azure.adls2.oauth2_client_idはいサービスプリンシパルのクライアント(アプリケーション)ID。
    azure.adls2.oauth2_client_secretはい作成された新しいクライアント(アプリケーション)シークレットの値。
    azure.adls2.oauth2_client_endpointはいサービスプリンシパルまたはアプリケーションのOAuth 2.0トークンエンドポイント(v1)。
Azure Data Lake Storage Gen1

Data Lake Storage Gen1をストレージシステムとして選択する場合、次のいずれかのアクションを実行します:

  • マネージドサービスID認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.adls1.use_managed_service_identity" = "true"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータ必須説明
    azure.adls1.use_managed_service_identityはいマネージドサービスID認証方法を有効にするかどうかを指定します。値をtrueに設定します。
  • サービスプリンシパル認証方法を選択するには、StorageCredentialParamsを次のように設定します:

    "azure.adls1.oauth2_client_id" = "<application_client_id>",
    "azure.adls1.oauth2_credential" = "<application_client_credential>",
    "azure.adls1.oauth2_endpoint" = "<OAuth_2.0_authorization_endpoint_v2>"

    StorageCredentialParamsで設定する必要があるパラメータは次のように説明されています。

    パラメータ必須説明
    azure.adls1.oauth2_client_idはいクライアント(アプリケーション)のID。
    azure.adls1.oauth2_credentialはい作成された新しいクライアント(アプリケーション)シークレットの値。
    azure.adls1.oauth2_endpointはいサービスプリンシパルまたはアプリケーションのOAuth 2.0トークンエンドポイント(v1)。

opt_properties

ロードジョブ全体に適用されるオプションのパラメータを指定します。構文:

PROPERTIES ("<key1>" = "<value1>"[, "<key2>" = "<value2>" ...])

サポートされているパラメータは次のとおりです:

  • timeout

    ロードジョブのタイムアウト期間を指定します。単位:秒。デフォルトのタイムアウト期間は4時間です。タイムアウト期間を6時間未満に指定することをお勧めします。ロードジョブがタイムアウト期間内に終了しない場合、StarRocksはロードジョブをキャンセルし、ロードジョブのステータスはCANCELLEDになります。

    注意

    ほとんどの場合、タイムアウト期間を設定する必要はありません。ロードジョブがデフォルトのタイムアウト期間内に終了しない場合にのみ、タイムアウト期間を設定することをお勧めします。

    タイムアウト期間を推測するには、次の式を使用します:

    タイムアウト期間 > (ロードするデータファイルの総サイズ x ロードするデータファイルの総数およびデータファイルに作成されたマテリアライズドビューの数)/平均ロード速度

    注意

    「平均ロード速度」は、StarRocksクラスター全体の平均ロード速度です。平均ロード速度は、サーバー構成やクラスターの最大同時クエリータスク数によって異なるため、クラスターごとに異なります。過去のロードジョブのロード速度に基づいて平均ロード速度を推測できます。

    たとえば、1GBのデータファイルをロードし、その上に2つのマテリアライズドビューを作成し、StarRocksクラスターの平均ロード速度が10 MB/sである場合、データロードに必要な時間は約102秒です。

    (1 x 1024 x 3)/10 = 307.2 (秒)

    この例では、タイムアウト期間を308秒以上に設定することをお勧めします。

  • max_filter_ratio

    ロードジョブの最大エラー許容度を指定します。最大エラー許容度は、不十分なデータ品質の結果としてフィルタリングされる行の最大割合です。有効な値:01。デフォルト値:0

    • このパラメータを0に設定すると、StarRocksはロード中に不適格な行を無視しません。このため、ソースデータに不適格な行が含まれている場合、ロードジョブは失敗します。これにより、StarRocksにロードされるデータの正確性が保証されます。

    • このパラメータを0より大きい値に設定すると、StarRocksはロード中に不適格な行を無視できます。このため、ソースデータに不適格な行が含まれていても、ロードジョブは成功する可能性があります。

      注意

      不十分なデータ品質のためにフィルタリングされる行には、WHERE句によってフィルタリングされる行は含まれません。

    最大エラー許容度が0に設定されているためにロードジョブが失敗した場合、SHOW LOADを使用してジョブ結果を表示できます。その後、不適格な行をフィルタリングできるかどうかを判断します。不適格な行をフィルタリングできる場合、ジョブ結果で返されるdpp.abnorm.ALLdpp.norm.ALLの値に基づいて最大エラー許容度を計算し、最大エラー許容度を調整してロードジョブを再送信します。最大エラー許容度を計算するための式は次のとおりです:

    max_filter_ratio = [dpp.abnorm.ALL/(dpp.abnorm.ALL + dpp.norm.ALL)]

    dpp.abnorm.ALLdpp.norm.ALLの値の合計は、ロードされる行の総数です。

  • log_rejected_record_num

    ログに記録できる不適格なデータ行の最大数を指定します。このパラメータはv3.1以降でサポートされています。有効な値:0-1、および任意の非ゼロの正の整数。デフォルト値:0

    • 0は、フィルタリングされたデータ行がログに記録されないことを指定します。
    • -1は、フィルタリングされたすべてのデータ行がログに記録されることを指定します。
    • 非ゼロの正の整数(例:n)は、各BEで最大nのフィルタリングされたデータ行がログに記録されることを指定します。
  • load_mem_limit

    ロードジョブに提供できる最大メモリ量を指定します。このパラメータの値は、各BEまたはCNノードでサポートされる上限メモリを超えることはできません。単位:バイト。デフォルトのメモリ制限は2 GBです。

  • strict_mode

    strict modeを有効にするかどうかを指定します。有効な値:trueおよびfalse。デフォルト値:falsetrueはstrict modeを有効にし、falseはstrict modeを無効にします。

  • timezone

    ロードジョブのタイムゾーンを指定します。デフォルト値:Asia/Shanghai。タイムゾーンの設定は、strftime、alignment_timestamp、from_unixtimeなどの関数によって返される結果に影響を与えます。詳細については、Configure a time zoneを参照してください。timezoneパラメータで指定されたタイムゾーンはセッションレベルのタイムゾーンです。

  • priority

    ロードジョブの優先度を指定します。有効な値:LOWESTLOWNORMALHIGH、およびHIGHEST。デフォルト値:NORMAL。Broker LoadはFEパラメータmax_broker_load_job_concurrencyを提供し、StarRocksクラスター内で同時に実行できるBroker Loadジョブの最大数を決定します。指定された時間内に送信されたBroker Loadジョブの数が最大数を超える場合、過剰なジョブはその優先度に基づいてスケジュールされるまで待機します。

    QUEUEINGまたはLOADING状態にある既存のロードジョブの優先度を変更するには、ALTER LOAD文を使用できます。

    StarRocksはv2.5以降、Broker Loadジョブのpriorityパラメータの設定を許可しています。

  • merge_condition

    更新が有効になるかどうかを判断するために使用したい列の名前を指定します。ソースレコードから宛先レコードへの更新は、指定された列でソースデータレコードが宛先データレコードよりも大きいか等しい値を持つ場合にのみ有効になります。

    注意

    指定する列は主キー列であってはなりません。さらに、主キーテーブルを使用するテーブルのみが条件付き更新をサポートしています。

列マッピング

データファイルの列がStarRocksテーブルの列に順番に1対1でマッピングできる場合、データファイルとStarRocksテーブル間の列マッピングを設定する必要はありません。

データファイルの列がStarRocksテーブルの列に順番に1対1でマッピングできない場合、columnsパラメータを使用してデータファイルとStarRocksテーブル間の列マッピングを設定する必要があります。これには次の2つのユースケースが含まれます:

  • 同じ数の列だが異なる列順序。 また、データファイルからのデータは、StarRocksテーブルの対応する列にロードされる前に関数によって計算される必要はありません。

    columnsパラメータでは、データファイルの列が配置されている順序と同じ順序でStarRocksテーブルの列名を指定する必要があります。

    たとえば、StarRocksテーブルは順番にcol1col2col3の3つの列で構成されており、データファイルも3つの列で構成されており、順番にStarRocksテーブルのcol3col2col1にマッピングできます。この場合、"columns: col3, col2, col1"を指定する必要があります。

  • 異なる数の列と異なる列順序。また、データファイルからのデータは、StarRocksテーブルの対応する列にロードされる前に関数によって計算される必要があります。

    columnsパラメータでは、データファイルの列が配置されている順序と同じ順序でStarRocksテーブルの列名を指定し、データを計算するために使用したい関数を指定する必要があります。2つの例は次のとおりです:

    • StarRocksテーブルは順番にcol1col2col3の3つの列で構成されています。データファイルは4つの列で構成されており、そのうち最初の3つの列は順番にStarRocksテーブルのcol1col2col3にマッピングされ、4番目の列はStarRocksテーブルのいずれの列にもマッピングされません。この場合、データファイルの4番目の列に一時的な名前を指定する必要があり、その一時的な名前はStarRocksテーブルの列名とは異なる必要があります。たとえば、"columns: col1, col2, col3, temp"と指定できます。この場合、データファイルの4番目の列は一時的にtempと名付けられます。
    • StarRocksテーブルは順番にyearmonthdayの3つの列で構成されています。データファイルは、yyyy-mm-dd hh:mm:ss形式の日付と時刻の値を含む1つの列のみで構成されています。この場合、"columns: col, year = year(col), month=month(col), day=day(col)"と指定できます。この場合、colはデータファイル列の一時的な名前であり、関数year = year(col)month=month(col)、およびday=day(col)はデータファイル列colからデータを抽出し、対応するStarRocksテーブルの列にデータをロードするために使用されます。たとえば、year = year(col)はデータファイル列colからyyyyデータを抽出し、StarRocksテーブル列yearにデータをロードするために使用されます。

詳細な例については、Configure column mappingを参照してください。

関連する設定項目

FE設定項目max_broker_load_job_concurrencyは、StarRocksクラスター内で同時に実行できるBroker Loadジョブの最大数を指定します。

StarRocks v2.4以前では、特定の期間内に送信されたBroker Loadジョブの総数が最大数を超える場合、過剰なジョブはその送信時間に基づいてキューに入れられ、スケジュールされます。

StarRocks v2.5以降では、特定の期間内に送信されたBroker Loadジョブの総数が最大数を超える場合、過剰なジョブはその優先度に基づいてキューに入れられ、スケジュールされます。上記で説明したpriorityパラメータを使用してジョブの優先度を指定できます。QUEUEINGまたはLOADING状態にある既存のジョブの優先度を変更するには、ALTER LOADを使用できます。

ジョブの分割と同時実行

Broker Loadジョブは、1つ以上のタスクに分割され、同時に実行されることができます。ロードジョブ内のタスクは単一のトランザクション内で実行されます。それらはすべて成功するか失敗する必要があります。StarRocksは、LOAD文でdata_descをどのように宣言するかに基づいて各ロードジョブを分割します:

  • 複数のdata_descパラメータを宣言し、それぞれが異なるテーブルを指定している場合、各テーブルのデータをロードするためのタスクが生成されます。

  • 複数のdata_descパラメータを宣言し、それぞれが同じテーブルの異なるパーティションを指定している場合、各パーティションのデータをロードするためのタスクが生成されます。

さらに、各タスクは1つ以上のインスタンスに分割され、StarRocksクラスターのBEsまたはCNsに均等に分配され、同時に実行されます。StarRocksは、FEパラメータmin_bytes_per_broker_scannerとBEまたはCNノードの数に基づいて各タスクを分割します。個々のタスク内のインスタンス数を計算するには、次の式を使用します:

個々のタスク内のインスタンス数 = min(個々のタスクによってロードされるデータ量/min_bytes_per_broker_scanner, BE/CNノードの数)

ほとんどの場合、各ロードジョブには1つのdata_descのみが宣言されており、各ロードジョブは1つのタスクにのみ分割され、そのタスクはBEまたはCNノードの数と同じ数のインスタンスに分割されます。

このセクションでは、HDFSを例にとってさまざまなロード設定を説明します。

CSVデータのロード

このセクションでは、CSVを例にとって、さまざまなロード要件を満たすために使用できるさまざまなパラメータ設定を説明します。

タイムアウト期間の設定

StarRocksデータベースtest_dbには、table1という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample1.csvも3つの列で構成されており、順番にtable1col1col2col3にマッピングされています。

example1.csvからtable1に3600秒以内にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example1.csv")
INTO TABLE table1
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"timeout" = "3600"
);

最大エラー許容度の設定

StarRocksデータベースtest_dbには、table2という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample2.csvも3つの列で構成されており、順番にtable2col1col2col3にマッピングされています。

example2.csvからtable2に最大エラー許容度0.1でデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label2
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example2.csv")
INTO TABLE table2

)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"max_filter_ratio" = "0.1"
);

ファイルパスからすべてのデータファイルをロード

StarRocksデータベースtest_dbには、table3という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

HDFSクラスターの/user/starrocks/data/input/パスに保存されているすべてのデータファイルも、それぞれ3つの列で構成されており、順番にtable3col1col2col3にマッピングされています。これらのデータファイルで使用される列区切り文字は\x01です。

HDFSサーバー上のhdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/パスに保存されているすべてのデータファイルからtable3にデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label3
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/*")
INTO TABLE table3
COLUMNS TERMINATED BY "\\x01"
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

NameNode HAメカニズムの設定

StarRocksデータベースtest_dbには、table4という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample4.csvも3つの列で構成されており、table4col1col2col3にマッピングされています。

NameNodeのHAメカニズムが設定された状態でexample4.csvからtable4にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label4
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example4.csv")
INTO TABLE table4
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>",
"dfs.nameservices" = "my_ha",
"dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2","dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
"dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);

Kerberos認証の設定

StarRocksデータベースtest_dbには、table5という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample5.csvも3つの列で構成されており、順番にtable5col1col2col3にマッピングされています。

Kerberos認証が設定され、keytabファイルパスが指定された状態でexample5.csvからtable5にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label5
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example5.csv")
INTO TABLE table5
COLUMNS TERMINATED BY "\t"
)
WITH BROKER
(
"hadoop.security.authentication" = "kerberos",
"kerberos_principal" = "starrocks@YOUR.COM",
"kerberos_keytab" = "/home/starRocks/starRocks.keytab"
);

データロードの取り消し

StarRocksデータベースtest_dbには、table6という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample6.csvも3つの列で構成されており、順番にtable6col1col2col3にマッピングされています。

Broker Loadジョブを実行してexample6.csvからtable6にすべてのデータをロードしました。

ロードしたデータを取り消したい場合、次のコマンドを実行します:

LOAD LABEL test_db.label6
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example6.csv")
NEGATIVE
INTO TABLE table6
COLUMNS TERMINATED BY "\t"
)
WITH BROKER
(
"hadoop.security.authentication" = "kerberos",
"kerberos_principal" = "starrocks@YOUR.COM",
"kerberos_keytab" = "/home/starRocks/starRocks.keytab"
);

宛先パーティションの指定

StarRocksデータベースtest_dbには、table7という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample7.csvも3つの列で構成されており、順番にtable7col1col2col3にマッピングされています。

example7.csvからtable7の2つのパーティションp1p2にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label7
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example7.csv")
INTO TABLE table7
PARTITION (p1, p2)
COLUMNS TERMINATED BY ","
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

列マッピングの設定

StarRocksデータベースtest_dbには、table8という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample8.csvも3つの列で構成されており、順番にtable8col2col1col3にマッピングされています。

example8.csvからtable8にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label8
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example8.csv")
INTO TABLE table8
COLUMNS TERMINATED BY ","
(col2, col1, col3)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

注意

前述の例では、example8.csvの列はtable8の列と同じ順序でマッピングできません。そのため、column_listを使用してexample8.csvtable8の間の列マッピングを設定する必要があります。

フィルタ条件の設定

StarRocksデータベースtest_dbには、table9という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample9.csvも3つの列で構成されており、順番にtable9col1col2col3にマッピングされています。

example9.csvからtable9に、最初の列の値が20180601より大きいデータレコードのみをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label9
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example9.csv")
INTO TABLE table9
(col1, col2, col3)
where col1 > 20180601
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

注意

前述の例では、example9.csvの列はtable9の列と同じ順序でマッピングできますが、WHERE句を使用して列ベースのフィルタ条件を指定する必要があります。そのため、column_listを使用してexample9.csvtable9の間の列マッピングを設定する必要があります。

HLL型の列を含むテーブルへのデータロード

StarRocksデータベースtest_dbには、table10という名前のテーブルがあります。このテーブルは、順番にidcol1col2col3の4つの列で構成されています。col1col2はHLL型の列として定義されています。

データファイルexample10.csvは3つの列で構成されており、最初の列はtable10idにマッピングされ、2番目と3番目の列は順番にtable10col1col2にマッピングされています。example10.csvの2番目と3番目の列の値は、関数を使用してHLL型データに変換され、table10col1col2にロードされます。

example10.csvからtable10にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label10
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example10.csv")
INTO TABLE table10
COLUMNS TERMINATED BY ","
(id, temp1, temp2)
SET
(
col1 = hll_hash(temp1),
col2 = hll_hash(temp2),
col3 = empty_hll()
)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

注意

前述の例では、example10.csvの3つの列はcolumn_listを使用して順番にidtemp1temp2と名付けられます。その後、関数を使用してデータを変換します:

  • hll_hash関数は、example10.csvtemp1temp2の値をHLL型データに変換し、example10.csvtemp1temp2table10col1col2にマッピングします。

  • hll_empty関数は、指定されたデフォルト値をtable10col3に埋め込むために使用されます。

関数hll_hashおよびhll_emptyの使用については、hll_hashおよびhll_emptyを参照してください。

ファイルパスからパーティションフィールドの値を抽出

Broker Loadは、宛先StarRocksテーブルの列定義に基づいて、ファイルパスに含まれる特定のパーティションフィールドの値を解析することをサポートしています。このStarRocksの機能は、Apache Spark™のPartition Discovery機能に似ています。

StarRocksデータベースtest_dbには、table11という名前のテーブルがあります。このテーブルは、順番にcol1col2col3cityutc_dateの5つの列で構成されています。

HDFSクラスターのファイルパス/user/starrocks/data/input/dir/city=beijingには、次のデータファイルが含まれています:

  • /user/starrocks/data/input/dir/city=beijing/utc_date=2019-06-26/0000.csv

  • /user/starrocks/data/input/dir/city=beijing/utc_date=2019-06-26/0001.csv

これらのデータファイルはそれぞれ3つの列で構成されており、順番にtable11col1col2col3にマッピングされています。

ファイルパス/user/starrocks/data/input/dir/city=beijing/utc_date=*/*からすべてのデータファイルをtable11にロードし、同時にファイルパスに含まれるパーティションフィールドcityutc_dateの値を抽出してtable11cityutc_dateにロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label11
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/dir/city=beijing/*/*")
INTO TABLE table11
FORMAT AS "csv"
(col1, col2, col3)
COLUMNS FROM PATH AS (city, utc_date)
SET (uniq_id = md5sum(k1, city))
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

%3Aを含むファイルパスからパーティションフィールドの値を抽出

HDFSでは、ファイルパスにコロン(:)を含めることはできません。すべてのコロン(:)は%3Aに変換されます。

StarRocksデータベースtest_dbには、table12という名前のテーブルがあります。このテーブルは、順番にdata_timecol1col2の3つの列で構成されています。テーブルスキーマは次のとおりです:

data_time DATETIME,
col1 INT,
col2 INT

HDFSクラスターのファイルパス/user/starrocks/dataには、次のデータファイルが含まれています:

  • /user/starrocks/data/data_time=2020-02-17 00%3A00%3A00/example12.csv

  • /user/starrocks/data/data_time=2020-02-18 00%3A00%3A00/example12.csv

example12.csvからtable12にすべてのデータをロードし、同時にファイルパスからパーティションフィールドdata_timeの値を抽出してtable12data_timeにロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label12
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/*/example12.csv")
INTO TABLE table12
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(col1,col2)
COLUMNS FROM PATH AS (data_time)
SET (data_time = str_to_date(data_time, '%Y-%m-%d %H%%3A%i%%3A%s'))
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

注意

前述の例では、パーティションフィールドdata_timeから抽出された値は、2020-02-17 00%3A00%3A00のように%3Aを含む文字列です。したがって、str_to_date関数を使用して、文字列をdata_timeにロードする前にDATETIME型データに変換する必要があります。

format_type_optionsの設定

StarRocksデータベースtest_dbには、table13という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample13.csvも3つの列で構成されており、順番にtable13col2col1col3にマッピングされています。

example13.csvからtable13にすべてのデータをロードし、example13.csvの最初の2行をスキップし、列区切り文字の前後のスペースを削除し、enclose\に設定し、escape\に設定したい場合、次のコマンドを実行します:

LOAD LABEL test_db.label13
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/*/example13.csv")
INTO TABLE table13
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
(
skip_header = 2
trim_space = TRUE
enclose = "\""
escape = "\\"
)
(col2, col1, col3)
)
WITH BROKER
(
"username" = "hdfs_username",
"password" = "hdfs_password"
)
PROPERTIES
(
"timeout" = "3600"
);

Parquetデータのロード

このセクションでは、Parquetデータをロードする際に注意すべきパラメータ設定について説明します。

StarRocksデータベースtest_dbには、table13という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample13.parquetも3つの列で構成されており、順番にtable13col1col2col3にマッピングされています。

example13.parquetからtable13にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label13
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example13.parquet")
INTO TABLE table13
FORMAT AS "parquet"
(col1, col2, col3)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

注意

デフォルトでは、Parquetデータをロードする際、StarRocksはファイル名に**.parquet拡張子が含まれているかどうかに基づいてデータファイル形式を決定します。ファイル名に.parquet**拡張子が含まれていない場合、FORMAT ASを使用してデータファイル形式をParquetとして指定する必要があります。

ORCデータのロード

このセクションでは、ORCデータをロードする際に注意すべきパラメータ設定について説明します。

StarRocksデータベースtest_dbには、table14という名前のテーブルがあります。このテーブルは、順番にcol1col2col3の3つの列で構成されています。

データファイルexample14.orcも3つの列で構成されており、順番にtable14col1col2col3にマッピングされています。

example14.orcからtable14にすべてのデータをロードしたい場合、次のコマンドを実行します:

LOAD LABEL test_db.label14
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/example14.orc")
INTO TABLE table14
FORMAT AS "orc"
(col1, col2, col3)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
);

注意

  • デフォルトでは、ORCデータをロードする際、StarRocksはファイル名に**.orc拡張子が含まれているかどうかに基づいてデータファイル形式を決定します。ファイル名に.orc**拡張子が含まれていない場合、FORMAT ASを使用してデータファイル形式をORCとして指定する必要があります。

  • StarRocks v2.3以前では、データファイルにARRAY型の列が含まれている場合、ORCデータファイルの列名がStarRocksテーブルの対応する列名と同じであることを確認する必要があり、SET句で列を指定することはできません。