Read data from StarRocks using Flink connector
StarRocks provides a self-developed connector named StarRocks Connector for Apache Flink® (Flink connector for short) to help you read data in bulk from a StarRocks cluster by using Flink.
The Flink connector supports two reading methods: Flink SQL and Flink DataStream. Flink SQL is recommended.
NOTE
The Flink connector also supports writing the data read by Flink to another StarRocks cluster or storage system. See Continuously load data from Apache Flink®.
Background information
Unlike the JDBC connector provided by Flink, the Flink connector of StarRocks supports reading data from multiple BEs of your StarRocks cluster in parallel, greatly accelerating read tasks. The following comparison shows the difference in implementation between the two connectors.
Flink connector of StarRocks
With the Flink connector of StarRocks, Flink can first obtain the query plan from the responsible FE, then distribute the obtained query plan as parameters to all the involved BEs, and finally obtain the data returned by the BEs.
JDBC connector of Flink
With the JDBC connector of Flink, Flink can only read data from individual FEs, one at a time. Data reads are slow.
Prerequisites
Flink has been deployed. If Flink has not been deployed, follow these steps to deploy it:
Install Java 8 or Java 11 in your operating system to ensure Flink can run properly. You can use the following command to check the version of your Java installation:
```bash
java -version
```
For example, if the following information is returned, Java 8 has been installed:
```
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
```
Download and unzip the Flink package of your choice.
NOTE
We recommend that you use Flink v1.14 or later. The minimum Flink version supported is v1.11.
```bash
# Download the Flink package.
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Unzip the Flink package.
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Go to the Flink directory.
cd flink-1.14.5
```
Start your Flink cluster.
```bash
# Start your Flink cluster.
./bin/start-cluster.sh

# When the following information is displayed, your Flink cluster has successfully started:
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
```
You can also deploy Flink by following the instructions provided in the Flink documentation.
Before you begin
Follow these steps to deploy the Flink connector:

Select and download the flink-connector-starrocks JAR package matching the Flink version that you are using.

NOTICE

We recommend that you download the Flink connector package whose version is 1.2.x or later and whose matching Flink version has the same first two digits as the Flink version that you are using. For example, if you use Flink v1.14.x, you can download `flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jar`. If code debugging is needed, compile the Flink connector package to suit your business requirements.

Place the Flink connector package you downloaded or compiled into the `lib` directory of Flink. Then restart your Flink cluster.
Parameters
Common parameters
The following parameters apply to both the Flink SQL and Flink DataStream reading methods.
Parameter | Required | Data type | Description |
---|---|---|---|
connector | Yes | STRING | The type of connector that you want to use to read data. Set the value to starrocks . |
scan-url | Yes | STRING | The address that is used to connect to the HTTP server of the FE node. Format: `<fe_host>:<fe_http_port>`. The default port is `8030`. You can specify multiple addresses, separated by commas (,). Example: `192.168.xxx.xxx:8030,192.168.xxx.xxx:8030`. |
jdbc-url | Yes | STRING | The address that is used to connect to the MySQL server of the FE node. Format: `jdbc:mysql://<fe_host>:<fe_query_port>`. The default port number is `9030`. |
username | Yes | STRING | The username of your StarRocks cluster account. The account must have read permissions on the StarRocks table you want to read. See User privileges. |
password | Yes | STRING | The password of your StarRocks cluster account. |
database-name | Yes | STRING | The name of the StarRocks database to which the StarRocks table you want to read belongs. |
table-name | Yes | STRING | The name of the StarRocks table you want to read. |
scan.connect.timeout-ms | No | STRING | The maximum amount of time after which the connection from the Flink connector to your StarRocks cluster times out. Unit: milliseconds. Default value: 1000 . If the amount of time taken to establish the connection exceeds this limit, the read task fails. |
scan.params.keep-alive-min | No | STRING | The maximum amount of time for which the read task is kept alive. The keep-alive status is checked at regular intervals by a polling mechanism. Unit: minutes. Default value: 10 . We recommend that you set this parameter to a value that is greater than or equal to 5 . |
scan.params.query-timeout-s | No | STRING | The maximum amount of time after which the read task times out. The timeout duration is checked during task execution. Unit: seconds. Default value: 600 . If no read result is returned after the time duration elapses, the read task stops. |
scan.params.mem-limit-byte | No | STRING | The maximum amount of memory allowed per query on each BE. Unit: bytes. Default value: 1073741824 , equal to 1 GB. |
scan.max-retries | No | STRING | The maximum number of times that the read task can be retried upon failures. Default value: 1 . If the number of times that the read task is retried exceeds this limit, the read task returns errors. |
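The optional scan parameters are passed as string key-value pairs in the same way as the required ones. The following minimal sketch (not from the original documentation; the addresses, credentials, and parameter values are placeholders chosen only for illustration) shows how the timeout and retry parameters might be supplied with the Flink DataStream reading method. With Flink SQL, the same keys go into the WITH clause of the CREATE TABLE statement instead.

```java
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;

public class ScanTuningExample {
    public static void main(String[] args) {
        // A sketch only: host addresses, credentials, database, and table are placeholders.
        StarRocksSourceOptions options = StarRocksSourceOptions.builder()
                .withProperty("scan-url", "192.168.xxx.xxx:8030")
                .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("database-name", "test")
                .withProperty("table-name", "score_board")
                // Optional: allow 3000 ms (instead of the default 1000 ms) to establish the connection.
                .withProperty("scan.connect.timeout-ms", "3000")
                // Optional: stop the read task if no result is returned within 300 seconds.
                .withProperty("scan.params.query-timeout-s", "300")
                // Optional: retry a failed read task up to 3 times before returning an error.
                .withProperty("scan.max-retries", "3")
                .build();
        // `options` can then be passed to StarRocksSource.source(...), as shown later in this topic.
    }
}
```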
Parameters for Flink DataStream
The following parameters apply only to the Flink DataStream reading method.
Parameter | Required | Data type | Description |
---|---|---|---|
scan.columns | No | STRING | The columns that you want to read. You can specify multiple columns, separated by commas (,). |
scan.filter | No | STRING | The filter condition based on which you want to filter data. |
Assume that in Flink you create a table that consists of three columns, `c1`, `c2`, and `c3`. To read only the `c1` column and only the rows whose values in the `c1` column are equal to `100`, you can specify the two properties `"scan.columns", "c1"` and `"scan.filter", "c1 = 100"`.
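The following sketch shows how these two properties might be set with the DataStream reading method; the addresses, credentials, database, and table name are placeholders for illustration only.

```java
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;

public class ScanFilterExample {
    public static void main(String[] args) {
        // A sketch only: host addresses, credentials, database, and table are placeholders.
        StarRocksSourceOptions options = StarRocksSourceOptions.builder()
                .withProperty("scan-url", "192.168.xxx.xxx:8030")
                .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("database-name", "test")
                .withProperty("table-name", "example_table")
                // Read only column c1 ...
                .withProperty("scan.columns", "c1")
                // ... and only the rows in which c1 equals 100.
                .withProperty("scan.filter", "c1 = 100")
                .build();
        // `options` can then be passed to StarRocksSource.source(...), as shown later in this topic.
    }
}
```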
Data type mapping between StarRocks and Flink
The following data type mapping is valid only for Flink reading data from StarRocks. For the data type mapping used for Flink writing data into StarRocks, see Continuously load data from Apache Flink®.
StarRocks | Flink |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
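When you read data with the Flink DataStream method, this mapping determines which Flink `DataTypes` you declare in the `TableSchema`. The following sketch uses a hypothetical StarRocks table with DATE, DATETIME, DECIMAL, and LARGEINT columns; the field names are illustrative only.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class TypeMappingExample {
    public static void main(String[] args) {
        // Hypothetical StarRocks table:
        //   dt DATE, created DATETIME, amount DECIMAL(10,2), big_id LARGEINT
        TableSchema tableSchema = TableSchema.builder()
                .field("dt", DataTypes.DATE())              // StarRocks DATE     -> Flink DATE
                .field("created", DataTypes.TIMESTAMP())    // StarRocks DATETIME -> Flink TIMESTAMP
                .field("amount", DataTypes.DECIMAL(10, 2))  // StarRocks DECIMAL  -> Flink DECIMAL
                .field("big_id", DataTypes.STRING())        // StarRocks LARGEINT -> Flink STRING
                .build();
        System.out.println(tableSchema);
    }
}
```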
Examples
The following examples assume you have created a database named `test` in your StarRocks cluster and you have the permissions of user `root`.
NOTE
If a read task fails, you must re-create it.
Data example
Go to the `test` database and create a table named `score_board`.

```sql
MySQL [test]> CREATE TABLE `score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES
(
    "replication_num" = "3"
);
```
Insert data into the `score_board` table.

```sql
MySQL [test]> INSERT INTO score_board
VALUES
    (1, 'Bob', 21), (2, 'Stan', 21), (3, 'Sam', 22), (4, 'Tony', 22), (5, 'Alice', 22),
    (6, 'Lucy', 23), (7, 'Polly', 23), (8, 'Tom', 23), (9, 'Rose', 24), (10, 'Jerry', 24),
    (11, 'Jason', 24), (12, 'Lily', 25), (13, 'Stephen', 25), (14, 'David', 25), (15, 'Eddie', 26),
    (16, 'Kate', 27), (17, 'Cathy', 27), (18, 'Judy', 27), (19, 'Julia', 28), (20, 'Robert', 28),
    (21, 'Jack', 29);
```
Query the `score_board` table.

```sql
MySQL [test]> SELECT * FROM score_board;
+------+---------+-------+
| id   | name    | score |
+------+---------+-------+
|    1 | Bob     |    21 |
|    2 | Stan    |    21 |
|    3 | Sam     |    22 |
|    4 | Tony    |    22 |
|    5 | Alice   |    22 |
|    6 | Lucy    |    23 |
|    7 | Polly   |    23 |
|    8 | Tom     |    23 |
|    9 | Rose    |    24 |
|   10 | Jerry   |    24 |
|   11 | Jason   |    24 |
|   12 | Lily    |    25 |
|   13 | Stephen |    25 |
|   14 | David   |    25 |
|   15 | Eddie   |    26 |
|   16 | Kate    |    27 |
|   17 | Cathy   |    27 |
|   18 | Judy    |    27 |
|   19 | Julia   |    28 |
|   20 | Robert  |    28 |
|   21 | Jack    |    29 |
+------+---------+-------+
21 rows in set (0.00 sec)
```
Read data using Flink SQL
In your Flink cluster, create a table named `flink_test` based on the schema of the source StarRocks table (`score_board` in this example). In the table creation command, you must configure the read task properties, including the information about the Flink connector, the source StarRocks database, and the source StarRocks table.

```sql
CREATE TABLE flink_test
(
    `id` INT,
    `name` STRING,
    `score` INT
)
WITH
(
    'connector'='starrocks',
    'scan-url'='192.168.xxx.xxx:8030',
    'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
    'username'='xxxxxx',
    'password'='xxxxxx',
    'database-name'='test',
    'table-name'='score_board'
);
```
Use SELECT to read data from StarRocks.

```sql
SELECT id, name FROM flink_test WHERE score > 20;
```
When you read data by using Flink SQL, take note of the following points:
- You can use only SQL statements like `SELECT ... FROM <table_name> WHERE ...` to read data from StarRocks. Of all aggregate functions, only `count` is supported.
- Predicate pushdown is supported. For example, if your query contains a filter condition `char_1 <> 'A' and int_1 = -126`, the filter condition will be pushed down to the Flink connector and transformed into a statement that can be executed by StarRocks before the query is run. You do not need to perform extra configurations.
- The LIMIT statement is not supported.
- StarRocks does not support the checkpointing mechanism. As a result, data consistency cannot be guaranteed if the read task fails.
Read data using Flink DataStream
Add the following dependency to the `pom.xml` file, keeping only the `<version>` entry that matches the Flink version you are using:

```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for Apache Flink® 1.15 -->
    <version>x.x.x_flink-1.15</version>
    <!-- for Apache Flink® 1.14 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <!-- for Apache Flink® 1.13 -->
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <!-- for Apache Flink® 1.12 -->
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <!-- for Apache Flink® 1.11 -->
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
</dependency>
```
You must replace `x.x.x` in the preceding code example with the latest Flink connector version that you are using. See Version information.

Call the Flink connector to read data from StarRocks:
```java
import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

public class StarRocksSourceApp {
    public static void main(String[] args) throws Exception {
        StarRocksSourceOptions options = StarRocksSourceOptions.builder()
                .withProperty("scan-url", "192.168.xxx.xxx:8030")
                .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("table-name", "score_board")
                .withProperty("database-name", "test")
                .build();
        TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .field("score", DataTypes.INT())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
        env.execute("StarRocks flink source");
    }
}
```
What's next
After Flink successfully reads data from StarRocks, you can use the Flink WebUI to monitor the read task. For example, you can view the `totalScannedRows` metric on the Metrics page of the WebUI to obtain the number of rows that are successfully read. You can also use Flink SQL to perform calculations such as joins on the data you have read.