
# Synchronize data from MySQL

This topic describes how to synchronize data from MySQL to StarRocks within seconds by using the Flink CDC connector, flink-connector-starrocks, and the StarRocks Migration Tool (SMT).

## Before you begin

Before you start data synchronization, enable binary logging in MySQL and download the following software packages:

  • Modify the /etc/my.cnf file to enable binary logging in MySQL and then restart mysqld. You can execute the SHOW VARIABLES LIKE 'log_bin' statement to check whether binary logging is enabled.

    ```
    # Enable binlog
    log-bin=/var/lib/mysql/mysql-bin
    #log_bin=ON

    ## Base name for binlog files
    #log_bin_basename=/var/lib/mysql/mysql-bin

    ## Index file to manage all binlog files
    #log_bin_index=/var/lib/mysql/mysql-bin.index

    # Configure the server ID
    server-id=1
    binlog_format = row
    ```
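
    After the restart, you can confirm that the settings took effect by querying the server variables in the MySQL client. These are standard MySQL statements; with the configuration above, `log_bin` reports `ON` and `binlog_format` reports `ROW` (row-based logging is required by Flink CDC):

    ```sql
    -- Returns ON when binary logging is enabled
    SHOW VARIABLES LIKE 'log_bin';
    -- Flink CDC requires row-based logging; this should return ROW
    SHOW VARIABLES LIKE 'binlog_format';
    ```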
  • Apache Flink. Only Apache Flink 1.11 and later are supported. We recommend that you download Apache Flink 1.13.
  • Flink CDC Connector. To synchronize data from MySQL to StarRocks, download the version of Flink-MySQL-CDC that matches the version of Apache Flink you downloaded.
  • Flink-connector-starrocks. Apache Flink 1.13 uses a different version of flink-connector-starrocks than Apache Flink 1.11 and Apache Flink 1.12.
  • SMT

## How it works

MySQL synchronization

The preceding figure shows the workflow of data synchronization:

  1. SMT generates the statements that are used to create the source table and the sink table based on the cluster information and the table schema of MySQL and StarRocks.
  2. Flink CDC Connector acquires binary log data from MySQL.
  3. Flink-connector-starrocks loads the data into StarRocks.
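
The statements that SMT generates have roughly the following shape. This is an illustrative sketch only: all table names, columns, and addresses below are hypothetical placeholders, while the connector options (`mysql-cdc` with `hostname`/`database-name`/`table-name`, and `starrocks` with `jdbc-url`/`load-url`) are the documented options of the two connectors. The actual statements are written by SMT to its result directory.

```sql
-- Source: a Flink table that reads the MySQL binlog via Flink CDC.
CREATE TABLE IF NOT EXISTS `orders_src` (
  `id` BIGINT NOT NULL,
  `amount` DECIMAL(10, 2),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.1.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '',
  'database-name' = 'demo_db',
  'table-name' = 'orders'
);

-- Sink: a Flink table that writes to StarRocks via flink-connector-starrocks.
CREATE TABLE IF NOT EXISTS `orders_sink` (
  `id` BIGINT NOT NULL,
  `amount` DECIMAL(10, 2),
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://192.168.1.1:9030',
  'load-url' = '192.168.1.1:8030',
  'database-name' = 'demo_db',
  'table-name' = 'orders',
  'username' = 'root',
  'password' = ''
);

-- The synchronization task itself is a continuous INSERT from source to sink.
INSERT INTO `orders_sink` SELECT * FROM `orders_src`;
```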

## Procedure

Perform the following steps to synchronize data:

  1. Copy flink-sql-connector-mysql-cdc-xxx.jar and flink-connector-starrocks-xxx.jar to the flink-xxx/lib/ directory.

  2. Decompress SMT and modify the configuration file of SMT:

    • [db]: the connection information of MySQL. Modify this section to match your MySQL instance.

    • be_num: the number of BE nodes in your StarRocks cluster. This parameter helps SMT set a reasonable number of buckets.

    • [table-rule.1]: the rule based on which data is matched. You can match databases and tables by using regular expressions to generate the SQL statements that are used to create tables. You can configure multiple rules.

    • flink.starrocks.*: the configuration information of your StarRocks cluster. For more information, see Load data by using flink-connector-starrocks.

    ```
    [db]
    host = 192.168.1.1
    port = 3306
    user = root
    password =

    [other]
    # number of backends in StarRocks
    be_num = 3
    # `decimal_v3` is supported since StarRocks-1.18.1
    use_decimal_v3 = false
    # file to save the converted DDL SQL
    output_dir = ./result

    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000
    ```
  3. Use the statements that are generated by SMT to create StarRocks tables and Apache Flink tables. All generated statements are saved in the result directory.

    ```
    $ ./starrocks-migrate-tool
    $ ls result
    flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
    flink-create.all.sql  starrocks-create.1.sql
    ```

  4. Create the StarRocks tables.

    ```
    mysql -h xx.xx.xx.x -P 9030 -uroot -p < starrocks-create.1.sql
    ```

  5. Create the Apache Flink tables and then start the data synchronization task. Once the synchronization task begins, all existing data and subsequent modifications to data are synchronized to StarRocks.

    Note: When you perform this step, ensure that the Apache Flink cluster has been started. If not, run flink/bin/start-cluster.sh to start it.

    ```
    bin/sql-client.sh -f flink-create.1.sql
    ```

  6. Check the status of the synchronization task. If an error occurs, view the task log for the error message and then adjust the memory and slot settings in conf/flink-conf.yaml. For more information, see Configuration in Apache Flink.

    ```
    bin/flink list
    ```

## Usage notes

  • If you configure multiple match rules, you must configure the database pattern, the table pattern, and the flink-connector-starrocks properties for each rule.

    ```
    [table-rule.1]
    # pattern to match databases for setting properties
    database = ^console_19321.*$
    # pattern to match tables for setting properties
    table = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=15000

    [table-rule.2]
    # pattern to match databases for setting properties
    database = ^database2.*$
    # pattern to match tables for setting properties
    table = ^.*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    # If you cannot select a suitable separator for the loaded data, you can use the JSON
    # format by replacing flink.starrocks.sink.properties.column_separator and
    # flink.starrocks.sink.properties.row_delimiter with the following parameters.
    # Note: Doing so impacts loading performance.
    flink.starrocks.sink.properties.strip_outer_array=true
    flink.starrocks.sink.properties.format=json
    ```

> Note: If you want to configure more parameters, such as the frequency to load data, see [Load data by using Flink-connector-starrocks](../loading/Flink-connector-starrocks.md) for more information about `sink`.
  • You can configure an individual match rule for a large table that is sharded across databases and tables. For example, you have two databases, edu_db_1 and edu_db_2, and each database contains two tables, course_1 and course_2. All four tables use the same schema. You can use the following configuration to load the preceding four tables into StarRocks:

    ```
    [table-rule.3]
    # pattern to match databases for setting properties
    database = ^edu_db_[0-9]*$
    # pattern to match tables for setting properties
    table = ^course_[0-9]*$

    ############################################
    ### flink sink configurations
    ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
    ############################################
    flink.starrocks.jdbc-url=jdbc:mysql://192.168.1.1:9030
    flink.starrocks.load-url= 192.168.1.1:8030
    flink.starrocks.username=root
    flink.starrocks.password=
    flink.starrocks.sink.properties.column_separator=\x01
    flink.starrocks.sink.properties.row_delimiter=\x02
    flink.starrocks.sink.buffer-flush.interval-ms=5000
    ```

After the tables are loaded, StarRocks generates a new table named `course_auto_shared`. You can modify the name of the table in the configuration file that is automatically generated.
  • If you want to create tables and synchronize data by using the SQL Client command line, you need to escape the \ (backslash) in the separators.

    ```
    'sink.properties.column_separator' = '\\x01'
    'sink.properties.row_delimiter' = '\\x02'
    ```
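
    For context, these escaped separators appear inside the `WITH` clause of the sink table definition. The following trimmed sketch is hypothetical (table name, columns, and addresses are placeholders), but it shows where the doubled backslashes go when you type the statement into SQL Client:

    ```sql
    -- Hypothetical sink definition entered in the SQL Client command line
    CREATE TABLE `demo_sink` (
      `id` BIGINT NOT NULL,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'starrocks',
      'jdbc-url' = 'jdbc:mysql://192.168.1.1:9030',
      'load-url' = '192.168.1.1:8030',
      'database-name' = 'demo_db',
      'table-name' = 'demo',
      'username' = 'root',
      'password' = '',
      -- In SQL Client, the backslash must be doubled:
      'sink.properties.column_separator' = '\\x01',
      'sink.properties.row_delimiter' = '\\x02'
    );
    ```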