Continuously load data from Apache Flink®
StarRocks provides a self-developed connector named StarRocks Connector for Apache Flink® (Flink connector for short) to help you load data into a StarRocks table by using Flink. The basic principle is to accumulate the data in memory and then load it into StarRocks in one batch through STREAM LOAD.
The Flink connector supports the DataStream API, Table API & SQL, and Python API. It delivers higher and more stable performance than flink-connector-jdbc provided by Apache Flink®.
NOTICE
Loading data into StarRocks tables with the Flink connector requires SELECT and INSERT privileges on the target StarRocks table. If you do not have these privileges, follow the instructions in GRANT to grant them to the user that you use to connect to your StarRocks cluster.
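For illustration, the required privileges can be granted with statements like the following. This is a minimal sketch: the database, table, and user names are placeholders, and the exact GRANT syntax depends on your StarRocks version, so check the GRANT documentation for your release.

```sql
-- Grant the privileges the Flink connector needs on the target table.
-- Replace example_db, example_tbl, and flink_user with your own names.
GRANT SELECT ON TABLE example_db.example_tbl TO USER 'flink_user'@'%';
GRANT INSERT ON TABLE example_db.example_tbl TO USER 'flink_user'@'%';
```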
Version requirements
| Connector | Flink | StarRocks | Java | Scala |
|-----------|-------|-----------|------|-------|
| 1.2.9 | 1.15, 1.16, 1.17, 1.18 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.8 | 1.13, 1.14, 1.15, 1.16, 1.17 | 2.1 and later | 8 | 2.11, 2.12 |
| 1.2.7 | 1.11, 1.12, 1.13, 1.14, 1.15 | 2.1 and later | 8 | 2.11, 2.12 |
Obtain Flink connector
You can obtain the Flink connector JAR file in the following ways:
- Directly download the compiled Flink connector JAR file.
- Add the Flink connector as a dependency in your Maven project and then download the JAR file.
- Compile the source code of the Flink connector into a JAR file by yourself.
The naming format of the Flink connector JAR file is as follows:

- Since Flink 1.15, it is `flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar`. For example, if you install Flink 1.15 and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.15.jar`.

- Prior to Flink 1.15, it is `flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar`. For example, if you install Flink 1.14 and Scala 2.12 in your environment, and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar`.
NOTICE
In general, the latest version of the Flink connector only maintains compatibility with the three most recent versions of Flink.
Download the compiled JAR file
Directly download the corresponding version of the Flink connector JAR file from the Maven Central Repository.
Maven Dependency
In your Maven project's `pom.xml` file, add the Flink connector as a dependency in the following format. Replace `flink_version`, `scala_version`, and `connector_version` with the respective versions.
- In Flink 1.15 and later:

  ```xml
  <dependency>
      <groupId>com.starrocks</groupId>
      <artifactId>flink-connector-starrocks</artifactId>
      <version>${connector_version}_flink-${flink_version}</version>
  </dependency>
  ```

- In versions earlier than Flink 1.15:

  ```xml
  <dependency>
      <groupId>com.starrocks</groupId>
      <artifactId>flink-connector-starrocks</artifactId>
      <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
  </dependency>
  ```
Compile by yourself
1. Download the Flink connector source code.

2. Execute the following command to compile the source code of the Flink connector into a JAR file. Note that `flink_version` is replaced with the corresponding Flink version.

   ```bash
   sh build.sh <flink_version>
   ```

   For example, if the Flink version in your environment is 1.15, you need to execute the following command:

   ```bash
   sh build.sh 1.15
   ```

3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`, generated upon compilation.

NOTE

The name of a Flink connector that is not formally released contains the `SNAPSHOT` suffix.
Options
connector
Required: Yes
Default value: NONE
Description: The connector that you want to use. The value must be "starrocks".
jdbc-url
Required: Yes
Default value: NONE
Description: The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: `jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3>`.
load-url
Required: Yes
Default value: NONE
Description: The address that is used to connect to the HTTP server of the FE. You can specify multiple addresses, which must be separated by a semicolon (;). Format: `<fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2>`.
database-name
Required: Yes
Default value: NONE
Description: The name of the StarRocks database into which you want to load data.
table-name
Required: Yes
Default value: NONE
Description: The name of the StarRocks table into which you want to load data.
username
Required: Yes
Default value: NONE
Description: The username of the account that you want to use to load data into StarRocks. The account needs SELECT and INSERT privileges on the target StarRocks table.
password
Required: Yes
Default value: NONE
Description: The password of the preceding account.
sink.version
Required: No
Default value: AUTO
Description: The interface used to load data. This parameter is supported from Flink connector version 1.2.4 onwards. Valid values:

- `V1`: Use the Stream Load interface to load data. Connectors earlier than 1.2.4 support only this mode.
- `V2`: Use the Stream Load transaction interface to load data. It requires StarRocks 2.4 or later. `V2` is recommended because it optimizes memory usage and provides a more stable exactly-once implementation.
- `AUTO`: If the StarRocks version supports the Stream Load transaction interface, the connector automatically chooses `V2`; otherwise, it chooses `V1`.
sink.label-prefix
Required: No
Default value: NONE
Description: The label prefix used by Stream Load. It is recommended to configure this parameter if you use exactly-once with connector 1.2.8 and later. See the exactly-once usage notes.
sink.semantic
Required: No
Default value: at-least-once
Description: The semantic guaranteed by sink. Valid values: at-least-once and exactly-once.
sink.buffer-flush.max-bytes
Required: No
Default value: 94371840 (90 MB)
Description: The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. The maximum value ranges from 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. This parameter only takes effect when `sink.semantic` is set to `at-least-once`. If `sink.semantic` is set to `exactly-once`, the data in memory is flushed when a Flink checkpoint is triggered; in that case, this parameter does not take effect.
sink.buffer-flush.max-rows
Required: No
Default value: 500000
Description: The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. This parameter is available only when `sink.version` is `V1` and `sink.semantic` is `at-least-once`. Valid values: 64000 to 5000000.
sink.buffer-flush.interval-ms
Required: No
Default value: 300000
Description: The interval at which data is flushed. This parameter is available only when `sink.semantic` is `at-least-once`. Valid values: 1000 to 3600000. Unit: ms.
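Putting the options above together, a Flink SQL sink table might be defined as follows. This is a minimal sketch, not a definitive configuration: the table schema, host names, ports, database, table, and credentials are placeholders for illustration and must be replaced with your own values.

```sql
-- Hypothetical sink table mapping to the StarRocks table example_db.score_board.
-- All host names, ports, credentials, and columns are placeholders.
CREATE TABLE starrocks_sink (
    id INT,
    name STRING,
    score INT
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://fe_host:9030',
    'load-url' = 'fe_host:8030',
    'database-name' = 'example_db',
    'table-name' = 'score_board',
    'username' = 'flink_user',
    'password' = 'xxx',
    'sink.semantic' = 'at-least-once',
    'sink.buffer-flush.interval-ms' = '10000'
);

-- Load rows into StarRocks through the sink table.
INSERT INTO starrocks_sink VALUES (1, 'starrocks', 100);
```

With `sink.semantic` set to `at-least-once`, the buffered rows are flushed at the configured interval; switching to `exactly-once` would instead tie flushes to Flink checkpoints, as described above.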