メインコンテンツまでスキップ
バージョン: 3.2

AutoMQ Kafka

AutoMQ for Kafka は、クラウド環境向けに再設計された Kafka のクラウドネイティブバージョンです。AutoMQ Kafka は オープンソース で、Kafka プロトコルと完全に互換性があり、クラウドの利点を最大限に活用しています。自己管理の Apache Kafka と比較して、AutoMQ Kafka はクラウドネイティブアーキテクチャを採用しており、容量の自動スケーリング、ネットワークトラフィックの自動バランス、パーティションの秒単位での移動などの機能を提供します。これらの機能により、ユーザーの総所有コスト (TCO) が大幅に削減されます。

この記事では、StarRocks Routine Load を使用して AutoMQ Kafka にデータをインポートする方法を説明します。Routine Load の基本原則については、Routine Load Fundamentals のセクションを参照してください。

環境の準備

StarRocks とテストデータの準備

StarRocks クラスターが稼働していることを確認してください。

テスト用のデータベースと主キーテーブルを作成します。

create database automq_db;
create table users (
id bigint NOT NULL,
name string NOT NULL,
timestamp string NULL,
status string NULL
) PRIMARY KEY (id)
DISTRIBUTED BY HASH(id)
PROPERTIES (
"enable_persistent_index" = "true"
);
注記

If a StarRocks cluster in a staging environment contains only one BE, the number of replicas can be set to 1 in the PROPERTIES clause, such as PROPERTIES( "replication_num" = "1" ). The default number of replicas is 3, which is also the number recommended for production StarRocks clusters. If you want to use the default number, you do not need to configure the replication_num parameter.

AutoMQ Kafka とテストデータの準備

AutoMQ Kafka 環境とテストデータを準備するには、AutoMQ Quick Start ガイドに従って AutoMQ Kafka クラスターをデプロイします。StarRocks が AutoMQ Kafka サーバーに直接接続できることを確認してください。

AutoMQ Kafka で example_topic という名前のトピックをすばやく作成し、テスト JSON データを書き込むには、次の手順に従います。

トピックの作成

Kafka のコマンドラインツールを使用してトピックを作成します。Kafka 環境にアクセスでき、Kafka サービスが稼働していることを確認してください。トピックを作成するコマンドは次のとおりです。

./kafka-topics.sh --create --topic example_topic --bootstrap-server 10.0.96.4:9092 --partitions 1 --replication-factor 1

注: topicbootstrap-server を Kafka サーバーのアドレスに置き換えてください。

トピック作成の結果を確認するには、次のコマンドを使用します。

./kafka-topics.sh --describe example_topic --bootstrap-server 10.0.96.4:9092

テストデータの生成

シンプルな JSON 形式のテストデータを生成します。

{
"id": 1,
"name": "testuser",
"timestamp": "2023-11-10T12:00:00",
"status": "active"
}

テストデータの書き込み

Kafka のコマンドラインツールまたはプログラミング手法を使用して、テストデータを example_topic に書き込みます。コマンドラインツールを使用した例は次のとおりです。

echo '{"id": 1, "name": "testuser", "timestamp": "2023-11-10T12:00:00", "status": "active"}' | sh kafka-console-producer.sh --broker-list 10.0.96.4:9092 --topic example_topic

注: topicbootstrap-server を Kafka サーバーのアドレスに置き換えてください。

最近書き込まれたトピックデータを表示するには、次のコマンドを使用します。

sh kafka-console-consumer.sh --bootstrap-server 10.0.96.4:9092 --topic example_topic --from-beginning

Routine Load タスクの作成

StarRocks のコマンドラインで、AutoMQ Kafka トピックからデータを継続的にインポートする Routine Load タスクを作成します。

CREATE ROUTINE LOAD automq_example_load ON users
COLUMNS(id, name, timestamp, status)
PROPERTIES
(
"desired_concurrent_number" = "5",
"format" = "json",
"jsonpaths" = "[\"$.id\",\"$.name\",\"$.timestamp\",\"$.status\"]"
)
FROM KAFKA
(
"kafka_broker_list" = "10.0.96.4:9092",
"kafka_topic" = "example_topic",
"kafka_partitions" = "0",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

注: kafka_broker_list を Kafka サーバーのアドレスに置き換えてください。

パラメータの説明

データフォーマット

PROPERTIES 句の "format" = "json" でデータフォーマットを JSON と指定します。

データ抽出と変換

ソースデータとターゲットテーブル間のマッピングと変換関係を指定するには、COLUMNS と jsonpaths パラメータを設定します。COLUMNS 内の列名はターゲットテーブルの列名に対応し、その順序はソースデータ内の列順に対応します。jsonpaths パラメータは、JSON データから必要なフィールドデータを抽出するために使用され、新しく生成された CSV データに似ています。その後、COLUMNS パラメータは jsonpaths 内のフィールドに順番に一時的な名前を付けます。データ変換の詳細については、Data Transformation during Import を参照してください。

注: 各 JSON オブジェクトが行ごとにターゲットテーブルの列に対応するキー名と数量(順序は不要)を持っている場合、COLUMNS を設定する必要はありません。

データインポートの検証

まず、Routine Load インポートジョブを確認し、Routine Load インポートタスクのステータスが RUNNING であることを確認します。

show routine load\G

次に、StarRocks データベース内の対応するテーブルをクエリし、データが正常にインポートされたことを確認します。

StarRocks > select * from users;
+------+--------------+---------------------+--------+
| id | name | timestamp | status |
+------+--------------+---------------------+--------+
| 1 | testuser | 2023-11-10T12:00:00 | active |
| 2 | testuser | 2023-11-10T12:00:00 | active |
+------+--------------+---------------------+--------+
2 rows in set (0.01 sec)