StarRocks Migration Tool (SMT)
StarRocks Migration Tool (SMT) は、StarRocks が提供するデータ移行ツールで、Flink を通じてソースデータベースから StarRocks にデータをロードします。SMT の主な機能は次のとおりです:
- ソースデータベースとターゲットの StarRocks クラスターの情報に基づいて、StarRocks にテーブルを作成するためのステートメントを生成します。
- Flink の SQL クライアントで実行可能な SQL ステートメントを生成し、データ同期のための Flink ジョブを送信します。これにより、パイプラインでのフルまたは増分データ同期が簡素化されます。現在、SMT は以下のソースデータベースをサポートしています:
| ソースデータベース | StarRocks にテーブルを作成するためのステートメントを生成 | フルデータ同期 | 増分データ同期 |
|---|---|---|---|
| MySQL | サポート | サポート | サポート |
| PostgreSQL | サポート | サポート | サポート |
| Oracle | サポート | サポート | サポート |
| Hive | サポート | サポート | サポートされていない |
| ClickHouse | サポート | サポート | サポートされていない |
| SQL Server | サポート | サポート | サポート |
| TiDB | サポート | サポート | サポート |
ダウンロードリンク: https://cdn-thirdparty.starrocks.com/smt.tar.gz?r=2
SMT の使用手順
一般的な手順は次のとおりです:
-
conf/config_prod.conf ファイルを設定します。
-
starrocks-migration-tool を実行します。
-
実行後、SQL スクリプトがデフォルトで result ディレクトリに生成されます。
その後、result ディレクトリ内の SQL スクリプトを使用して、メタデータまたはデータの同期を行うことができます。
SMT の設定
-
[db]: データソースに接続するための情報。typeパラメータで指定されたデータベースタイプに対応するデータソースに接続するための情報を設定します。 -
[other]: 追加の設定。be_numパラメータに実際の BE ノードの数を指定することをお勧めします。 -
flink.starrocks.sink.*: flink-connector-starrocks の設定。詳細な設定と説明については、configuration description を参照してください。 -
[table-rule.1]: データソース内のテーブルをマッチングするためのルール。このルールで設定された正規表現に基づいて CREATE TABLE ステートメントが生成され、データソース内のデータベースとテーブルの名前にマッチします。複数のルールを設定でき、各ルールは対応する結果ファイルを生成します。例えば:[table-rule.1]->result/starrocks-create.1.sql[table-rule.2]->result/starrocks-create.2.sql
各ルールには、データベース、テーブル、および flink-connector-starrocks の設定が含まれている必要があります。
[table-rule.1]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^database1.*$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^.*$
schema = ^.*$
############################################
### flink sink の設定
### `connector`、`table-name`、`database-name` は設定しないでください。自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url=192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true
[table-rule.2]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^database2.*$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^.*$
schema = ^.*$
############################################
### flink sink の設定
### `connector`、`table-name`、`database-name` は設定しないでください。自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url=192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
データベース内でシャードに分割された大きな テーブルに対して、別のルールを設定することができます。例えば、
edu_db_1とedu_db_2の 2 つのデータベースにそれぞれcourse_1とcourse_2というテーブルが含まれており、これらのテーブルが同じ構造を持っているとします。これら 2 つのテーブルから 1 つの StarRocks テーブルにデータをロードして分析するために、次のルールを使用できます。[table-rule.3]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^edu_db_[0-9]*$
# プロパティを設定す るためのテーブルをマッチングするパターン
table = ^course_[0-9]*$
schema = ^.*$このルールは自動的に多対一のロード関係を形成します。StarRocks に生成されるテーブルのデフォルト名は
course__auto_shardであり、関連する SOL スクリプトでテーブル名を変更することもできます。例えばresult/starrocks-create.3.sqlです。
MySQL から StarRocks への同期
概要
Flink CDC コネクタと SMT を使用すると、MySQL からサブセカンドでデータを同期できます。

画像に示されているように、SMT は MySQL と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソーステーブルとシンクテーブルの CREATE TABLE ステートメントを自動生成します。Flink CDC コネクタは MySQL Binlog を読み取り、Flink-connector-starrocks はデータを StarRocks に書き込みます。
手順
| 関連項目 | パッケージ名 | ダウンロードリンク |
|---|---|---|
| Flink | flink-x.x.x-bin-scala_2.12.tgz | ここをクリック |
| Flink CDC コネクタ | flink-sql-connector-mysql-cdc-x.x.x.jar | ここをクリック |
| Flink-connector-starrocks | flink-connector-starrocks-x.x.x_flink-x.x.jar | ここをクリック |
| SMT | smt.tar.gz | ここをクリック |
-
Flink をダウンロードします。Flink 1.11 以降がサポートされています。
-
Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する
flink-sql-connector-mysql-cdc-xxx.jarをダウンロードしてください。 -
Flink-connector-starrocks をダウンロードします。
-
flink-sql-connector-mysql-cdc-xxx.jar と flink-connector-starrocks-xxx.jar を flink-xxx/lib/ にコピーします。
-
smt.tar.gz をダウンロードします。
-
SMT の設定ファイルを抽出して修正します。
[db]
host = 192.168.1.1
port = 3306
user = root
password =
type = mysql
[other]
# StarRocks のバックエンドの数
be_num = 3
# `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
use_decimal_v3 = false
# 変換された DDL SQL を保存するディレクトリ
output_dir = ./result
[table-rule.1]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^db$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^table$
schema = ^.*$
############################################
### flink sink の設定
### `connector`、`table-name`、`database-name` は設定しないでください。自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url=192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。
$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql starrocks-external-create.all.sql -
プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql -
プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソーステーブルとシンクテーブルを生成し、データを同期するための Flink ジョブを開始します。
bin/sql-client.sh embedded < flink-create.all.sql上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが実行され続けます。
-
Flink ジョブのステータスを確認します。
bin/flink listジョブの実行中にエラーが発生した場合は、Flink のログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。
注意事項
-
MySQL binlog を有効にする方法
-
/etc/my.cnf を修正します:
# binlog を有効にする
log-bin=/var/lib/mysql/mysql-bin
#log_bin=ON
## binlog ファイルのベース名
#log_bin_basename=/var/lib/mysql/mysql-bin
## すべての binlog ファイルを管理するインデックスファイル
#log_bin_index=/var/lib/mysql/mysql-bin.index
# サーバー ID を設定
server-id=1
binlog_format = row -
mysqld を再起動します。
SHOW VARIABLES LIKE 'log_bin';を実行して、MySQL binlog が有効かどうかを確認できます。
-
PostgreSQL から StarRocks への同期
概要
Flink CDC コネクタと SMT を使用すると、PostgreSQL からサブセカンドでデータを同期できます。
SMT は PostgreSQL と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソーステーブルとシンクテーブルの CREATE TABLE ステートメントを自動生成します。
Flink CDC コネクタは PostgreSQL の WAL を読み取り、Flink-connector-starrocks はデータを StarRocks に書き込みます。
手順
-
Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。
-
Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する
flink-sql-connector-postgres-cdc-xxx.jarをダウンロードしてください。 -
Flink StarRocks コネクタ をダウンロードします。
-
flink-sql-connector-postgres-cdc-xxx.jar と flink-connector-starrocks-xxx.jar を flink-xxx/lib/ にコピーします。
-
smt.tar.gz をダウンロードします。
-
SMT の設定ファイルを抽出して修正します。
[db]
host = 192.168.1.1
port = 5432
user = xxx
password = xxx
type = pgsql
[other]
# StarRocks のバックエンドの数
be_num = 3
# `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
use_decimal_v3 = false
# 変換された DDL SQL を保存するディレクトリ
output_dir = ./result
[table-rule.1]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^db$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^table$
# プロパティを設定するためのスキーマをマッチングするパターン
schema = ^.*$
############################################
### flink sink の設定
### `connector`、`table-name`、`database-name` は設定しないでください。自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url=192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。
$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql -
プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql -
プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソーステーブルとシンクテーブルを生成し、データを同期するための Flink ジョブを開始します。
bin/sql-client.sh embedded < flink-create.all.sql上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが実行され続けます。
-
Flink ジョブのステータスを確認します。
bin/flink listジョブの実行中にエラーが発生した場合は、Flink のログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。
注意事項
- PostgreSQL
v9.*の場合、以下のような特別な flink-cdc 設定が必要です(PostgreSQLv10.*以降を使用することをお勧めします。そうでない場合は、WAL デコードプラグインをインストールする必要があります):
############################################
############################################
### `postgresql` 用の flink-cdc プラグイン設定
############################################
### `9.*` 用 decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming
### https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html
### および https://debezium.io/documentation/reference/postgres-plugins.html を参照
### flink.cdc.decoding.plugin.name = decoderbufs
-
PostgreSQL WAL を有効にする方法
# 接続権限を開く
echo "host all all 0.0.0.0/32 trust" >> pg_hba.conf
echo "host replication all 0.0.0.0/32 trust" >> pg_hba.conf
# WAL 論理レプリケーションを有効にする
echo "wal_level = logical" >> postgresql.conf
echo "max_wal_senders = 2" >> postgresql.conf
echo "max_replication_slots = 8" >> postgresql.conf同期が必要なテーブルに対してレプリカアイデンティティ FULL を指定します。
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULLこれらの変更を行った後、PostgreSQL を再起動します。
Oracle から StarRocks への同期
概要
Flink CDC コネクタと SMT を使用すると、Oracle からサブセカンドでデータを同期できます。
SMT は Oracle と StarRocks のクラスター情報とテーブル構造に基づいて、Flink のソーステーブルとシンクテーブルの CREATE TABLE ステートメントを自動生成します。
Flink CDC コネクタは Oracle の logminer を読み取り、Flink-connector-starrocks はデータを StarRocks に書き込みます。
手順
-
Flink をダウンロードします。Flink のバージョンは 1.11 以降がサポートされています。
-
Flink CDC コネクタ をダウンロードします。Flink のバージョンに対応する
flink-sql-connector-oracle-cdc-xxx.jarをダウンロードしてください。 -
Flink StarRocks コネクタ をダウンロードします。
-
flink-sql-connector-oracle-cdc-xxx.jarとflink-connector-starrocks-xxx.jarをflink-xxx/lib/にコピーします。 -
smt.tar.gz をダウンロードします。
-
SMT の設定ファイルを抽出して修正します。
[db]
host = 192.168.1.1
port = 1521
user = xxx
password = xxx
type = oracle
[other]
# StarRocks のバックエンドの数
be_num = 3
# `decimal_v3` は StarRocks-1.18.1 以降でサポートされています
use_decimal_v3 = false
# 変換された DDL SQL を保存するディレクトリ
output_dir = ./result
[table-rule.1]
# プロパティを設定するためのデータベースをマッチングするパターン
database = ^db$
# プロパティを設定するためのテーブルをマッチングするパターン
table = ^table$
# プロパティを設定するためのスキーマをマッチングするパターン
schema = ^.*$
############################################
### flink sink の設定
### `connector`、`table-name`、`database-name` は設定しないでください。自動生成されます。
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
flink.starrocks.load-url=192.168.1.1:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.max-retries=10
flink.starrocks.sink.buffer-flush.interval-ms=15000
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true -
starrocks-migrate-tool を実行します。すべての SQL スクリプトが result ディレクトリに生成されます。
$./starrocks-migrate-tool
$ls result
flink-create.1.sql smt.tar.gz starrocks-create.all.sql
flink-create.all.sql starrocks-create.1.sql -
プレフィックスが starrocks-create の SQL スクリプトを使用して、StarRocks にテーブルを生成します。
mysql -hxx.xx.xx.x -P9030 -uroot -p < starrocks-create.all.sql -
プレフィックスが flink-create の SQL スクリプトを使用して、Flink のソーステーブルとシンクテーブルを生成し、データを同期するための Flink ジョブを開始します。
bin/sql-client.sh embedded < flink-create.all.sql上記のコマンドが正常に実行されると、データを同期するための Flink ジョブが実行され続けます。
-
Flink ジョブのステータスを確認します。
bin/flink listジョブの実行中にエラーが発生した場合は、Flink のログで詳細なエラー情報を確認できます。また、conf/flink-conf.yaml ファイルで Flink の設定を変更することもできます。例えば、メモリやスロットなどです。
注意事項
-
logminer を使用して Oracle を同期する方法:
# ロギングを有効にする
alter system set db_recovery_file_dest = '/home/oracle/data' scope=spfile;
alter system set db_recovery_file_dest_size = 10G;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
# ユーザー作成と権限付与
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser; -
[table-rule.1] のデータベース設定は正規表現をサポートしていないため、完全なデータベース名を指定する必要があります。
-
Oracle12c は CDB モードをサポートしているため、SMT は内部的に CDB が有効かどうかを自動的に判断し、flink-cdc の設定を変更します。ただし、ユーザーは
[db].userの設定に c## プレフィックスを追加する必要があるかどうかに注意を払う必要があります。権限不足の問題を避けるためです。
Hive から StarRocks への同期
概要
このガイドでは、SMT を使用して Hive データを StarRocks に同期する方法を説明します。同期中に、StarRocks に Duplicate テーブルが作成され、Flink ジョブがデータを同期するために実行され続けます。