# Continuously load data from Apache Flink®
This topic describes how to load data from Apache Flink® to StarRocks.
## Overview
The flink-connector-jdbc tool provided by Apache Flink® may not meet your performance requirements in certain scenarios. Therefore, we provide a new connector named flink-connector-starrocks, which caches data and then loads it in batches by using Stream Load.
## Procedure
To load data from Apache Flink® into StarRocks by using flink-connector-starrocks, perform the following steps:
1. Download the source code of flink-connector-starrocks.

2. Find the file named pom.xml. Add the following code snippet to pom.xml and replace `x.x.x` in the code snippet with the latest version number of flink-connector-starrocks:

   ```xml
   <dependency>
       <groupId>com.starrocks</groupId>
       <artifactId>flink-connector-starrocks</artifactId>
       <!-- for flink-1.11, flink-1.12 -->
       <version>x.x.x_flink-1.11</version>
       <!-- for flink-1.13 -->
       <version>x.x.x_flink-1.13</version>
   </dependency>
   ```
3. Use one of the following methods to load data:

   - Load data as raw JSON string streams.

     ```java
     // -------- sink with raw json string stream --------
     fromElements(new String[]{
         "{\"score\": \"99\", \"name\": \"stephen\"}",
         "{\"score\": \"100\", \"name\": \"lebron\"}"
     }).addSink(
         StarRocksSink.sink(
             // the sink options
             StarRocksSinkOptions.builder()
                 .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port,xxxxx")
                 .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                 .withProperty("username", "xxx")
                 .withProperty("password", "xxx")
                 .withProperty("table-name", "xxx")
                 .withProperty("database-name", "xxx")
                 // Since 2.4, StarRocks supports partial updates for Primary Key tables. You can specify the columns to be updated by configuring the following two properties.
                 // The '__op' column must be specified at the end of 'sink.properties.columns'.
                 // .withProperty("sink.properties.partial_update", "true")
                 // .withProperty("sink.properties.columns", "k1,k2,k3,__op")
                 .withProperty("sink.properties.format", "json")
                 .withProperty("sink.properties.strip_outer_array", "true")
                 .build()
         )
     ).setParallelism(1); // Define the parallelism of the sink. In the scenario of multiple parallel sinks, you need to guarantee the data order.

     // -------- sink with stream transformation --------
     class RowData {
         public int score;
         public String name;
         public RowData(int score, String name) {
             ......
         }
     }
     fromElements(
         new RowData[]{
             new RowData(99, "stephen"),
             new RowData(100, "lebron")
         }
     ).addSink(
         StarRocksSink.sink(
             // the table structure
             TableSchema.builder()
                 .field("score", DataTypes.INT())
                 .field("name", DataTypes.VARCHAR(20))
                 .build(),
             // the sink options
             StarRocksSinkOptions.builder()
                 .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port,xxxxx")
                 .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
                 .withProperty("username", "xxx")
                 .withProperty("password", "xxx")
                 .withProperty("table-name", "xxx")
                 .withProperty("database-name", "xxx")
                 // Since 2.4, StarRocks supports partial updates for Primary Key tables. You can specify the columns to be updated by configuring the following two properties.
                 // The '__op' column must be specified at the end of 'sink.properties.columns'.
                 // .withProperty("sink.properties.partial_update", "true")
                 // .withProperty("sink.properties.columns", "k1,k2,k3,__op")
                 .withProperty("sink.properties.format", "csv")
                 .withProperty("sink.properties.column_separator", "\\x01")
                 .withProperty("sink.properties.row_delimiter", "\\x02")
                 .build(),
             // set the slots with streamRowData
             (slots, streamRowData) -> {
                 slots[0] = streamRowData.score;
                 slots[1] = streamRowData.name;
             }
         )
     );
     ```

   - Load data as tables.

     ```java
     // create a table with `structure` and `properties`
     // Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to: `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
     tEnv.executeSql(
         "CREATE TABLE USER_RESULT(" +
             "name VARCHAR," +
             "score BIGINT" +
         ") WITH ( " +
             "'connector' = 'starrocks'," +
             "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
             "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
             "'database-name' = 'xxx'," +
             "'table-name' = 'xxx'," +
             "'username' = 'xxx'," +
             "'password' = 'xxx'," +
             "'sink.buffer-flush.max-rows' = '1000000'," +
             "'sink.buffer-flush.max-bytes' = '300000000'," +
             "'sink.buffer-flush.interval-ms' = '5000'," +
             // Since 2.4, StarRocks supports partial updates for Primary Key tables. You can specify the columns to be updated by configuring the following two properties.
             // The '__op' column must be specified at the end of 'sink.properties.columns'.
             // "'sink.properties.partial_update' = 'true'," +
             // "'sink.properties.columns' = 'k1,k2,k3'," +
             "'sink.properties.column_separator' = '\\x01'," +
             "'sink.properties.row_delimiter' = '\\x02'," +
             "'sink.max-retries' = '3'," +
             // Stream load properties like `'sink.properties.columns' = 'k1, v1'`
             "'sink.properties.*' = 'xxx'," +
             // Define the parallelism of the sink. In the scenario of multiple parallel sinks, you need to guarantee the data order.
             "'sink.parallelism' = '1'" +
         ")"
     );
     ```
The following table describes the sink options that you can configure when you load data as tables.
| Option | Required | Default value | Data type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | NONE | STRING | The connector that you want to use. The value must be starrocks. |
| jdbc-url | Yes | NONE | STRING | The URL that is used to query data from StarRocks. |
| load-url | Yes | NONE | STRING | The URL that is used to load data in batches. Format: fe_ip:http_port;fe_ip:http_port. |
| database-name | Yes | NONE | STRING | The name of the StarRocks database into which you want to load data. |
| table-name | Yes | NONE | STRING | The name of the StarRocks table into which you want to load data. |
| username | Yes | NONE | STRING | The username of the account that you want to use to load data into StarRocks. |
| password | Yes | NONE | STRING | The password of the preceding account. |
| sink.semantic | No | at-least-once | STRING | The semantics supported by your sink. Valid values: at-least-once and exactly-once. If you specify exactly-once, the sink.buffer-flush.max-bytes, sink.buffer-flush.max-rows, and sink.buffer-flush.interval-ms parameters are invalid. |
| sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | STRING | The maximum size of data that can be loaded into StarRocks at a time. Valid values: 64 MB to 10 GB. |
| sink.buffer-flush.max-rows | No | 500000 | STRING | The maximum number of rows that can be loaded into StarRocks at a time. Valid values: 64000 to 5000000. |
| sink.buffer-flush.interval-ms | No | 300000 | STRING | The interval at which cached data is flushed. Valid values: 1000 to 3600000. Unit: ms. |
| sink.max-retries | No | 3 | STRING | The number of times that the system retries the Stream Load job. Valid values: 0 to 10. |
| sink.connect.timeout-ms | No | 1000 | STRING | The timeout period for establishing the HTTP connection used by Stream Load. Valid values: 100 to 60000. Unit: ms. |
| sink.properties.* | No | NONE | STRING | The properties of Stream Load, for example, 'sink.properties.columns' = 'k1, k2, k3'. Since 2.4, flink-connector-starrocks supports partial updates for Primary Key tables. |
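After the `USER_RESULT` table above is registered, loading data into StarRocks is a regular `INSERT INTO` statement against it. The following is a minimal usage sketch; the source table `score_board` and its columns are hypothetical and assumed to be already registered in the same Flink catalog:

```java
// A minimal sketch: write into the StarRocks sink table registered above.
// `score_board` is a hypothetical source table with name (VARCHAR) and
// score (BIGINT) columns; only `USER_RESULT` comes from this topic.
tEnv.executeSql(
    "INSERT INTO USER_RESULT " +
    "SELECT name, score FROM score_board"
);
```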
## Usage notes
When you load data from Apache Flink® into StarRocks, take note of the following points:
- If you specify exactly-once semantics, the sink normally requires support for the two-phase commit (2PC) protocol. StarRocks does not support this protocol, so the connector relies on Apache Flink® checkpoints to achieve exactly-once delivery, as shown in the sketch after this note. The overall process is as follows:

  1. Save the data and its label at each checkpoint that is completed at a specific checkpoint interval.
  2. After the data and labels are saved, block the flushing of data cached in the state at the first invoke after each checkpoint is completed.

  If StarRocks unexpectedly exits, the operators for Apache Flink® sink streaming are blocked for a long time, and Apache Flink® issues a monitoring alert or shuts down.
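  For reference, the following is a minimal sketch of enabling this mode. It reuses the sink API shown in the procedure above; the checkpoint interval and the connection values are placeholders, not recommendations:

  ```java
  // A minimal sketch: with 'sink.semantic' set to 'exactly-once', cached data is
  // flushed and committed when a Flink checkpoint completes, so checkpointing
  // must be enabled for data to be loaded at all.
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.enableCheckpointing(10000L); // checkpoint, and therefore flush, every 10 seconds (placeholder)

  env.fromElements("{\"score\": \"99\", \"name\": \"stephen\"}")
      .addSink(StarRocksSink.sink(
          StarRocksSinkOptions.builder()
              .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
              .withProperty("load-url", "fe1_ip:http_port")
              .withProperty("username", "xxx")
              .withProperty("password", "xxx")
              .withProperty("database-name", "xxx")
              .withProperty("table-name", "xxx")
              // The sink.buffer-flush.* options are ignored in this mode.
              .withProperty("sink.semantic", "exactly-once")
              .withProperty("sink.properties.format", "json")
              .withProperty("sink.properties.strip_outer_array", "true")
              .build()
      ));
  ```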
- By default, data is loaded in the CSV format. You can set the `sink.properties.row_delimiter` parameter to `\\x02` to specify a row separator and set the `sink.properties.column_separator` parameter to `\\x01` to specify a column separator.

- If data loading pauses, you can increase the memory of the Flink task.
- If the preceding code runs as expected and StarRocks can receive data, but the data loading fails, check whether your machine can access the HTTP port of the backends (BEs) in your StarRocks cluster. If you can successfully connect to the HTTP port returned by the SHOW BACKENDS command in your StarRocks cluster, your machine can access it. For example, suppose that a machine has both a public IP address and a private IP address, that the HTTP ports of the frontends (FEs) and BEs are reachable through the public IP addresses, that the IP address bound to your StarRocks cluster is the private IP address, and that the `load-url` for the Flink task specifies the HTTP port of the public IP address of the FEs. The FEs forward the data loading task to the private IP address of the BEs, so if the machine cannot reach the private IP address of the BEs, the data loading fails. A minimal sketch of such a check follows.
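  In the sketch below, the IP address and port are placeholders; use the IP and HttpPort values that SHOW BACKENDS returns for your cluster:

  ```java
  // Verify that this machine can open a TCP connection to a BE HTTP port
  // before debugging the Flink job itself. Host and port are placeholders.
  import java.net.InetSocketAddress;
  import java.net.Socket;

  public class BeConnectivityCheck {
      public static void main(String[] args) throws Exception {
          String beHost = "10.0.0.5"; // a BE IP address returned by SHOW BACKENDS (placeholder)
          int beHttpPort = 8040;      // the corresponding HttpPort (placeholder)
          try (Socket socket = new Socket()) {
              socket.connect(new InetSocketAddress(beHost, beHttpPort), 3000); // 3-second timeout
              System.out.println("BE HTTP port is reachable");
          }
      }
  }
  ```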