Routine Load
The Routine Load method supports continuous import of data from Kafka. A routine load job can be paused, resumed, or stopped via SQL statements. This section introduces the basic principles and usage of this method.
Terminology explanation
- RoutineLoadJob: A routine import job submitted by the user.
- JobScheduler: Routine import job scheduler for scheduling and splitting a RoutineLoadJob into multiple subtasks.
- Task: Subtasks of RoutineLoadJob split by JobScheduler according to rules.
- TaskScheduler: Task scheduler for scheduling the execution of a task.
Fundamentals
The overall import process is as follows:
- The user submits a Kafka import job to the FE via a client that supports the MySQL protocol.
- The FE splits the job into several tasks, each of which is responsible for importing a specified portion of the data.
- Each task is assigned to a designated BE for execution. On the BE, a task is treated as a normal import task and is performed according to the Stream Load import mechanism.
- The BE reports to the FE when the import is completed.
- Based on the reported result, the FE continues to generate new tasks or retries failed tasks.
- The FE keeps generating new tasks so that data import continues without interruption.
Import example
Environment requirements
- Kafka clusters without authentication and clusters with SSL/SASL authentication are both supported.
- Messages must be in JSON format, or in CSV format with one record per message and no line break at the end (see the sample messages after this list).
- Only Kafka versions 0.10.0.0 and above are supported.
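For illustration, a CSV message and a JSON message for the routine_wiki_edit table used in the example below could look like the following. The field values are made up; each Kafka message carries exactly one record with no trailing line break:

CSV message:
2020-05-16 15:59:01,#en.wikipedia,ExampleUser,0,1,0,0,0,12,12,0

JSON message:
{"event_time": "2020-05-16 15:59:01", "channel": "#en.wikipedia", "user": "ExampleUser", "is_anonymous": 0, "is_minor": 1, "is_new": 0, "is_robot": 0, "is_unpatrolled": 0, "delta": 12, "added": 12, "deleted": 0}

Note that loading JSON typically requires additional format-related properties in the job definition; check the CREATE ROUTINE LOAD reference for your StarRocks version.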
Create import task
Syntax:
CREATE ROUTINE LOAD [database.][job_name] ON [table_name]
[COLUMNS TERMINATED BY "column_separator" ,]
[COLUMNS (col1, col2, ...) ,]
[WHERE where_condition ,]
[PARTITION (part1, part2, ...)]
[PROPERTIES ("key" = "value", ...)]
FROM [DATA_SOURCE]
[(data_source_properties1 = 'value1',
data_source_properties2 = 'value2',
...)]
Example:
An example of importing data from a local Kafka cluster:
CREATE ROUTINE LOAD routine_load_wikipedia ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
PROPERTIES
(
"desired_concurrent_number"="1",
"max_error_number"="1000"
)
FROM KAFKA
(
"kafka_broker_list"= "localhost:9092",
"kafka_topic" = "starrocks-load"
);
Description:
- job_name: Required. The name of the import job. The name of the target database can be used as a prefix of job_name. A common naming convention is timestamp + table name.
- table_name: Required. The name of the target table to be imported.
- COLUMNS TERMINATED BY clause: Optional. Used to specify the column separator in the source data. The separator defaults to \t (tab).
- COLUMNS clause: Optional. Used to specify the mapping between the columns of the source data and the columns of the table (see the combined example after this list).
  - Mapped columns: If the target table has three columns col1, col2, col3, and the source data has four columns, where source columns 1, 2, and 4 correspond to col2, col1, col3 respectively, it is written as COLUMNS (col2, col1, temp, col3), where temp is a non-existent column used to skip the third column in the source data.
  - Derived columns: Besides directly importing columns of the source data, StarRocks can also compute expressions on them. Suppose a fourth column col4 is appended to the target table and its value is generated by col1 + col2; it is then written as COLUMNS (col2, col1, temp, col3, col4 = col1 + col2).
- WHERE clause: Optional. Used to specify filter conditions that exclude unwanted rows. The conditions can reference mapped or derived columns. For example, to import only rows where k1 is greater than 100 and k2 equals 1000, write WHERE k1 > 100 and k2 = 1000.
- PARTITION clause: Optional. Used to specify which partitions of the target table to import into. If not specified, rows are automatically routed to the corresponding partitions.
- PROPERTIES clause: Optional. Used to specify the general parameters of the import job.
- desired_concurrent_number: Import concurrency. Used to specify the maximum number of subtasks for one import job. This value must be greater than 0. The default value is 3.
- max_batch_interval: The maximum execution time of each subtask, in seconds. The value range is 5 to 60. The default value is 10. Since version 1.15, this parameter defines the scheduling interval of subtasks, i.e. how often a task is executed. The data consumption time of a task is routine_load_task_consume_second in fe.conf, with a default value of 3s. The execution timeout of a task is routine_load_task_timeout_second in fe.conf, with a default value of 15s.
- max_batch_rows: The maximum number of rows to be read per subtask. The value must be greater than or equal to 200000. The default value is 200000. Since version 1.15, this parameter is only used to define the error detection window, which is 10 × max_batch_rows rows.
- max_batch_size: The maximum number of bytes to be read per subtask. The value range is 100 MB to 1 GB. The default value is 100 MB. Since version 1.15, this parameter is deprecated; the data consumption time of a task is routine_load_task_consume_second in fe.conf, with a default value of 3s.
- max_error_number: The maximum number of error rows allowed within the sampling window. The default value is 0, i.e. no error rows are allowed. Note: rows filtered out by the WHERE condition are not counted as error rows.
- strict_mode: Whether strict mode is enabled. It is enabled by default. When enabled, non-null source values whose column type conversion results in NULL are filtered out. Strict mode can be disabled with "strict_mode" = "false".
- timezone: Specify the timezone used by the import job. The default value is the session's timezone parameter. This parameter affects the results of all time zone-related functions involved in the import.
- DATA_SOURCE: Specifies the data source; currently KAFKA.
- data_source_properties: Specifies the information related to the data source.
  - kafka_broker_list: Kafka broker connection information, in the format ip:port. Multiple brokers are separated by commas.
  - kafka_topic: The Kafka topic to subscribe to.
  - kafka_partitions/kafka_offsets: The Kafka partitions to subscribe to and the starting offset of each partition.
  - property: Kafka-related properties, functionally equivalent to the "--property" parameter of the Kafka command-line tools.
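Putting the clauses above together, here is a hedged sketch of a job that combines column mapping, a derived column, row filtering, and explicit partitions and starting offsets. The database example_db, table example_tbl, topic example-topic, and broker addresses are hypothetical; the col1–col4 and temp column names follow the mapping discussion above:

CREATE ROUTINE LOAD example_db.routine_load_example ON example_tbl
COLUMNS TERMINATED BY ",",
-- temp skips the third source column; col4 is derived from col1 + col2
COLUMNS (col2, col1, temp, col3, col4 = col1 + col2),
-- only load rows whose col1 is greater than 100
WHERE col1 > 100
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_error_number" = "1000",
    "strict_mode" = "false",
    "timezone" = "Asia/Shanghai"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "example-topic",
    -- start consuming partitions 0, 1, 2 from offset 0
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);

Any of the optional clauses can be dropped, as in the wikipedia example above.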
View job status
Display all routine load jobs (including stopped or cancelled jobs) under [database]. The result is one or more rows.

USE [database];
SHOW ALL ROUTINE LOAD;

Display the currently running routine load job named job_name under [database].

SHOW ROUTINE LOAD FOR [database.][job_name];
Note: SHOW ROUTINE LOAD can only display jobs that are currently running; stopped and not-yet-started jobs cannot be viewed with it.
Using the import task created above as an example, the following command allows you to view all running routine load jobs.
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":150821770,"errorRows":122,"committedTaskNum":12,"loadedRows":2399878,"loadRowsRate":199000,"abortedTaskNum":1,"totalRows":2400000,"unselectedRows":0,"receivedBytesRate":12523000,"taskExecuteTimeMs":12043}
Progress: {"0":"13634667"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_53/error_log_insert_stmt_47e8a1d107ed4932-8f1ddf7b01ad2fee_47e8a1d107ed4932_8f1ddf7b01ad2fee, http://172.26.108.172:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8
OtherMsg:
1 row in set (0.00 sec)
The output shows the import job named routine_load_wikipedia created in the example, where the important fields are defined as follows:

- State: The status of the import job. RUNNING indicates that the job is running continuously.
- Statistic: Runtime statistics of the import job, including the following fields (a worked check follows this list):
  - receivedBytes: The size of the received data, in bytes.
  - errorRows: The number of error rows.
  - committedTaskNum: The number of tasks committed by the FE.
  - loadedRows: The number of rows that have been loaded.
  - loadRowsRate: The rate at which data is loaded, in rows/s.
  - abortedTaskNum: The number of failed tasks on the BE.
  - totalRows: The total number of rows received.
  - unselectedRows: The number of rows filtered out by the WHERE condition.
  - receivedBytesRate: The rate at which data is received, in bytes/s.
  - taskExecuteTimeMs: The time taken by the import, in ms.
- ErrorLogUrls: Error logs, accessible via the listed URLs.
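As a rough check against the error threshold described in the PROPERTIES section (the error detection window is 10 × max_batch_rows), the figures in the example output work out as follows; treating the threshold as the point at which the job would be paused is an assumption based on the PAUSED-on-error behavior discussed in the FAQ below:

error detection window = 10 × max_batch_rows = 10 × 200000 = 2,000,000 rows
errorRows = 122, max_error_number = 1000
122 < 1000, so the job stays RUNNING rather than being paused for excessive error rows.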
Suspend the import job
After the PAUSE statement is executed, the import job enters the PAUSED state. However, the job is not terminated and can be restarted with the RESUME statement.

- Suspend the routine load job named job_name:
PAUSE ROUTINE LOAD FOR [job_name];
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: 2020-05-16 16:03:39
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: PAUSED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
Progress: {"0":"13824771"}
ReasonOfStateChanged: ErrorReason{code=errCode = 100, msg='User root pauses routine load job'}
ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.108.172:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
OtherMsg:
1 row in set (0.01 sec)
After the import job is suspended, its state changes to PAUSED and the import information in Statistic stops updating. The job is not terminated, and you can still view the paused job with the SHOW ROUTINE LOAD statement.
Resume the import job
After the RESUME statement is executed, the job briefly enters the NEED_SCHEDULE state, indicating that it is being rescheduled; it returns to the RUNNING state shortly after.

- Resume the routine load job named job_name:
RESUME ROUTINE LOAD FOR [job_name];
MySQL [load_test]> RESUME ROUTINE LOAD FOR routine_load_wikipedia;
Query OK, 0 rows affected (0.01 sec)
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: NEED_SCHEDULE
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":162767220,"errorRows":132,"committedTaskNum":13,"loadedRows":2589972,"loadRowsRate":115000,"abortedTaskNum":7,"totalRows":2590104,"unselectedRows":0,"receivedBytesRate":7279000,"taskExecuteTimeMs":22359}
Progress: {"0":"13824771"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_54/error_log_insert_stmt_e0c0c6b040c044fd-a162b16f6bad53e6_e0c0c6b040c044fd_a162b16f6bad53e6, http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.108.172:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391
OtherMsg:
1 row in set (0.00 sec)
MySQL [load_test]> SHOW ROUTINE LOAD\G;
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: N/A
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":175337712,"errorRows":142,"committedTaskNum":14,"loadedRows":2789962,"loadRowsRate":118000,"abortedTaskNum":7,"totalRows":2790104,"unselectedRows":0,"receivedBytesRate":7422000,"taskExecuteTimeMs":23623}
Progress: {"0":"14024771"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_55/error_log_insert_stmt_ce4c95f0c72440ef-a442bb300bd743c8_ce4c95f0c72440ef_a442bb300bd743c8, http://172.26.108.172:9122/api/_load_error_log?file=__shard_56/error_log_insert_stmt_8753041cd5fb42d0-b5150367a5175391_8753041cd5fb42d0_b5150367a5175391, http://172.26.108.172:9122/api/_load_error_log?file=__shard_57/error_log_insert_stmt_31304c87bb82431a-9f2baf7d5fd7f252_31304c87bb82431a_9f2baf7d5fd7f252
OtherMsg:
1 row in set (0.00 sec)
ERROR: No query specified
After the import job is resumed, the state is NEED_SCHEDULE at the first query, indicating that the job is being rescheduled. At the second query the state has changed to RUNNING, and the import information in Statistic starts updating again.
Stopping the import job
Use the STOP statement to put the import job into the STOPPED state. Data stops being imported and the job cannot be resumed.

- Stop the routine load job named job_name:
STOP ROUTINE LOAD FOR [job_name];
MySQL [load_test]> STOP ROUTINE LOAD FOR routine_load_wikipedia;
Query OK, 0 rows affected (0.01 sec)
MySQL [load_test]> SHOW ALL ROUTINE LOAD\G;
*************************** 1. row ***************************
Id: 14093
Name: routine_load_wikipedia
CreateTime: 2020-05-16 16:00:48
PauseTime: N/A
EndTime: 2020-05-16 16:08:25
DbName: default_cluster:load_test
TableName: routine_wiki_edit
State: STOPPED
DataSourceType: KAFKA
CurrentTaskNum: 0
JobProperties: {"partitions":"*","columnToColumnExpr":"event_time,channel,user,is_anonymous,is_minor,is_new,is_robot,is_unpatrolled,delta,added,deleted","maxBatchIntervalS":"10","whereExpr":"*","maxBatchSizeBytes":"104857600","columnSeparator":"','","maxErrorNum":"1000","currentTaskConcurrentNum":"1","maxBatchRows":"200000"}
DataSourceProperties: {"topic":"starrocks-load","currentKafkaPartitions":"0","brokerList":"localhost:9092"}
CustomProperties: {}
Statistic: {"receivedBytes":325534440,"errorRows":264,"committedTaskNum":26,"loadedRows":5179944,"loadRowsRate":109000,"abortedTaskNum":18,"totalRows":5180208,"unselectedRows":0,"receivedBytesRate":6900000,"taskExecuteTimeMs":47173}
Progress: {"0":"16414875"}
ReasonOfStateChanged:
ErrorLogUrls: http://172.26.108.172:9122/api/_load_error_log?file=__shard_67/error_log_insert_stmt_79e9504cafee4fbd-b3981a65fb158cde_79e9504cafee4fbd_b3981a65fb158cde, http://172.26.108.172:9122/api/_load_error_log?file=__shard_68/error_log_insert_stmt_b6981319ce56421b-bf4486c2cd371353_b6981319ce56421b_bf4486c2cd371353, http://172.26.108.172:9122/api/_load_error_log?file=__shard_69/error_log_insert_stmt_1121400c1f6f4aed-866c381eb49c966e_1121400c1f6f4aed_866c381eb49c966e
OtherMsg:
After the import job is stopped, its state changes to STOPPED and the import information in Statistic will never be updated again. At this point, the stopped job can no longer be viewed with the SHOW ROUTINE LOAD statement; use SHOW ALL ROUTINE LOAD instead.
Frequently Asked Questions
Q: The import job is PAUSED with the error: Broker: Offset out of range.

A: Check the latest consumed offset with SHOW ROUTINE LOAD, and use a Kafka client to check whether there is still data at that offset. Possible causes:

- A future offset was specified when the job was created.
- Kafka had already cleaned up the data at this offset before it could be imported. You need to take the import speed of StarRocks into account and set reasonable log cleanup parameters (e.g. log.retention.hours, log.retention.bytes, etc.); see the recovery sketch after this list.
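If the data at the recorded offset has already been cleaned up by Kafka, one possible recovery path, sketched below using the job from this page, is to stop the paused job and create a new one that starts from an offset that still exists in the topic. The new job name routine_load_wikipedia2 and the offset value 13824771 are hypothetical; use SHOW ROUTINE LOAD and your Kafka client to pick an offset that is actually available:

STOP ROUTINE LOAD FOR routine_load_wikipedia;

CREATE ROUTINE LOAD load_test.routine_load_wikipedia2 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
COLUMNS (event_time, channel, user, is_anonymous, is_minor, is_new, is_robot, is_unpatrolled, delta, added, deleted)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "starrocks-load",
    -- resume consumption from an offset that still exists in partition 0
    "kafka_partitions" = "0",
    "kafka_offsets" = "13824771"
);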