ROUTINE LOAD
Description
Routine Load allows you to submit a resident import task that continuously imports data into StarRocks by reading from a specified data source. Currently, data can be imported from Apache Kafka in CSV or JSON format, with no authentication or with SSL/SASL authentication.
Syntax
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Parameters
[db.]job_name
The name of the import job. Within the same database, only one job with the same name can be running.
tbl_name
Specifies the name of the table into which data is imported.
load_properties
Describes the data to be imported. Syntax:
[COLUMNS TERMINATED BY '<terminator>'],
[COLUMNS ([<column_name> [, ...] ] [, column_assignment [, ...] ] )],
[WHERE <expr>],
[PARTITION ([ <partition_name> [, ...] ])]
column_assignment:
<column_name> = column_expression
- column_separator:
You can specify a column separator for data in CSV format. For example, specify a comma (,) as the column separator. The default is \t.
COLUMNS TERMINATED BY ","
- columns_mapping:
Specifies the mapping between the columns in the source data and the columns in the destination table, and defines how derived columns are generated.
- Mapped column:
It specifies, in order, which columns in the source data correspond to which columns in the destination table. For columns you want to skip, you can specify a column name that does not exist. Suppose the destination table has three columns, k1, k2, and v1, and the source data has four columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write it as follows:
COLUMNS (k2, k1, xxx, v1)
Here, xxx is a non-existent column used to skip the third column in the source data.
- Derived columns:
A column in the form col_name = expr is called a derived column. That is, expr is used to calculate the value of the corresponding column in the destination table. Derived columns are usually placed after the mapped columns. Although this is not mandatory, StarRocks always parses the mapped columns before the derived columns. In the next example, suppose the destination table also has a fourth column, v2, which is generated as the sum of k1 and k2. Write it as follows:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
For data in CSV format, the number of mapped columns in COLUMNS must be consistent with the number of columns in the source data.
- where_predicates
It specifies filter conditions to filter out unwanted rows. The filter columns can be mapped or derived columns. For example, if we only want to import rows where k1 is greater than 100 and k2 equals 1000, write as follows:
WHERE k1 > 100 and k2 = 1000
- partitions
It specifies which partitions of the destination table to import into. If not specified, data is automatically imported into the corresponding partitions. Example:
PARTITION(p1, p2, p3)
job_properties
General parameters used to specify routine import jobs.
Syntax:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
We currently support the following parameters:
desired_concurrent_number
Expected concurrency. A routine import job is split into subtasks for execution. This parameter specifies the maximum number of tasks a job can run simultaneously. It must be greater than 0. The default is 3. This is not the actual concurrency; the actual concurrency is determined by the number of nodes in the cluster, the cluster load, and the data source. Example:
"desired_concurrent_number" = "3"
max_batch_interval
The task scheduling interval, that is, how often a task is executed. The default is 10s. The time a task spends consuming data is controlled by routine_load_task_consume_second in fe.conf, which defaults to 3s. The task execution timeout is controlled by routine_load_task_timeout_second in fe.conf, which defaults to 15s. Example:
"max_batch_interval" = "20"
max_error_number/max_batch_rows
The maximum number of error rows allowed within the sampling window. It must be greater than or equal to 0. The default is 0, which means no error rows are allowed. The sampling window is max_batch_rows * 10, which defaults to 200000 * 10 = 2000000 rows. That is, if the number of error rows within the sampling window exceeds max_error_number, the routine job is paused and manual intervention is required to check for data quality issues. Rows filtered out by the WHERE condition are not counted as error rows.
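For example, to tolerate at most 1000 error rows per sampling window (an illustrative value, not a recommendation):
"max_error_number" = "1000"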
strict_mode
Specifies whether to enable strict mode. It is disabled by default. If strict mode is enabled, values of non-null source columns whose column type conversion result is NULL are filtered out. To enable it, specify "strict_mode" = "true".
timezone
Specifies the time zone used by the import job. By default, the session's timezone parameter is used. This parameter affects the results of all time-zone-related functions involved in the import.
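For example, to run the import job in the Asia/Shanghai time zone:
"timezone" = "Asia/Shanghai"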
format
Specifies the format of the data to be imported. The default is csv; json is also supported.
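For example, to import JSON data, as shown in the JSON examples below:
"format" = "json"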
jsonpaths
jsonpaths: There are two modes for importing JSON data: simple mode and matching mode. If jsonpaths is set, the import uses the matching mode; otherwise, it uses the simple mode. See the examples below.
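For example, to extract three fields from each JSON record (field names taken from the examples below):
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\"]"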
strip_outer_array
Boolean type. If set to true, it indicates that the JSON data starts with an array object, and each element of the array is flattened into a separate record. The default is false.
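For example, when each Kafka message is a JSON array of records:
"strip_outer_array" = "true"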
json_root
json_root is a valid JSONPath string that specifies the root node of the JSON document. The default value is ''.
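For example, when the records are nested under a RECORDS field, as in the last example below:
"json_root" = "$.RECORDS"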
data_source
Type of data source. It currently supports: Apache Kafka
data_source_properties
It specifies information about the data source.
Syntax:
(
"key1" = "val1",
"key2" = "val2"
)
- Apache Kafka
kafka_broker_list
The connection information of the Kafka brokers. The format is ip:port. Multiple brokers are separated by commas.
Example:
"kafka_broker_list" = "broker1:9092,broker2:9092"
kafka_topic
Specify the Kafka topic to subscribe to.
Example:
"kafka_topic" = "my_topic"
kafka_partitions/kafka_offsets
Specify the Kafka partitions you want to subscribe to, and the starting offset for each of the corresponding partitions.
The offset can be a specific value greater than or equal to 0, or:
1. OFFSET_BEGINNING: Subscribe from the position where data exists.
2. OFFSET_END: Subscribe from the end.
If not specified, all partitions under the topic are subscribed to from OFFSET_END by default.
Example:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
property
Specifies custom Kafka parameters.
The function is equivalent to the "--property" parameter in the Kafka shell.
When the value of a parameter is a file, the keyword "FILE:" needs to be added before the value.
For information on how to create a file, see "HELP CREATE FILE;"
For more supported custom parameters, see the client-side configuration item in librdkafka's official CONFIGURATION document.
Example:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca-cert"
1.When connecting to Kafka using SSL, the following parameters need to be specified:
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca-cert",
Where:
"property.security.protocol" is used to indicate that the connection is SSL.
"property.ssl.ca.location" is used when be accesses kafka, specifying the location of the CA certificate.
If client authentication is turned on on at the Kafka server side, settings are also required:
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
Where:
"property.ssl.certificate.location" specifies the location of public key of client.
"property.ssl.key.location" specifies the location of private key of client.
"property.ssl.key.password" specifies the password of private key of client.
2. When connecting to Kafka using SASL, the following parameters need to be specified:
"property.security.protocol"="SASL_PLAINTEXT",
"property.sasl.mechanism"="PLAIN",
"property.sasl.username"="admin",
"property.sasl.password"="admin"
Where:
"property.security.protocol" specifies that the protocol is SASL_ PLAINTEXT.
"property.sasl.mechanism" specifies that the authentication method for SASL is PLAIN.
"property.sasl.username" specifies the user name of sasl.
"property.sasl.password" specifies the password for sasl.
3. Specify the default starting offset for Kafka partitions
If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default. In this case, you can use kafka_default_offsets to specify the starting offset. The default value is OFFSET_END, which subscribes from the end.
Valid values are:
1. OFFSET_BEGINNING: Subscribe from the position where data exists.
2. OFFSET_END: Subscribe from the end.
Example:
"property.kafka_default_offsets" = "OFFSET_BEGINNING
Sample Import Data Format
- Integer Type (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
- Floating Point Type (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
- Date and Time Type (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
- String Type (CHAR/VARCHAR) without quotation marks: I am a student, a
- NULL Value: \N
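For example, a CSV row for a table with an INT, a DOUBLE, a DATE, and a VARCHAR column, using a comma separator, might look like:
1000,0.23,2017-10-03,I am a student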
Examples
- Create a Kafka routine import task named test1 for example_tbl in example_db. Specify the column separator, group.id, and client.id, automatically consume all partitions by default, and subscribe from the position where data exists (OFFSET_BEGINNING).
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
- Create a kafka routine import task named test1 for example_tbl in example_db. The import task is in strict mode.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%starrocks%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
- Import data from the Kafka cluster using SSL authentication. The client.id parameter is also set. Import task is in non-strict mode and time zone is Africa/Abidjan.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%starrocks%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
);
- Import JSON in simple mode.
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
Two JSON data formats are supported:
1){"category":"a9jadhx","author":"test","price":895}
2)[
{"category":"a9jadhx","author":"test","price":895},
{"category":"axdfa1","author":"EvelynWaugh","price":1299}
]
- Precise import of JSON data format:
CREATE TABLE `example_tbl` (
`category` varchar(24) NULL COMMENT "",
`author` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p0 VALUES [("-2147483648"), ("20200509")),
PARTITION p20200509 VALUES [("20200509"), ("20200510")),
PARTITION p20200510 VALUES [("20200510"), ("20200511")),
PARTITION p20200511 VALUES [("20200511"), ("20200512")))
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES (
"storage_type" = "COLUMN",
"replication_num" = "1"
);
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
JSON data format:
[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
Note:
If the JSON data starts with an array and each object in the array is a record, strip_outer_array must be set to true to flatten the array.
If the JSON data starts with an array and each object in the array is a record, the root node specified in jsonpaths is actually an object in the array.
Use json_root to specify the root node, as in the following example.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true",
"json_root" = "$.RECORDS"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
JSON data format:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}
Keywords
CREATE, ROUTINE, LOAD