Apache® Pulsar™ からのデータを継続的にロードする
StarRocks バージョン 2.5 から、Routine Load は Apache® Pulsar™ からのデータを継続的にロードすることをサポートしています。Pulsar は、ストアとコンピュートの分離アーキテクチャを持つ、分散型のオープンソースの pub-sub メッセージングおよびストリーミングプラットフォームです。Routine Load を介して Pulsar からデータをロードすることは、Apache Kafka からデータをロードすることに似ています。このトピックでは、CSV 形式のデータを例に、Routine Load を介して Apache Pulsar からデータをロードする方法を紹介し ます。
サポートされているデータファイル形式
Routine Load は、Pulsar クラスターから CSV および JSON 形式のデータを消費することをサポートしています。
NOTE
CSV 形式のデータについては、StarRocks は 50 バイト以内の UTF-8 エンコードされた文字列をカラムセパレータとしてサポートしています。一般的に使用されるカラムセパレータには、カンマ (,) 、タブ、パイプ (|) があります。
Pulsar に関連する概念
Pulsar のトピックは、プロデューサーからコンシューマーにメッセージを送信するための名前付きチャネルです。Pulsar のトピックは、パーティション付きトピックと非パーティション付きトピックに分かれています。
- パーティション付きトピック は、複数のブローカーによって処理される特別なタイプのトピックであり、より高いスループットを可能にします。パーティション付きトピックは実際には N 個の内部トピックとして実装されており、N はパーティションの数です。
- 非パーティション付きトピック は、単一のブローカーによってのみ提供される通常のタイプのトピックであり、トピックの最大スループットを制限します。
メッセージのメッセージ ID は、メッセージが永続的に保存されるとすぐに BookKeeper インスタンス によって割り当てられます。メッセージ ID は、台帳内のメッセージの特定の位置を示し、Pulsar クラスター内で一意です。
Pulsar は、コンシューマーが consumer.seek(messageId) を通じて初期位置を指定することをサポートしています。しかし、Kafka コンシューマーオフセットが長整数値であるのに対し、メッセージ ID は ledgerId:entryID:partition-index:batch-index の4つの部分で構成されています。
したがって、メッセージから直接メッセージ ID を取得することはできません。その結果、現在、Routine Load は Pulsar からデータをロードする際に初期位置を指定することをサポートしておらず、パーティションの開始または終了からデータを消費することのみをサポートしています。
サブスクリプションは、メッセージがコンシューマーにどのように配信されるかを決定する名前付きの設定ルールです。Pulsar は、コンシューマーが複数のトピックに同時にサブスクライブすることもサポートしています。トピックには複数のサブスクリプションを持つことができます。
サブスクリプションのタイプは、コンシューマーが接続する際に定義され、すべてのコンシューマーを異なる設定で再起動することで変更できます。Pulsar には4つのサブスクリプションタイプがあります:
exclusive(デフォルト): 単一のコンシューマーのみがサブスクリプションに接続できます。1人の顧客のみがメッセージを消費できます。shared: 複数のコンシューマーが同じサブスクリプションに接続できます。メッセージはコンシューマー間でラウンドロビン配信され、特定のメッセージは1つのコンシューマーにのみ配信されます。failover: 複数のコンシューマーが同じサブスクリプションに接続できます。非パーティション付きトピックまたはパーティション付きトピックの各パーティションに対してマスターコンシューマーが選ばれ、メッセージを受信します。マスターコンシューマーが切断されると、すべての(未確認および後続の)メッセージが次のコンシューマーに配信されます。key_shared: 複数のコンシューマーが同じサブスクリプションに接続できます。メッセージはコンシューマー間で配信され、同じキーまたは同じ順序キーを持つメッセージは1つのコンシューマーにのみ配信されます。
Note:
現在、Routine Load は exclusive タイプを使用しています。
Routine Load ジョブを作成する
以下の例は、Pulsar で CSV 形式のメッセージを消費し、Routine Load ジョブを作成して StarRocks にデータをロードする方法を説明します。詳細な手順と参考情報については、CREATE ROUTINE LOAD を参照してください。
CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
"desired_concurrent_number" = "1",
"max_batch_interval" = "15000",
"max_error_number" = "1000"
)
FROM PULSAR
(
"pulsar_service_url" = "pulsar://localhost:6650",
"pulsar_topic" = "persistent://tenant/namespace/topic-name",
"pulsar_subscription" = "load-test",
"pulsar_partitions" = "load-partition-0,load-partition-1",
"pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);
Routine Load が Pulsar からデータを消費するために作成されると、data_source_properties を除くほとんどの入力パラメータは Kafka からデータを消費する場合と同じです。data_source_properties を除くパラメータの説明については、CREATE ROUTINE LOAD を参照してください。
data_source_properties に関連するパラメータとその説明は次のとおりです:
| Parameter | Required | Description |
|---|---|---|
| pulsar_service_url | Yes | Pulsar クラスターに接続するために使用される URL。形式: "pulsar://ip:port" または "pulsar://service:port"。例: "pulsar_service_url" = "pulsar://``localhost:6650``" |
| pulsar_topic | Yes | サブスクライブされたトピック。例: "pulsar_topic" = "persistent://tenant/namespace/topic-name" |
| pulsar_subscription | Yes | トピックに設定されたサブスクリプション。例: "pulsar_subscription" = "my_subscription" |
| pulsar_partitions, pulsar_initial_positions | No | pulsar_partitions : トピック内のサブスクライブされたパーティション。pulsar_initial_positions: pulsar_partitions で指定されたパーティションの初期位置。初期位置は pulsar_partitions のパーティションに対応している必要があります。有効な値:POSITION_EARLIEST (デフォルト値): サブスクリプションはパーティション内の最も早く利用可能なメッセージから開始します。POSITION_LATEST: サブスクリプションはパーティション内の最新の利用可能なメッセージから開始します。注意: pulsar_partitions が指定されていない場合、トピックのすべてのパーティションがサブスクライブされます。pulsar_partitions と property.pulsar_default_initial_position の両方が指定されている場合、pulsar_partitions の値が property.pulsar_default_initial_position の値を上書きします。pulsar_partitions も property.pulsar_default_initial_position も指定されていない場合、サブスクリプションはパーティション内の最新の利用可能なメッセージから開始します。例:"pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST" |
Routine Load は、Pulsar 用の以下のカスタムパラメータをサポートしています。
| Parameter | Required | Description |
|---|---|---|
| property.pulsar_default_initial_position | No | トピックのパーティションがサブスクライブされたときのデフォルトの初期位置。このパラメータは pulsar_initial_positions が指定されていない場合に有効です。その有効な値は pulsar_initial_positions の有効な値と同じです。例: "``property.pulsar_default_initial_position" = "POSITION_EARLIEST" |
| property.auth.token | No | Pulsar がセキュリティトークンを使用してクライアントを認証する場合、あなたの身元を確認するためにトークン文字列が必要です。例: "p``roperty.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD" |