# Load data using Kafka connector
StarRocks provides a self-developed connector named Apache Kafka® connector (StarRocks Connector for Apache Kafka®) that continuously consumes messages from Kafka and loads them into StarRocks. The Kafka connector guarantees at-least-once semantics.

The Kafka connector integrates seamlessly with Kafka Connect, which allows StarRocks to integrate better with the Kafka ecosystem. It is a wise choice if you want to load real-time data into StarRocks. Compared with Routine Load, the Kafka connector is recommended in the following scenarios:
- The source data is in a format, such as Protobuf, other than JSON, CSV, or Avro.
- You need to customize data transformation, for example, for Debezium-formatted CDC data.
- You need to load data from multiple Kafka topics.
- You need to load data from Confluent Cloud.
- You need finer control over load batch sizes, parallelism, and other parameters to strike a balance between load speed and resource utilization.
## Preparations

### Set up Kafka environment

Both self-managed Apache Kafka clusters and Confluent Cloud are supported.

- For a self-managed Apache Kafka cluster, make sure that you have deployed the Apache Kafka cluster and the Kafka Connect cluster, and created topics.
- For Confluent Cloud, make sure that you have a Confluent account and have created clusters and topics.
### Install Kafka connector

Deploy the Kafka connector in Kafka Connect:

- Self-managed Kafka cluster:

  1. Download and extract starrocks-kafka-connector-1.0.0.tar.gz.
  2. Copy the extracted directory to the libs directory of Kafka, then restart Kafka Connect so that it picks up the new JAR files (see the shell sketch after this list).
- Confluent Cloud:

  > **NOTE**
  >
  > The Kafka connector is not currently uploaded to Confluent Hub. You need to upload the compressed file to Confluent Cloud.
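A minimal shell sketch of the self-managed installation steps above, assuming the archive has already been downloaded and that `$KAFKA_HOME` points at your Kafka installation (both paths are assumptions; adjust them to your deployment):

```bash
# Assumed file name and $KAFKA_HOME location; adjust to your deployment.
tar -xzf starrocks-kafka-connector-1.0.0.tar.gz
cp -r starrocks-kafka-connector-1.0.0 "$KAFKA_HOME"/libs/
# Restart Kafka Connect afterwards so it loads the new JAR files.
```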
### Create StarRocks table

Create a table or tables in StarRocks according to the Kafka topics and the data they carry.
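As an illustration, here is a minimal sketch of a Primary Key table that could receive the `dbserver1.inventory.customers` topic used in the example below. The column names and types are assumptions, not part of the connector; align them with the fields in your Kafka messages:

```sql
CREATE DATABASE IF NOT EXISTS inventory;

-- Hypothetical schema; adjust the columns to match your Kafka messages.
CREATE TABLE inventory.customers (
    id BIGINT NOT NULL,
    first_name STRING,
    last_name STRING,
    email STRING
)
PRIMARY KEY (id)
DISTRIBUTED BY HASH (id) BUCKETS 4;
```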
## Examples

The following steps take a self-managed Kafka cluster as an example to demonstrate how to configure the Kafka connector and start Kafka Connect in order to load data into StarRocks.

1. Create a Kafka connector configuration file named connect-StarRocks-sink.properties and configure the parameters. For detailed information about the parameters, see Parameters.

   ```properties
   name=starrocks-kafka-connector
   connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
   topics=dbserver1.inventory.customers
   starrocks.http.url=192.168.xxx.xxx:8030,192.168.xxx.xxx:8030
   starrocks.username=root
   starrocks.password=123456
   starrocks.database.name=inventory
   key.converter=io.confluent.connect.json.JsonSchemaConverter
   value.converter=io.confluent.connect.json.JsonSchemaConverter
   ```
   > **NOTICE**
   >
   > If the source data is CDC data, such as data in the Debezium format, and the StarRocks table is a Primary Key table, you also need to configure the transforms parameter in order to synchronize the source data changes to the Primary Key table. For details, see Load Debezium-formatted CDC data in Best practices below.

2. Run the Kafka connector. For the parameters and descriptions in the following commands, see the Kafka Documentation.
   - Standalone mode

     ```bash
     bin/connect-standalone.sh worker.properties connect-StarRocks-sink.properties [connector2.properties connector3.properties ...]
     ```
   - Distributed mode

     > **NOTE**
     >
     > It is recommended to use the distributed mode in production environments.

     Start the worker:

     ```bash
     bin/connect-distributed.sh worker.properties
     ```
     Note that in distributed mode the connector configurations are not passed on the command line. Instead, use the REST API shown below to configure the Kafka connector and start the load:

     ```bash
     curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
       "name": "starrocks-kafka-connector",
       "config": {
         "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
         "topics": "dbserver1.inventory.customers",
         "starrocks.http.url": "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030",
         "starrocks.username": "root",
         "starrocks.password": "123456",
         "starrocks.database.name": "inventory",
         "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
         "value.converter": "io.confluent.connect.json.JsonSchemaConverter"
       }
     }'
     ```
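     To verify that the connector is running, you can query Kafka Connect's standard REST API (the host and port follow the example above):

     ```bash
     curl http://127.0.0.1:8083/connectors/starrocks-kafka-connector/status
     ```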
3. Query data in the StarRocks table.
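   For example, assuming the hypothetical inventory.customers table sketched earlier:

   ```sql
   SELECT * FROM inventory.customers LIMIT 5;
   ```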
## Parameters

| Parameter | Required | Default value | Description |
|---|---|---|---|
| name | YES | | Name of this Kafka connector. It must be globally unique among all Kafka connectors within this Kafka Connect cluster. For example, starrocks-kafka-connector. |
| connector.class | YES | com.starrocks.connector.kafka.StarRocksSinkConnector | Class used by this Kafka connector's sink. |
| topics | YES | | One or more topics to subscribe to, where each topic corresponds to a StarRocks table. By default, StarRocks assumes that the topic name matches the name of the StarRocks table, and so determines the target table by the topic name. Choose either topics or topics.regex (below), but not both. If the StarRocks table name is not the same as the topic name, use the optional starrocks.topic2table.map parameter (below) to specify the mapping from topic name to table name. |
| topics.regex | NO | | Regular expression to match the one or more topics to subscribe to. For more description, see topics. Choose either topics.regex or topics (above), but not both. |
| starrocks.topic2table.map | NO | | The mapping from topic name to StarRocks table name when the topic name differs from the StarRocks table name. The format is `<topic-1>:<table-1>,<topic-2>:<table-2>,...`. |
| starrocks.http.url | YES | | The HTTP URL of the FE in your StarRocks cluster. The format is `<fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,...`. Multiple addresses are separated by commas (,). For example, `192.168.xxx.xxx:8030,192.168.xxx.xxx:8030`. |
| starrocks.database.name | YES | | The name of the StarRocks database. |
| starrocks.username | YES | | The username of your StarRocks cluster account. The user needs the INSERT privilege on the StarRocks table. |
| starrocks.password | YES | | The password of your StarRocks cluster account. |
| key.converter | YES | | In this scenario, the Kafka connector provided by StarRocks is a sink connector. This parameter specifies the key converter for the sink connector, which is used to deserialize the keys of Kafka data. Determine the value of this parameter based on the key converter used by the source connector. |
| value.converter | YES | | This parameter specifies the value converter for the sink connector, which is used to deserialize the values of Kafka data. Determine the value of this parameter based on the value converter used by the source connector. |
| key.converter.schema.registry.url | NO | | Schema registry URL for the key converter. |
| value.converter.schema.registry.url | NO | | Schema registry URL for the value converter. |
| tasks.max | NO | 1 | The upper limit on the number of task threads that the Kafka connector can create, which is usually the same as the number of CPU cores on the worker nodes in the Kafka Connect cluster. You can tune this parameter to control load performance. |
| bufferflush.maxbytes | NO | 94371840 (90 MB) | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. Valid range: 64 MB to 10 GB. Keep in mind that the Stream Load SDK buffer may create multiple Stream Load jobs to buffer data, so the threshold mentioned here refers to the total data size. |
| bufferflush.intervalms | NO | 300000 | Interval at which a batch of data is sent, which controls the load latency. Range: [1000, 3600000]. |
| connect.timeoutms | NO | 1000 | Timeout for connecting to the HTTP URL. Range: [100, 60000]. |
| sink.properties.* | NO | | Stream Load parameters to control load behavior. For example, the parameter sink.properties.format specifies the format used for Stream Load, such as CSV or JSON. For a list of supported parameters and their descriptions, see [STREAM LOAD](../sql-reference/sql-statements/data-manipulation/STREAM LOAD.md). |
| sink.properties.format | NO | json | The format used for Stream Load. The Kafka connector transforms each batch of data to this format before sending it to StarRocks. Valid values: csv and json. For more information, see CSV parameters and JSON parameters. |
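For instance, here is a sketch of how the mapping and Stream Load parameters might be combined when a topic name differs from the table name. The topic and table names are made up for illustration:

```properties
# Hypothetical names: topic "inventory_customers_v2" feeds table "customers".
starrocks.topic2table.map=inventory_customers_v2:customers
# Load each batch as CSV with an explicit column separator (a Stream Load parameter).
sink.properties.format=csv
sink.properties.column_separator=,
```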
## Limits

- Flattening a single message from a Kafka topic into multiple data rows and loading them into StarRocks is not supported.
- The sink of the Kafka connector guarantees at-least-once semantics.
## Best practices

### Load Debezium-formatted CDC data

If the Kafka data is in the Debezium CDC format and the StarRocks table is a Primary Key table, you also need to configure the transforms parameter and other related parameters:
```properties
transforms=addfield,unwrap
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite
```
In the above configurations, we specify transforms=addfield,unwrap.

- The addfield transform adds an __op field to each record of Debezium CDC-formatted data to support the StarRocks Primary Key table. If the StarRocks table is not a Primary Key table, you do not need to specify the addfield transform. The addfield transform class, com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord, is included in the Kafka connector JAR file, so you do not need to install it manually.
- The unwrap transform is provided by Debezium and is used to unwrap Debezium's complex data structure based on the operation type. For more information, see New Record State Extraction.
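To illustrate, here is a simplified sketch of what these transforms do to an update event. The field values are made up; the envelope follows Debezium's standard shape, and __op uses StarRocks' convention of 0 for UPSERT and 1 for DELETE. A Debezium update event as consumed from the topic:

```json
{"before": {"id": 1001, "email": "old@example.com"}, "after": {"id": 1001, "email": "new@example.com"}, "op": "u"}
```

After the unwrap transform extracts the new record state and the addfield transform appends the __op field, the record sent to StarRocks looks like:

```json
{"id": 1001, "email": "new@example.com", "__op": 0}
```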