CREATE ROUTINE LOAD
Description
Routine Load continuously streams Apache Kafka® events into StarRocks. It is an asynchronous loading method, and load jobs are submitted by using the MySQL protocol.
Currently, Routine Load supports loading CSV and JSON data from Kafka. As for security, Routine Load can connect to Kafka without authentication, with SSL encryption and authentication, or with SASL authentication.
This topic introduces the syntax, parameters, and examples of CREATE ROUTINE LOAD.
For the scenarios, principles, and basic steps, see Continuously load data from Apache Kafka®.
Syntax
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[load_properties]
[job_properties]
FROM data_source
[data_source_properties]
Parameters
[db.]job_name
The name of the load job. Within the same database, only one running job can have a given name.
tbl_name
Specifies the name of the table into which you want to load data.
load_properties
It is used to describe imported data. Syntax:
[COLUMNS TERMINATED BY '<terminator>'],
[COLUMNS ([<column_name> [, ...] ] [, column_assignment [, ...] ] )],
[WHERE <expr>],
[PARTITION ([ <partition_name> [, ...] ])]
column_assignment:
<column_name> = column_expression
- column_separator:
You can specify a column separator for data in the CSV format, for example, a comma (,). The default separator is \t (tab).
COLUMNS TERMINATED BY ","
- columns_mapping:
Specifies the mapping between columns in the source data and columns in the destination table, and defines how derived columns are generated.
- Mapped column:
It specifies, in order, which columns in the source data correspond to which columns in the destination table. For columns you want to skip, you can specify a column name that does not exist. Suppose the destination table has three columns, k1, k2, and v1, and the source data has four columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write it as follows:
COLUMNS (k2, k1, xxx, v1)
Here, xxx is a non-existent column used to skip the third column in the source data.
- Derived columns:
A column in the form of col_name = expr is called a derived column. That is, expr is used to compute the value of the corresponding column in the destination table. Derived columns are usually placed after mapped columns; although this is not mandatory, StarRocks always parses the mapped columns before the derived columns. In the next example, suppose the destination table also has a fourth column, v2, which is generated as the sum of k1 and k2. It can be written as follows:
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);
For data in the CSV format, the number of mapped columns in COLUMNS must be consistent with the number of columns in the source data.
- where_predicates
It is used to specify filter conditions that filter out unwanted rows. The columns used in the filter can be mapped columns or derived columns. For example, if we only want to load rows with k1 greater than 100 and k2 equal to 1000, write it as follows:
WHERE k1 > 100 and k2 = 1000
- partitions
It specifies the partitions of the destination table to load data into. If not specified, data is automatically routed into the corresponding partitions. Example:
PARTITION(p1, p2, p3)
job_properties
General parameters used to control the routine load job.
Syntax:
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
We currently support the following parameters:
desired_concurrent_number
Expected concurrency. A routine load job is divided into subtasks for execution. This parameter specifies the maximum number of tasks a job can run simultaneously. It must be greater than 0. The default is 3. This is not the actual concurrency; the actual concurrency also depends on the number of nodes in the cluster, the cluster load, and the data source. Example:
"desired_concurrent_number" = "3"
max_batch_interval
The task scheduling interval, that is, how often a task is executed. Defaults to 10s. The time a task spends consuming data is controlled by routine_load_task_consume_second in fe.conf and defaults to 3s. The task execution timeout is controlled by routine_load_task_timeout_second in fe.conf and defaults to 15s. Example:
"max_batch_interval" = "20"
max_error_number/max_batch_rows
max_error_number is the maximum number of error rows allowed within a sampling window. It must be greater than or equal to 0. The default is 0, which means no error rows are allowed. The sampling window is max_batch_rows * 10; because max_batch_rows defaults to 200000, the window defaults to 200000 * 10 = 2,000,000 rows. If the number of error rows within the sampling window exceeds max_error_number, the routine load job is paused and manual intervention is required to check for data quality issues. Rows filtered out by the WHERE condition are not counted as error rows.
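For illustration, the two properties might be set together as follows; the values shown here are examples only, not recommendations:
"max_batch_rows" = "200000",
"max_error_number" = "100"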
strict_mode
Specifies whether to enable strict mode, which is enabled by default. If enabled, rows are filtered out when the column type conversion of non-null raw data produces NULL. Specify "strict_mode" = "true" or "strict_mode" = "false".
timezone
Specifies the time zone used by the load job. By default, the session's timezone variable is used. This parameter affects the results of all time-zone-related functions involved in the load.
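For example, to run the load job in a specific time zone (the value below is only an illustration, taken from the SSL example later in this topic):
"timezone" = "Africa/Abidjan"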
merge_condition
Specifies the name of the column you want to use as the condition to determine whether updates can take effect. The update from a source record to a destination record takes effect only when the source data record has a larger value than the destination data record in the specified column. For more information, see Change data through loading.
For example, a table contains the columns id, timestamp, and so on, and you want to update data rows that have the same primary key based on the timestamp column. A data row with a given primary key is updated only if the value in the timestamp column of the newly loaded data is greater than the value of the current data in StarRocks. In that case, you can set "merge_condition" = "timestamp".
NOTE
The column that you specify cannot be a primary key column. Additionally, only tables that use the Primary Key model support conditional updates.
Example:
CREATE ROUTINE LOAD test_db.table5 on table5
COLUMNS (id, version, score),
COLUMNS TERMINATED BY ','
PROPERTIES
(
"merge_condition" = "version"
)
FROM KAFKA
(
"kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic5",
"property.kafka_default_offsets" ="OFFSET_BEGINNING"
);
format
Specifies the format of the data to be loaded. Valid values are csv and json. The default is csv.
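For example, to load JSON-formatted Kafka messages, as shown in the JSON examples later in this topic:
"format" = "json"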
jsonpaths
There are two modes for loading JSON data: simple mode and matched mode. If jsonpaths is specified, the load runs in matched mode; otherwise, it runs in simple mode. See the examples below.
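For example, a matched-mode job can extract specific fields from each JSON message; the expression below is a shortened variant of the jsonpaths used in the later examples:
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\"]"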
strip_outer_array
A Boolean value. true indicates that the JSON data starts with an array and each element of the array is flattened into a separate record. Defaults to false.
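For example, to flatten JSON data that is wrapped in an outer array:
"strip_outer_array" = "true"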
json_root
json_root is a valid JSONPath string that specifies the root node of the JSON document. The default value is ''.
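For example, if every record sits under a top-level RECORDS field, as in the last example of this topic:
"json_root" = "$.RECORDS"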
data_source
The type of the data source. Currently, only Apache Kafka is supported.
data_source_properties
It specifies information about the data source.
Syntax:
(
"key1" = "val1",
"key2" = "val2"
)
- Apache Kafka
kafka_broker_list
The connection information of the Kafka brokers. The format is <host>:<port>. Multiple brokers are separated by commas.
Example:
"kafka_broker_list" = "broker1:9092,broker2:9092"
kafka_topic
Specify a topic for Kafka to subscribe to.
Example:
"kafka_topic" = "my_topic"
kafka_partitions/kafka_offsets
Specifies the Kafka partitions that you want to subscribe to and the starting offset of each corresponding partition.
offset can be a specific offset greater than or equal to 0, or one of the following:
1. OFFSET_BEGINNING: Subscribe from the position where data exists.
2. OFFSET_END: Subscribe from the end.
If not specified, all partitions under the topic are subscribed to from OFFSET_END by default.
Example:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
property
Specifies custom Kafka parameters.
This is equivalent to the "--property" parameter of the Kafka command-line tools.
When the value of a parameter is a file, the keyword "FILE:" needs to be added before the value.
For information on how to create a file, see CREATE FILE.
For more supported custom parameters, see the client-side configuration item in librdkafka's official CONFIGURATION document.
Example:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca-cert"
1. When connecting to Kafka using SSL, the following parameters need to be specified:
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca-cert",
Where:
"property.security.protocol" is used to indicate that the connection is SSL.
"property.ssl.ca.location" is used when be accesses kafka, specifying the location of the CA certificate.
If client authentication is turned on on at the Kafka server side, settings are also required:
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
Where:
"property.ssl.certificate.location" specifies the location of public key of client.
"property.ssl.key.location" specifies the location of private key of client.
"property.ssl.key.password" specifies the password of private key of client.
2. When connecting to Kafka using SASL, the following parameters need to be specified (a complete job sketch follows the parameter descriptions below):
"property.security.protocol"="SASL_PLAINTEXT",
"property.sasl.mechanism"="PLAIN",
"property.sasl.username"="admin",
"property.sasl.password"="admin"
Where:
"property.security.protocol" specifies that the protocol is SASL_ PLAINTEXT.
"property.sasl.mechanism" specifies that the authentication method for SASL is PLAIN.
"property.sasl.username" specifies the user name of sasl.
"property.sasl.password" specifies the password for sasl.
3.Specify the default starting offset for Kafka partition
If kafka_partitions/kafka_offsets are not specified, all partitions are consumed by default. In that case, you can use kafka_default_offsets to specify the starting offset. The default is OFFSET_END, which subscribes from the end.
Valid values are:
1. OFFSET_BEGINNING: Subscribe from the position where data exists.
2. OFFSET_END: Subscribe from the end.
Example:
"property.kafka_default_offsets" = "OFFSET_BEGINNING
Sample Import Data Format
- Integer Type (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
- Floating Point Type (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
- Date and Time Type (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
- String Type (CHAR/VARCHAR) without quotation marks: I am a student, a
- NULL Value: \N
Examples
- Create a Kafka routine load task named test1 for example_tbl in example_db. Specify the column separator, group.id, and client.id, consume all partitions automatically by default, and subscribe from the position where data exists (OFFSET_BEGINNING).
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.group.id" = "xxx",
"property.client.id" = "xxx",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
- Create a Kafka routine load task named test1 for example_tbl in example_db. The load task is in strict mode.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%starrocks%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,0,200"
);
- Load data from a Kafka cluster through SSL authentication. The client.id parameter is also set. The load task is in non-strict mode, and the time zone is Africa/Abidjan.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%starrocks%"
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"timezone" = "Africa/Abidjan"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "ssl",
"property.ssl.ca.location" = "FILE:ca.pem",
"property.ssl.certificate.location" = "FILE:client.pem",
"property.ssl.key.location" = "FILE:client.key",
"property.ssl.key.password" = "abcdefg",
"property.client.id" = "my_client_id"
);
- Import JSON in simple mode.
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
Two JSON data formats are supported:
1){"category":"a9jadhx","author":"test","price":895}
2)[
{"category":"a9jadhx","author":"test","price":895},
{"category":"axdfa1","author":"EvelynWaugh","price":1299}
]
- Load JSON data in matched mode, precisely matching the JSON format:
CREATE TABLE `example_tbl` (
`category` varchar(24) NULL COMMENT "",
`author` varchar(24) NULL COMMENT "",
`timestamp` bigint(20) NULL COMMENT "",
`dt` int(11) NULL COMMENT "",
`price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p0 VALUES [("-2147483648"), ("20200509")),
PARTITION p20200509 VALUES [("20200509"), ("20200510")),
PARTITION p20200510 VALUES [("20200510"), ("20200511")),
PARTITION p20200511 VALUES [("20200511"), ("20200512")))
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES (
"storage_type" = "COLUMN",
"replication_num" = "1"
);
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
JSON data format:
[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
Note:
If the JSON data starts with an array and each object in the array is a record, strip_outer_array must be set to true to flatten the array.
If the JSON data starts with an array and each object in the array is a record, the root node used when evaluating jsonpaths is actually an object in the array.
Use json_root to specify the root node, as in the following example.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"strict_mode" = "false",
"format" = "json",
"jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
"strip_outer_array" = "true",
"json_root" = "$.RECORDS"
)
FROM KAFKA
(
"kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
"kafka_topic" = "my_topic",
"kafka_partitions" = "0,1,2",
"kafka_offsets" = "0,0,0"
);
JSON data format:
{
"RECORDS":[
{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},
{"category":"22","author":"2avc","price":895,"timestamp":1589191487},
{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}
]
}