Continuously load data from Apache Flink®
StarRocks provides a self-developed connector named Flink connector for Apache Flink® (Flink connector for short) to help you load data into a StarRocks table by using Flink. The basic principle is to accumulate the data and then load it all at once into StarRocks through STREAM LOAD. The Flink connector supports the DataStream API, Table API & SQL, and Python API. It also delivers higher performance than flink-connector-jdbc provided by Apache Flink®.
Version requirements
Connector | Flink | StarRocks | Java | Scala |
---|---|---|---|---|
1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8 | 2.11,2.12 |
Obtain Flink connector
You can obtain the Flink connector JAR file in the following ways:
- Directly download the compiled Flink connector JAR file.
- Add the Flink connector as a dependency in your Maven project and then download the JAR file.
- Compile the source code of the Flink connector into a JAR file by yourself.
The naming format of the Flink connector JAR file is as follows:
- Since Flink 1.15, it's `flink-connector-starrocks-${connector_version}_flink-${flink_version}.jar`. For example, if you install Flink 1.15 and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.15.jar`.
- Prior to Flink 1.15, it's `flink-connector-starrocks-${connector_version}_flink-${flink_version}_${scala_version}.jar`. For example, if you install Flink 1.14 and Scala 2.12 in your environment, and you want to use Flink connector 1.2.7, you can use `flink-connector-starrocks-1.2.7_flink-1.14_2.12.jar`.
NOTICE
In general, the latest version of the Flink connector only maintains compatibility with the three most recent versions of Flink.
Download the compiled JAR file
Directly download the corresponding version of the Flink connector JAR file from the Maven Central Repository.
Maven Dependency
In your Maven project's `pom.xml` file, add the Flink connector as a dependency according to the following format. Replace `flink_version`, `scala_version`, and `connector_version` with the respective versions.
In Flink 1.15 and later
```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>${connector_version}_flink-${flink_version}</version>
</dependency>
```
In versions earlier than Flink 1.15
```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>${connector_version}_flink-${flink_version}_${scala_version}</version>
</dependency>
```
Compile by yourself
1. Download the Flink connector package.

2. Execute the following command to compile the source code of the Flink connector into a JAR file. Note that `flink_version` must be replaced with the corresponding Flink version.

   ```bash
   sh build.sh <flink_version>
   ```

   For example, if the Flink version in your environment is 1.15, you need to execute the following command:

   ```bash
   sh build.sh 1.15
   ```

3. Go to the `target/` directory to find the Flink connector JAR file, such as `flink-connector-starrocks-1.2.7_flink-1.15-SNAPSHOT.jar`, generated upon compilation.
NOTE
The name of a Flink connector that is not formally released contains the `SNAPSHOT` suffix.
Options
Option | Required | Default value | Description |
---|---|---|---|
connector | Yes | NONE | The connector that you want to use. The value must be "starrocks". |
jdbc-url | Yes | NONE | The address that is used to connect to the MySQL server of the FE. You can specify multiple addresses, which must be separated by a comma (,). Format: jdbc:mysql://<fe_host1>:<fe_query_port1>,<fe_host2>:<fe_query_port2>,<fe_host3>:<fe_query_port3> . |
load-url | Yes | NONE | The HTTP URL of the FE in your StarRocks cluster. You can specify multiple URLs, which must be separated by a semicolon (;). Format: <fe_host1>:<fe_http_port1>;<fe_host2>:<fe_http_port2> . |
database-name | Yes | NONE | The name of the StarRocks database into which you want to load data. |
table-name | Yes | NONE | The name of the table that you want to use to load data into StarRocks. |
username | Yes | NONE | The username of the account that you want to use to load data into StarRocks. |
password | Yes | NONE | The password of the preceding account. |
sink.version | No | AUTO | The implementation to use. This parameter is supported from Flink connector version 1.2.4 onwards. Valid values: V1 (loads data using the Stream Load interface), V2 (loads data using the Stream Load transaction interface, which requires StarRocks 2.4 or later), and AUTO (automatically chooses the Stream Load transaction interface if the StarRocks version supports it, and falls back to V1 otherwise). |
sink.label-prefix | No | NONE | The label prefix used by Stream Load. |
sink.semantic | No | at-least-once | The semantics that is supported by your sink. Valid values: at-least-once and exactly-once. |
sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. Valid values: 64 MB to 10 GB. Setting this parameter to a larger value can improve loading performance but may increase loading latency. |
sink.buffer-flush.max-rows | No | 500000 | The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. This parameter is available only when you set sink.version to V1 and set sink.semantic to at-least-once . Valid values: 64000 to 5000000. |
sink.buffer-flush.interval-ms | No | 300000 | The interval at which data is flushed. This parameter is available only when you set sink.semantic to at-least-once . Valid values: 1000 to 3600000. Unit: ms. |
sink.max-retries | No | 3 | The number of times that the system retries to perform the Stream Load job. This parameter is available only when you set sink.version to V1 . Valid values: 0 to 10. |
sink.connect.timeout-ms | No | 1000 | The timeout for establishing HTTP connection. Valid values: 100 to 60000. Unit: ms. |
sink.wait-for-continue.timeout-ms | No | 10000 | Supported since 1.2.7. The timeout for waiting for the HTTP 100-continue response from the FE. Valid values: 3000 to 600000. Unit: ms. |
sink.ignore.update-before | No | true | Supported since version 1.2.8. Whether to ignore UPDATE_BEFORE records from Flink when loading data to Primary Key tables. If this parameter is set to false, the record is treated as a delete operation to the StarRocks table. |
sink.properties.* | No | NONE | The parameters that are used to control Stream Load behavior. For example, the parameter sink.properties.format specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see STREAM LOAD. |
sink.properties.format | No | csv | The format used for Stream Load. The Flink connector will transform each batch of data to the format before sending them to StarRocks. Valid values: csv and json . |
sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. |
sink.properties.column_separator | No | \t | The column separator for CSV-formatted data. |
sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: 0 to 1 . Default value: 0 . See Stream Load for details. |
sink.parallelism | No | NONE | The parallelism of the connector. Only available for Flink SQL. If this parameter is not set, the Flink planner decides the parallelism. When the parallelism is greater than 1, you must guarantee that data is written in the correct order. |
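The following is a minimal sketch of how a few of these options fit together in a Flink SQL sink definition. The table name, addresses, and credentials are placeholders for your environment; `sink.properties.format` and `sink.properties.strip_outer_array` are included only to illustrate passing Stream Load properties, assuming you want to load JSON instead of the default CSV.

```SQL
CREATE TABLE `score_board_sink` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    -- MySQL-protocol address(es) of the FE, separated by commas.
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    -- HTTP address(es) of the FE, separated by semicolons.
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    -- Pass Stream Load properties through the sink.properties.* prefix,
    -- for example to load JSON-formatted data instead of the default CSV.
    'sink.properties.format' = 'json',
    'sink.properties.strip_outer_array' = 'true'
);
```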
Data type mapping between Flink and StarRocks
Flink data type | StarRocks data type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
BINARY | INT |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE(N) | DATETIME |
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N) | DATETIME |
ARRAY<T> | ARRAY<T> |
MAP<KT,VT> | JSON STRING |
ROW<arg T...> | JSON STRING |
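As a sketch of how this mapping is used in practice, the Flink DDL below declares columns whose types map row by row to a hypothetical StarRocks table `test`.`event_log` defined with `id INT`, `tags ARRAY<VARCHAR(65533)>`, and `updated_at DATETIME`. The table and column names are illustrative, not taken from the StarRocks documentation.

```SQL
CREATE TABLE `event_log` (
    -- INTEGER in Flink maps to INTEGER in StarRocks.
    `id` INT,
    -- ARRAY<T> in Flink maps to ARRAY<T> in StarRocks.
    `tags` ARRAY<STRING>,
    -- TIMESTAMP_WITHOUT_TIME_ZONE in Flink maps to DATETIME in StarRocks.
    `updated_at` TIMESTAMP
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'event_log',
    'username' = 'root',
    'password' = ''
);
```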
Usage notes
Exactly Once
If you want the sink to guarantee exactly-once semantics, we recommend that you upgrade StarRocks to 2.5 or later and the Flink connector to 1.2.4 or later.
Since Flink connector 1.2.4, exactly-once has been redesigned based on the Stream Load transaction interface provided by StarRocks 2.4 and later. Compared to the previous implementation based on the non-transactional Stream Load interface, the new implementation reduces memory usage and checkpoint overhead, thereby improving the real-time performance and stability of loading.
If the StarRocks version is earlier than 2.4 or the Flink connector version is earlier than 1.2.4, the sink automatically falls back to the implementation based on the non-transactional Stream Load interface.
Configurations to guarantee exactly-once
- The value of `sink.semantic` needs to be `exactly-once`.
- If the Flink connector version is 1.2.8 or later, it is recommended to specify the value of `sink.label-prefix`. Note that the label prefix must be unique among all types of loading in StarRocks, such as Flink jobs, Routine Load, and Broker Load.

  - If the label prefix is specified, the Flink connector uses it to clean up lingering transactions that may be generated in some Flink failure scenarios, for example, when the Flink job fails while a checkpoint is still in progress. These lingering transactions are generally in `PREPARED` status if you use `SHOW PROC '/transactions/<db_id>/running';` to view them in StarRocks. When the Flink job restores from a checkpoint, the Flink connector finds these lingering transactions according to the label prefix and some information in the checkpoint, and aborts them. The Flink connector cannot abort them when the Flink job exits because of the two-phase commit mechanism used to implement exactly-once. When the Flink job exits, the Flink connector has not yet been notified by the Flink checkpoint coordinator whether the transactions should be included in a successful checkpoint, and aborting them anyway may lead to data loss. You can get an overview of how to achieve end-to-end exactly-once in Flink in this blogpost.
  - If the label prefix is not specified, lingering transactions are cleaned up by StarRocks only after they time out. However, the number of running transactions can reach the StarRocks limit `max_running_txn_num_per_db` if Flink jobs fail frequently before the transactions time out. The timeout length is controlled by the StarRocks FE configuration `prepared_transaction_default_timeout_second`, whose default value is `86400` (1 day). You can set a smaller value to make transactions expire faster when the label prefix is not specified.
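For example, a sink table that requests exactly-once could be declared as follows. This is a sketch that assumes StarRocks 2.5 or later and Flink connector 1.2.8 or later; the label prefix `flink_score_board` is an illustrative value that must be unique among all loading jobs in your cluster, and checkpointing must be enabled in the Flink job.

```SQL
CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    -- Request exactly-once semantics; data is then flushed only at checkpoints.
    'sink.semantic' = 'exactly-once',
    -- Recommended since connector 1.2.8 so lingering transactions can be cleaned up on restore.
    'sink.label-prefix' = 'flink_score_board'
);
```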
If you are certain that the Flink job will eventually recover from a checkpoint or savepoint after a long downtime because of a stop or continuous failover, please adjust the following StarRocks configurations accordingly to avoid data loss.

- `prepared_transaction_default_timeout_second`: StarRocks FE configuration, default value is `86400`. The value of this configuration needs to be larger than the downtime of the Flink job. Otherwise, the lingering transactions that are included in a successful checkpoint may be aborted because of timeout before you restart the Flink job, which leads to data loss.

  Note that when you set a larger value for this configuration, it is better to specify the value of `sink.label-prefix` so that the lingering transactions can be cleaned up according to the label prefix and some information in the checkpoint, instead of due to timeout (which may cause data loss).

- `label_keep_max_second` and `label_keep_max_num`: StarRocks FE configurations, default values are `259200` and `1000` respectively. For details, see FE configurations. The value of `label_keep_max_second` needs to be larger than the downtime of the Flink job. Otherwise, the Flink connector cannot check the state of transactions in StarRocks by using the transaction labels saved in Flink's savepoint or checkpoint and figure out whether these transactions are committed, which may eventually lead to data loss.
These configurations are mutable and can be modified by using `ADMIN SET FRONTEND CONFIG`:

```SQL
ADMIN SET FRONTEND CONFIG ("prepared_transaction_default_timeout_second" = "3600");
ADMIN SET FRONTEND CONFIG ("label_keep_max_second" = "259200");
ADMIN SET FRONTEND CONFIG ("label_keep_max_num" = "1000");
```
Flush Policy
The Flink connector buffers the data in memory, and flushes it in batches to StarRocks via Stream Load. How the flush is triggered differs between at-least-once and exactly-once.
For at-least-once, the flush will be triggered when any of the following conditions are met:
- the size of the buffered rows reaches the limit `sink.buffer-flush.max-bytes`
- the number of buffered rows reaches the limit `sink.buffer-flush.max-rows` (only valid for sink version V1)
- the elapsed time since the last flush reaches the limit `sink.buffer-flush.interval-ms`
- a checkpoint is triggered
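For example, the at-least-once flush thresholds can be tuned directly in the sink DDL, as in the following sketch; the threshold values here are placeholders chosen within the valid ranges listed in the Options table.

```SQL
CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.semantic' = 'at-least-once',
    -- Flush when about 128 MB of data has been buffered ...
    'sink.buffer-flush.max-bytes' = '134217728',
    -- ... or when 10 seconds have elapsed since the last flush, whichever comes first.
    'sink.buffer-flush.interval-ms' = '10000'
);
```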
For exactly-once, the flush only happens when a checkpoint is triggered.
Monitoring load metrics
The Flink connector provides the following metrics to monitor loading.
Metric | Type | Description |
---|---|---|
totalFlushBytes | counter | successfully flushed bytes. |
totalFlushRows | counter | number of rows successfully flushed. |
totalFlushSucceededTimes | counter | number of times that the data is successfully flushed. |
totalFlushFailedTimes | counter | number of times that the data fails to be flushed. |
totalFilteredRows | counter | number of rows filtered, which is also included in totalFlushRows. |
Examples
The following examples show how to use the Flink connector to load data into a StarRocks table with Flink SQL or Flink DataStream.
Preparations
Create a StarRocks table
Create a database `test` and create a Primary Key table `score_board`.
```SQL
CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
```
Set up Flink environment
1. Download the Flink binary Flink 1.15.2, and unzip it to the directory `flink-1.15.2`.

2. Download Flink connector 1.2.7, and put it into the directory `flink-1.15.2/lib`.

3. Run the following commands to start a Flink cluster:

   ```bash
   cd flink-1.15.2
   ./bin/start-cluster.sh
   ```
Run with Flink SQL
Run the following command to start a Flink SQL client.

```bash
./bin/sql-client.sh
```

Create a Flink table `score_board`, and insert values into the table via the Flink SQL client.
Note you must define the primary key in the Flink DDL if you want to load data into a Primary Key table of StarRocks. It's optional for other types of StarRocks tables.
```SQL
CREATE TABLE `score_board` (
`id` INT,
`name` STRING,
`score` INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
'load-url' = '127.0.0.1:8030',
'database-name' = 'test',
'table-name' = 'score_board',
'username' = 'root',
'password' = ''
);
INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);
```
Run with Flink DataStream
There are several ways to implement a Flink DataStream job according to the type of the input records, such as a CSV Java `String`, a JSON Java `String`, or a custom Java object.
The input records are CSV-format `String`. See LoadCsvRecords for a complete example.

```java
/**
 * Generate CSV-format records. Each record has three values separated by "\t".
 * These values will be loaded to the columns `id`, `name`, and `score` in the StarRocks table.
 */
String[] records = new String[]{
        "1\tstarrocks-csv\t100",
        "2\tflink-csv\t100"
};
DataStream<String> source = env.fromElements(records);

/**
 * Configure the connector with the required properties.
 * You also need to add properties "sink.properties.format" and "sink.properties.column_separator"
 * to tell the connector the input records are CSV-format, and the column separator is "\t".
 * You can also use other column separators in the CSV-format records,
 * but remember to modify the "sink.properties.column_separator" correspondingly.
 */
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.format", "csv")
        .withProperty("sink.properties.column_separator", "\t")
        .build();
// Create the sink with the options.
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);
```
The input records are JSON-format `String`. See LoadJsonRecords for a complete example.

```java
/**
 * Generate JSON-format records.
 * Each record has three key-value pairs corresponding to the columns `id`, `name`, and `score` in the StarRocks table.
 */
String[] records = new String[]{
        "{\"id\":1, \"name\":\"starrocks-json\", \"score\":100}",
        "{\"id\":2, \"name\":\"flink-json\", \"score\":100}",
};
DataStream<String> source = env.fromElements(records);

/**
 * Configure the connector with the required properties.
 * You also need to add properties "sink.properties.format" and "sink.properties.strip_outer_array"
 * to tell the connector the input records are JSON-format and to strip the outermost array structure.
 */
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.properties.strip_outer_array", "true")
        .build();
// Create the sink with the options.
SinkFunction<String> starRockSink = StarRocksSink.sink(options);
source.addSink(starRockSink);
```
The input records are custom Java objects. See LoadCustomJavaRecords for a complete example.
In this example, the input record is a simple POJO `RowData`.

```java
public static class RowData {
    public int id;
    public String name;
    public int score;

    public RowData() {}

    public RowData(int id, String name, int score) {
        this.id = id;
        this.name = name;
        this.score = score;
    }
}
```
The main program is as follows:

```java
// Generate records which use RowData as the container.
RowData[] records = new RowData[]{
        new RowData(1, "starrocks-rowdata", 100),
        new RowData(2, "flink-rowdata", 100),
};
DataStream<RowData> source = env.fromElements(records);

// Configure the connector with the required properties.
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        .withProperty("jdbc-url", jdbcUrl)
        .withProperty("load-url", loadUrl)
        .withProperty("database-name", "test")
        .withProperty("table-name", "score_board")
        .withProperty("username", "root")
        .withProperty("password", "")
        .build();

/**
 * The connector will use a Java object array (Object[]) to represent a row to be loaded into the StarRocks table,
 * and each element is the value for a column.
 * You need to define the schema of the Object[] which matches that of the StarRocks table.
 */
TableSchema schema = TableSchema.builder()
        .field("id", DataTypes.INT().notNull())
        .field("name", DataTypes.STRING())
        .field("score", DataTypes.INT())
        // When the StarRocks table is a Primary Key table, you must specify notNull(), for example, DataTypes.INT().notNull(), for the primary key `id`.
        .primaryKey("id")
        .build();
// Transform the RowData to the Object[] according to the schema.
RowDataTransformer transformer = new RowDataTransformer();
// Create the sink with the schema, options, and transformer.
SinkFunction<RowData> starRockSink = StarRocksSink.sink(schema, options, transformer);
source.addSink(starRockSink);
```
The `RowDataTransformer` in the main program is defined as follows:

```java
private static class RowDataTransformer implements StarRocksSinkRowBuilder<RowData> {

    /**
     * Set each element of the object array according to the input RowData.
     * The schema of the array matches that of the StarRocks table.
     */
    @Override
    public void accept(Object[] internalRow, RowData rowData) {
        internalRow[0] = rowData.id;
        internalRow[1] = rowData.name;
        internalRow[2] = rowData.score;
        // When the StarRocks table is a Primary Key table, you need to set the last element to indicate whether the data loading is an UPSERT or DELETE operation.
        internalRow[internalRow.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
    }
}
```
Best practices
Load data to a Primary Key table
This section shows how to load data into a StarRocks Primary Key table to achieve partial updates and conditional updates. See Change data through loading for an introduction to these features. These examples use Flink SQL.
Preparations
Create a database `test` and create a Primary Key table `score_board` in StarRocks.
```SQL
CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
```
Partial update
Partial update
This example shows how to load data only to the columns `id` and `name`.
Insert two data rows into the StarRocks table `score_board` in the MySQL client.

```SQL
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

mysql> select * from score_board;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | flink     |   100 |
+------+-----------+-------+
2 rows in set (0.02 sec)
```
Create a Flink table `score_board` in the Flink SQL client.

- Define the DDL which includes only the columns `id` and `name`.
- Set the option `sink.properties.partial_update` to `true`, which tells the Flink connector to perform partial updates.
- If the Flink connector version is <= 1.2.7, you also need to set the option `sink.properties.columns` to `id,name,__op` to tell the Flink connector which columns need to be updated. Note that you need to append the field `__op` at the end. The field `__op` indicates whether the data loading is an UPSERT or DELETE operation, and its values are set by the connector automatically.

```SQL
CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.partial_update' = 'true',
    -- only for Flink connector version <= 1.2.7
    'sink.properties.columns' = 'id,name,__op'
);
```
Insert two data rows into the Flink table. The primary keys of the data rows are the same as those of the rows in the StarRocks table, but the values in the column `name` are modified.

```SQL
INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'flink-update');
```
Query the StarRocks table in the MySQL client.

```SQL
mysql> select * from score_board;
+------+------------------+-------+
| id   | name             | score |
+------+------------------+-------+
|    1 | starrocks-update |   100 |
|    2 | flink-update     |   100 |
+------+------------------+-------+
2 rows in set (0.02 sec)
```
You can see that only the values for `name` change, and the values for `score` do not change.
Conditional update
This example shows how to do a conditional update according to the value of the column `score`. The update for an `id` takes effect only when the new value for `score` is greater than or equal to the old value.
Insert two data rows into the StarRocks table in the MySQL client.

```SQL
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'flink', 100);

mysql> select * from score_board;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | flink     |   100 |
+------+-----------+-------+
2 rows in set (0.02 sec)
```
Create a Flink table `score_board` in the following way:

- Define the DDL including all of the columns.
- Set the option `sink.properties.merge_condition` to `score` to tell the connector to use the column `score` as the condition.
- Set the option `sink.version` to `V1`, which tells the connector to use Stream Load.

```SQL
CREATE TABLE `score_board` (
    `id` INT,
    `name` STRING,
    `score` INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'score_board',
    'username' = 'root',
    'password' = '',
    'sink.properties.merge_condition' = 'score'
);
```
Insert two data rows into the Flink table. The primary keys of the data rows are the same as those of the rows in the StarRocks table. The first data row has a smaller value in the column `score`, and the second data row has a larger value in the column `score`.

```SQL
INSERT INTO `score_board` VALUES (1, 'starrocks', 99), (2, 'flink', 101);
```
Query the StarRocks table in the MySQL client.

```SQL
mysql> select * from score_board;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | flink     |   101 |
+------+-----------+-------+
2 rows in set (0.02 sec)
```
You can see that only the value of `score` in the second data row changes, and the value of `score` in the first data row does not change.
Load data into columns of BITMAP type
`BITMAP` is often used to accelerate count distinct, such as counting UVs. See Use Bitmap for exact Count Distinct.
Here we take the counting of UVs as an example to show how to load data into columns of the `BITMAP` type.
Create a StarRocks Aggregate table in the MySQL client.

In the database `test`, create an Aggregate table `page_uv` where the column `visit_users` is defined as the `BITMAP` type and configured with the aggregate function `BITMAP_UNION`.

```SQL
CREATE TABLE `test`.`page_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
```
Create a Flink table in the Flink SQL client.

The column `visit_user_id` in the Flink table is of `BIGINT` type, and we want to load this column to the column `visit_users` of `BITMAP` type in the StarRocks table. So when defining the DDL of the Flink table, note that:

- Because Flink does not support `BITMAP`, you need to define a column `visit_user_id` of `BIGINT` type to represent the column `visit_users` of `BITMAP` type in the StarRocks table.
- You need to set the option `sink.properties.columns` to `page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)`, which tells the connector the column mapping between the Flink table and the StarRocks table. You also need to use the `to_bitmap` function to tell the connector to convert data of `BIGINT` type into `BITMAP` type.

```SQL
CREATE TABLE `page_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'page_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=to_bitmap(visit_user_id)'
);
```
Load data into the Flink table in the Flink SQL client.

```SQL
INSERT INTO `page_uv` VALUES
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
    (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
    (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
    (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
```
Calculate page UVs from the StarRocks table in the MySQL client.

```SQL
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
|       2 |                           1 |
|       1 |                           3 |
+---------+-----------------------------+
2 rows in set (0.05 sec)
```
Load data into columns of HLL type
`HLL` can be used for approximate count distinct. See Use HLL for approximate count distinct.
Here we take the counting of UVs as an example to show how to load data into columns of the `HLL` type.
Create a StarRocks Aggregate table

In the database `test`, create an Aggregate table `hll_uv` where the column `visit_users` is defined as the `HLL` type and configured with the aggregate function `HLL_UNION`.

```SQL
CREATE TABLE `hll_uv` (
    `page_id` INT NOT NULL COMMENT 'page ID',
    `visit_date` datetime NOT NULL COMMENT 'access time',
    `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
```
Create a Flink table in the Flink SQL client.

The column `visit_user_id` in the Flink table is of `BIGINT` type, and we want to load this column to the column `visit_users` of `HLL` type in the StarRocks table. So when defining the DDL of the Flink table, note that:

- Because Flink does not support `HLL`, you need to define a column `visit_user_id` of `BIGINT` type to represent the column `visit_users` of `HLL` type in the StarRocks table.
- You need to set the option `sink.properties.columns` to `page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)`, which tells the connector the column mapping between the Flink table and the StarRocks table. You also need to use the `hll_hash` function to tell the connector to convert data of `BIGINT` type into `HLL` type.

```SQL
CREATE TABLE `hll_uv` (
    `page_id` INT,
    `visit_date` TIMESTAMP,
    `visit_user_id` BIGINT
) WITH (
    'connector' = 'starrocks',
    'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
    'load-url' = '127.0.0.1:8030',
    'database-name' = 'test',
    'table-name' = 'hll_uv',
    'username' = 'root',
    'password' = '',
    'sink.properties.columns' = 'page_id,visit_date,visit_user_id,visit_users=hll_hash(visit_user_id)'
);
```
Load data into the Flink table in the Flink SQL client.

```SQL
INSERT INTO `hll_uv` VALUES
    (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
    (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
    (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
```
Calculate page UVs from the StarRocks table in the MySQL client.

```SQL
mysql> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
|       3 |                           2 |
|       4 |                           1 |
+---------+-----------------------------+
2 rows in set (0.04 sec)
```