- StarRocks
- Introduction to StarRocks
- Quick Start
- Table Design
- Data Loading
- Overview of data loading
- Load data from a local file system or a streaming data source using HTTP PUT
- Load data from HDFS or cloud storage
- Continuously load data from Apache Kafka®
- Bulk load using Apache Sparkâ„¢
- Load data using INSERT
- Load data using Stream Load transaction interface
- Realtime synchronization from MySQL
- Continuously load data from Apache Flink®
- Change data through loading
- Transform data at loading
- Data Unloading
- Query Data Sources
- Query Acceleration
- Gather CBO statistics
- Materialized view
- Colocate Join
- Lateral Join
- Query Cache
- Index
- Computing the Number of Distinct Values
- Administration
- Deployment
- Management
- Data Recovery
- User Privilege and Authentication
- Performance Tuning
- Reference
- SQL Reference
- Keywords
- User Account Management
- Cluster Management
- ADMIN CANCEL REPAIR TABLE
- ADMIN CHECK TABLET
- ADMIN REPAIR TABLE
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER RESOURCE GROUP
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE FILE
- CREATE RESOURCE GROUP
- DROP FILE
- DROP RESOURCE GROUP
- INSTALL PLUGIN
- SHOW BACKENDS
- SHOW BROKER
- SHOW COMPUTE NODES
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW PROC
- SHOW PROCESSLIST
- SHOW RESOURCE GROUP
- SHOW TABLE STATUS
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER MATERIALIZED VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER RESOURCE
- ANALYZE TABLE
- BACKUP
- CANCEL ALTER TABLE
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE ANALYZE
- CREATE EXTERNAL CATALOG
- CREATE DATABASE
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP ANALYZE
- DROP STATS
- DROP CATALOG
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- KILL ANALYZE
- RECOVER
- REFRESH EXTERNAL TABLE
- RESTORE
- SHOW ANALYZE JOB
- SHOW ANALYZE STATUS
- SHOW META
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- USE
- DML
- ALTER LOAD
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL EXPORT
- CANCEL REFRESH MATERIALIZED VIEW
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- REFRESH MATERIALIZED VIEW
- RESUME ROUTINE LOAD
- CREATE ROUTINE LOAD
- SELECT
- SHOW ALTER TABLE
- SHOW ALTER MATERIALIZED VIEW
- SHOW BACKUP
- SHOW CATALOGS
- SHOW CREATE MATERIALIZED VIEW
- SHOW CREATE TABLE
- SHOW CREATE VIEW
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW MATERIALIZED VIEW
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- Auxiliary Commands
- Data Types
- Function Reference
- Java UDFs
- Window functions
- Lambda expression
- Aggregate Functions
- Array Functions
- array_agg
- array_append
- array_avg
- array_concat
- array_contains
- array_contains_all
- array_cum_sum
- array_difference
- array_distinct
- array_filter
- array_intersect
- array_join
- array_length
- array_map
- array_max
- array_min
- array_position
- array_remove
- array_slice
- array_sort
- array_sortby
- array_sum
- arrays_overlap
- array_to_bitmap
- reverse
- unnest
- Bit Functions
- Bitmap Functions
- base64_to_bitmap
- bitmap_and
- bitmap_andnot
- bitmap_contains
- bitmap_count
- bitmap_from_string
- bitmap_empty
- bitmap_has_any
- bitmap_hash
- bitmap_intersect
- bitmap_max
- bitmap_min
- bitmap_or
- bitmap_remove
- bitmap_to_array
- bitmap_to_base64
- bitmap_to_string
- bitmap_union
- bitmap_union_count
- bitmap_union_int
- bitmap_xor
- intersect_count
- sub_bitmap
- to_bitmap
- Conditional Functions
- Cryptographic Functions
- Date Functions
- add_months
- adddate
- convert_tz
- current_date
- current_time
- current_timestamp
- date
- date_add
- date_format
- date_slice
- date_sub, subdate
- date_trunc
- datediff
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- days_add
- days_diff
- days_sub
- from_days
- from_unixtime
- hour
- hours_sub
- microseconds_sub
- minute
- minutes_add
- minutes_diff
- minutes_sub
- month
- monthname
- months_add
- months_diff
- now
- quarter
- second
- seconds_add
- seconds_diff
- seconds_sub
- str_to_date
- str2date
- time_slice
- time_to_sec
- timediff
- timestamp
- timestampadd
- timestampdiff
- to_date
- to_days
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- weeks_add
- weeks_diff
- weeks_sub
- year
- years_add
- years_diff
- years_sub
- Geographic Functions
- JSON Functions
- Overview of JSON functions and operators
- JSON operators
- JSON constructor functions
- JSON query and processing functions
- Map Functions
- Math Functions
- String Functions
- Pattern Matching Functions
- Percentile Functions
- Scalar Functions
- Utility Functions
- cast function
- hash function
- System variables
- User-defined variables
- Error code
- System limits
- SQL Reference
- FAQ
- Deploy
- Data Migration
- SQL
- Query Dump
- Other FAQs
- Benchmark
- Developers
- Contribute to StarRocks
- Development Environment
- Trace Tools
- Integrate with StarRocks
Read data from StarRocks using Flink connector
This topic describes how to use the source function of flink-connector-starrocks to read data from StarRocks.
If you need to use the sink function of flink-connector-starrocks to write data into StarRocks, see Flink connector in the Data Loading chapter.
Introduction
You can use the source function of flink-connector-starrocks to read data from StarRocks. Different from the Flink JDBC connector, flink-connector-starrocks can read data from multiple StarRocks backends (BEs) in parallel, which significantly improves data reading efficiency. Difference between the two connectors:
- flink-connector-starrocks: Flink first obtains the query plan from the frontend (FE), delivers the query plan as a parameter to BE nodes, and then obtains data results from BE nodes.
- Flink JDBC connector: Flink JDBC connector can only read data from the FE in a serial fashion. The data reading efficiency is low.
Procedure
Step 1: Install flink-connector-starrocks
- Select a flink-connector-starrocks version based on your Flink version and download the JAR package of flink-connector-starrocks.
If you need to debug code, you can select the corresponding branch code and compile the code.
2. Place the downloaded or compiled JAR package in the lib
directory of Flink.
3. Restart Flink.
Step 2: Use flink-connector-starrocks to read StarRocks data
The source function of flink-connector-starrocks cannot guarantee exactly-once semantics. If the reading task fails, you must repeat this step to create another reading task.
- If you use a Flink SQL client (recommended), you can read data from StarRocks by referring to the following command. For more information about the parameters in this command, see Parameter description.
-- Create a table in Flink based on the target StarRocks table and configure table properties (including information about flink-connector-starrocks, database, and table).
CREATE TABLE flink_test (
date_1 DATE,
datetime_1 TIMESTAMP(6),
char_1 CHAR(20),
varchar_1 VARCHAR,
boolean_1 BOOLEAN,
tinyint_1 TINYINT,
smallint_1 SMALLINT,
int_1 INT,
bigint_1 BIGINT,
largeint_1 STRING,
float_1 FLOAT,
double_1 DOUBLE,FLI
decimal_1 DECIMAL(27,9)
) WITH (
'connector'='starrocks',
'scan-url'='192.168.xxx.xxx:8030,192.168.xxx.xxx:8030',
'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
'username'='root',
'password'='xxxxxx',
'database-name'='flink_test',
'table-name'='flink_test'
);
-- Execute an SQL statement to read data from StarRocks.
select date_1, smallint_1 from flink_test where char_1 `<>` 'A' and int_1 = -126;
Only some of the SQL statements can be used to read StarRocks data, such as
select ... from table_name where ...
. Aggregate functions except for COUNT are not supported.
Predicate pushdown is supported. Predicates can be automatically pushed down when you execute SQL statements, such as the filter conditions in the preceding example.
char_1 < > 'A' and int_1 = -126
will be pushed down to the connector and converted into a statement suitable for querying StarRocks data. Additional configuration is not required.
- If you use Flink DataStream, you must add a dependency before you use flash-connector-starrocks to read StarRocks data.
Add the following dependency to the pom.xml
file.
Replace x.x.x with the latest version number of flink-connector-starrocks. You can click version informationto obtain the latest version number.
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for flink-1.14 -->
<version>x.x.x_flink-1.14_2.11</version>
<version>x.x.x_flink-1.14_2.12</version>
<!-- for flink-1.13 -->
<version>x.x.x_flink-1.13_2.11</version>
<version>x.x.x_flink-1.13_2.12</version>
<!-- for flink-1.12 -->
<version>x.x.x_flink-1.12_2.11</version>
<version>x.x.x_flink-1.12_2.12</version>
<!-- for flink-1.11 -->
<version>x.x.x_flink-1.11_2.11</version>
<version>x.x.x_flink-1.11_2.12</version>
</dependency>
Use flink-connector-starrocks to read data from StarRocks by referring to the following sample code. The following table describes the parameters in these commands.
StarRocksSourceOptions options = StarRocksSourceOptions.builder()
.withProperty("scan-url", "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030")
.withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
.withProperty("username", "root")
.withProperty("password", "xxxxxx")
.withProperty("table-name", "flink_test")
.withProperty("database-name", "test")
.withProperty("cloumns", "char_1, date_1")
.withProperty("filters", "int_1 = 10")
.build();
TableSchema tableSchema = TableSchema.builder()
.field("date_1", DataTypes.DATE())
.field("datetime_1", DataTypes.TIMESTAMP(6))
.field("char_1", DataTypes.CHAR(20))
.field("varchar_1", DataTypes.STRING())
.field("boolean_1", DataTypes.BOOLEAN())
.field("tinyint_1", DataTypes.TINYINT())
.field("smallint_1", DataTypes.SMALLINT())
.field("int_1", DataTypes.INT())
.field("bigint_1", DataTypes.BIGINT())
.field("largeint_1", DataTypes.STRING())
.field("float_1", DataTypes.FLOAT())
.field("double_1", DataTypes.DOUBLE())
.field("decimal_1", DataTypes.DECIMAL(27, 9))
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(StarRocksSource.source(options, tableSchema)).setParallelism(5).print();
env.execute("StarRocks flink source");
Parameter description
Parameter | Required | Data type | Description |
---|---|---|---|
connector | Yes | String | The connector. Set the value to starrocks. |
scan-url | Yes | String | The scan URL of the FE node. The URL is used to access the FE node through the web server. The format is < FE IP address >: < FE HTTP port >. The port number defaults to 8030. Separate multiple addresses with commas, for example, 192.168.xxx.xxx:8030, 192.168.xxx.xxx:8030. |
jdbc-url | Yes | String | The JDBC URL of the FE node. This URL is used to access the MySQL client on the FE node. The format is jdbc:mysql://< FE IP address >:< FE query port >. The port number defaults to 9030. |
username | Yes | String | The username in StarRocks. The username must have read permissions to the target database and table. For more information, see User permissions. |
password | Yes | String | The password of the username. |
database-name | Yes | String | The name of the StarRocks database. |
table-name | Yes | String | The name of the StarRocks table. |
scan.connect.timeout-ms | No | String | The maximum duration for flink-connector-starrocks to connect to StarRocks, in milliseconds. The default value is 1000. If this duration is exceeded, the connection times out and an error occurs. |
scan.params.keep-alive-min | No | String | The keep-alive duration of the query task, in minutes. The default value is 10. we recommend that you set this parameter to a value greater than or equal to 5. |
scan.params.query-timeout-s | No | String | The timeout duration of the query task, in seconds. The default value is 600. If the query result is not returned within this duration, the query task is stopped. |
scan.params.mem-limit-byte | No | String | The maximum memory space allowed for a single query in the BE node, in bytes. The default value is 1073741824 (1 GB). |
scan.max-retries | No | String | The maximum number of retries when a query fails. The default value is 1. An error occurs if this value is exceeded. |
Data type mapping between Flink and StarRocks
The data type mapping in the following table applies only to reading StarRocks data from Flink. For the data type mapping for writing Flink data to StarRocks, see Flink connector in the Data Loading chapter.
StarRocks | Flink |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
What to do next
After data is read from StarRocks, you can use Flink WEBUI to observe the details of the reading task. For example, the Metrics page of Flink WEBUI displays the number of data rows that are read (totalScannedRows
).