CREATE ROUTINE LOAD

Description

Routine Load continuously streams Apache Kafka® messages into StarRocks. It is an asynchronous loading method, and jobs are submitted over the MySQL protocol.

Currently, Routine Load supports loading CSV and JSON data from Kafka. Routine Load can connect to Kafka without authentication, with SSL encryption and authentication, or with SASL authentication.

This topic describes the syntax, parameters, and examples of CREATE ROUTINE LOAD.

For application scenarios, principles, and basic steps, see Continuously load data from Apache Kafka®.

Syntax

CREATE ROUTINE LOAD [db.]job_name ON tbl_name

[load_properties]

[job_properties]

FROM data_source

[data_source_properties]

Parameters

[db.]job_name

The name of the import job. Within the same database, only one running job can have a given name.

tbl_name

The name of the destination table into which data is imported.

load_properties

Describes the data to be imported. Syntax:

[COLUMNS TERMINATED BY '<terminator>'],

[COLUMNS ([<column_name> [, ...] ] [, column_assignment [, ...] ] )],

[WHERE <expr>],

[PARTITION ([ <partition_name> [, ...] ])]



column_assignment:

<column_name> = column_expression
  • column_separator:

Specifies the column separator for CSV data. The default is \t. For example, to use a comma (,) as the column separator:

COLUMNS TERMINATED BY ","
  • columns_mapping:

Specifies how columns in the source data map to columns in the destination table, and defines how derived columns are generated.

  • Mapped column:

Specifies, in order, which columns in the source data correspond to which columns in the destination table. To skip a source column, specify a column name that does not exist in the destination table. Suppose the destination table has three columns, k1, k2, and v1, and the source data has four columns, of which columns 1, 2, and 4 correspond to k2, k1, and v1, respectively. Write the mapping as follows:

COLUMNS (k2, k1, xxx, v1)

Here, xxx is a non-existent column used to skip the third column in the source data.

  • Derived columns:

A column written in the form col_name = expr is called a derived column: expr is used to calculate the value of the corresponding column in the destination table. Derived columns are usually placed after mapped columns. This order is not mandatory, but StarRocks always parses the mapped columns before the derived columns. Continuing the previous example, suppose the destination table also has a fourth column, v2, which is the sum of k1 and k2. Write the mapping as follows:

COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2)

For CSV data, the number of mapped columns in COLUMNS must match the number of columns in the source data.

  • where_predicates

Specifies filter conditions used to filter out unwanted rows. Filter conditions can reference mapped or derived columns. For example, to import only rows in which k1 is greater than 100 and k2 equals 1000, write the clause as follows:

WHERE k1 > 100 and k2 = 1000
  • partitions

Specifies which partitions of the destination table to import data into. If not specified, data is automatically imported into the corresponding partitions. Example:

PARTITION(p1, p2, p3)
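
Taken together, the clauses above can be combined in one load_properties section, separated by commas. The following is a minimal sketch rather than a complete statement. The column names (k1, k2, v1, v2, xxx) and partition names (p1, p2, p3) are the illustrative ones used above and must be adapted to the actual table.

```sql
-- Fragment only: it belongs between "ON tbl_name" and the optional PROPERTIES clause.
COLUMNS TERMINATED BY ",",                 -- CSV column separator
COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2),   -- xxx skips the third source column; v2 is derived
WHERE k1 > 100 and k2 = 1000,              -- keep only rows that match the filter
PARTITION (p1, p2, p3)                     -- destination partitions
```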

job_properties

Specifies general parameters of the routine import job.

Syntax:

PROPERTIES (

    "key1" = "val1",

    "key2" = "val2"

)

The following parameters are currently supported:

  • desired_concurrent_number

The expected task concurrency. A routine import job is divided into subtasks for execution, and this parameter specifies the maximum number of tasks that a job can run simultaneously. The value must be greater than 0. The default is 3. This value is not the actual concurrency: the actual concurrency is determined by the number of nodes in the cluster, the load, and the data source. Example:

"desired_concurrent_number" = "3"

  • max_batch_interval

The task scheduling interval, that is, how often a task is executed. The default is 10s. The time a task spends consuming data is set by routine_load_task_consume_second in fe.conf and defaults to 3s. The task execution timeout is set by routine_load_task_timeout_second in fe.conf and defaults to 15s. Example:

"max_batch_interval" = "20"

  • max_error_number/max_batch_rows

max_error_number is the maximum number of error rows allowed within the sampling window. It must be greater than or equal to 0. The default is 0, which means no error rows are allowed. The sampling window is max_batch_rows * 10, which defaults to 200000 * 10 = 2000000 rows. If the number of error rows within the sampling window exceeds max_error_number, the routine job is paused and manual intervention is required to check for data quality issues. Rows filtered out by the WHERE condition are not counted as error rows.

  • strict_mode

Whether to enable strict mode. Strict mode is off by default. When it is on, a row is filtered out if the column type conversion of non-empty raw data yields NULL. To enable it, specify "strict_mode" = "true".

  • timezone

Specifies the time zone used by the import job. The default is the timezone parameter of the session. This parameter affects the results of all time zone related functions involved in the import.

  • format

Specifies the format of the data to be imported. CSV and JSON are supported, and the default is CSV. To import JSON data, specify "format" = "json".

  • jsonpaths

There are two ways to import JSON data: simple mode and matching mode. If jsonpaths is set, the import uses matching mode. Otherwise, it uses simple mode. See the examples in this topic.

  • strip_outer_array

A Boolean value. true indicates that the JSON data starts with an array and that the array is flattened so that each object in it is treated as a record. The default is false.

  • json_root

json_root is a valid jsonpath string that specifies the root node of the JSON document. The default value is an empty string.
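
As an illustration, the parameters above might be combined as follows for a JSON import. This is a sketch under assumptions: the values, including the error threshold of 1000, the time zone, the $.RECORDS root, and the jsonpaths fields, are illustrative only and are not recommendations.

```sql
-- Fragment only: it belongs between the load_properties and the FROM clause.
-- With "max_batch_rows" = "200000", the sampling window is 200000 * 10 = 2000000 rows.
-- With "max_error_number" = "1000", the job is paused once more than 1000 error rows
-- fall within one sampling window.
PROPERTIES
(
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "200000",
    "max_error_number" = "1000",
    "strict_mode" = "true",
    "timezone" = "Africa/Abidjan",
    "format" = "json",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\"]",
    "strip_outer_array" = "true",
    "json_root" = "$.RECORDS"
)
```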

data_source

The type of the data source. Currently, Apache Kafka is supported. It is written as KAFKA in the statement.

data_source_properties

Specifies information about the data source.

Syntax:

(

"key1" = "val1",

"key2" = "val2"

)
  • Apache Kafka
    • kafka_broker_list
The connection information of the Kafka brokers. The format is host:port. Multiple brokers are separated by commas.

Example:



"kafka_broker_list" = "broker1:9092,broker2:9092"
  • kafka_topic
Specifies the Kafka topic to subscribe to.

Example:



"kafka_topic" = "my_topic"
  • kafka_partitions/kafka_offsets
Specifies the Kafka partitions to subscribe to and the starting offset for each of those partitions.

An offset can be a specific value greater than or equal to 0, or one of the following:

1. OFFSET_BEGINNING: Subscribe from the earliest position that has data.

2. OFFSET_END: Subscribe from the end.

If these parameters are not specified, all partitions under the topic are subscribed to, starting from OFFSET_END by default.

Example:



"kafka_partitions" = "0,1,2,3",

"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
  • property
Specifies custom Kafka parameters.

This is equivalent to the "--property" parameter in the Kafka shell.

When the value of a parameter is a file, the keyword "FILE:" needs to be added before the value.

For information on how to create a file, see CREATE FILE.

For more supported custom parameters, see the client-side configuration items in librdkafka's official CONFIGURATION document.

Example:

"property.client.id" = "12345",

"property.ssl.ca.location" = "FILE:ca-cert"



1. When connecting to Kafka using SSL, the following parameters need to be specified:

"property.security.protocol" = "ssl",

"property.ssl.ca.location" = "FILE:ca-cert",



Where:

"property.security.protocol" indicates that the connection uses SSL.

"property.ssl.ca.location" is used when the BE accesses Kafka and specifies the location of the CA certificate.

If client authentication is enabled on the Kafka server side, the following settings are also required:

"property.ssl.certificate.location" = "FILE:client.pem",

"property.ssl.key.location" = "FILE:client.key",

"property.ssl.key.password" = "abcdefg",



Where:

"property.ssl.certificate.location" specifies the location of the client's public key.

"property.ssl.key.location" specifies the location of the client's private key.

"property.ssl.key.password" specifies the password of the client's private key.



2. When connecting to Kafka using SASL, the following parameters need to be specified:

"property.security.protocol"="SASL_PLAINTEXT",

"property.sasl.mechanism"="PLAIN",

"property.sasl.username"="admin",

"property.sasl.password"="admin"



Where:

"property.security.protocol" specifies that the protocol is SASL_PLAINTEXT.

"property.sasl.mechanism" specifies that the SASL authentication method is PLAIN.

"property.sasl.username" specifies the SASL username.

"property.sasl.password" specifies the SASL password.



3. Specify the default starting offset for Kafka partitions

If kafka_partitions/kafka_offsets is not specified, all partitions are consumed by default. In this case, kafka_default_offsets can be specified to set the starting offset. The default is OFFSET_END, that is, subscribing from the end.

Valid values:

1. OFFSET_BEGINNING: Subscribe from the earliest position that has data.

2. OFFSET_END: Subscribe from the end.

Example:

"property.kafka_default_offsets" = "OFFSET_BEGINNING"

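To show how the SASL settings and the default starting offset fit into a complete statement, here is a minimal sketch. The job name sasl_job, the table, the broker addresses, the topic, and the credentials are hypothetical placeholders, and only properties described in this topic are used.

```sql
-- Hypothetical job: all names, addresses, and credentials are placeholders.
CREATE ROUTINE LOAD example_db.sasl_job ON example_tbl
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "desired_concurrent_number" = "3"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "my_topic",
    -- SASL over an unencrypted connection, using the PLAIN mechanism
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.mechanism" = "PLAIN",
    "property.sasl.username" = "admin",
    "property.sasl.password" = "admin",
    -- No kafka_partitions/kafka_offsets: consume all partitions from the earliest data
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```
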
Sample Import Data Format

  • Integer Type (TINYINT/SMALLINT/INT/BIGINT/LARGEINT): 1, 1000, 1234
  • Floating Point Type (FLOAT/DOUBLE/DECIMAL): 1.1, 0.23, .356
  • Date and Time Type (DATE/DATETIME): 2017-10-03, 2017-06-13 12:34:03
  • String Type (CHAR/VARCHAR) without quotation marks: I am a student, a
  • NULL Value: \N

Examples

  • Create a Kafka routine import task named test1 for example_tbl in example_db. Specify the column separator, group.id, and client.id, automatically consume all partitions by default, and subscribe from the earliest position that has data (OFFSET_BEGINNING).
CREATE ROUTINE LOAD example_db.test1 ON example_tbl

COLUMNS TERMINATED BY ",",

COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)

PROPERTIES

(

    "desired_concurrent_number"="3",

    "max_batch_interval" = "20",

    "strict_mode" = "false"

)

FROM KAFKA

(

    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",

    "kafka_topic" = "my_topic",

    "property.group.id" = "xxx",

    "property.client.id" = "xxx",

    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

);
  • Create a Kafka routine import task named test1 for example_tbl in example_db. The import task is in strict mode, filters rows with a WHERE condition, and specifies the partitions and starting offsets to consume.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl

COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),

WHERE k1 > 100 and k2 like "%starrocks%"

PROPERTIES

(

    "desired_concurrent_number"="3",

    "max_batch_interval" = "20",

    "strict_mode" = "true"

)

FROM KAFKA

(

    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",

    "kafka_topic" = "my_topic",

    "kafka_partitions" = "0,1,2,3",

    "kafka_offsets" = "101,0,0,200"

);
  • Import data from a Kafka cluster using SSL authentication. The client.id parameter is also set. The import task is in non-strict mode and the time zone is Africa/Abidjan.
CREATE ROUTINE LOAD example_db.test1 ON example_tbl

COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),

WHERE k1 > 100 and k2 like "%starrocks%"

PROPERTIES

(

    "desired_concurrent_number"="3",

    "max_batch_interval" = "20",

    "strict_mode" = "false",

    "timezone" = "Africa/Abidjan"

)

FROM KAFKA

(

    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",

    "kafka_topic" = "my_topic",

    "property.security.protocol" = "ssl",

    "property.ssl.ca.location" = "FILE:ca.pem",

    "property.ssl.certificate.location" = "FILE:client.pem",

    "property.ssl.key.location" = "FILE:client.key",

    "property.ssl.key.password" = "abcdefg",

    "property.client.id" = "my_client_id"

);
  • Import JSON in simple mode.
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1

COLUMNS(category,price,author)

PROPERTIES

(

    "desired_concurrent_number"="3",

    "max_batch_interval" = "20",

    "strict_mode" = "false",

    "format" = "json"

)

FROM KAFKA

(

    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",

    "kafka_topic" = "my_topic",

    "kafka_partitions" = "0,1,2",

    "kafka_offsets" = "0,0,0"

);

Two JSON data formats are supported:

1) {"category":"a9jadhx","author":"test","price":895}

2)

[

{"category":"a9jadhx","author":"test","price":895},

{"category":"axdfa1","author":"EvelynWaugh","price":1299}

]
  • Import JSON in matching mode, using jsonpaths to extract fields precisely:
CREATE TABLE `example_tbl` (

`category` varchar(24) NULL COMMENT "",

`author` varchar(24) NULL COMMENT "",

`timestamp` bigint(20) NULL COMMENT "",

`dt` int(11) NULL COMMENT "",

`price` double REPLACE

) ENGINE=OLAP

AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)

COMMENT "OLAP"

PARTITION BY RANGE(`dt`)

(PARTITION p0 VALUES [("-2147483648"), ("20200509")),

PARTITION p20200509 VALUES [("20200509"), ("20200510")),

PARTITION p20200510 VALUES [("20200510"), ("20200511")),

PARTITION p20200511 VALUES [("20200511"), ("20200512")))

DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4

PROPERTIES (

    "storage_type" = "COLUMN",

    "replication_num" = "1"

);



CREATE ROUTINE LOAD example_db.test1 ON example_tbl

COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))

PROPERTIES

(

    "desired_concurrent_number"="3",

    "max_batch_interval" = "20",

    "strict_mode" = "false",

    "format" = "json",

    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",

    "strip_outer_array" = "true"

)

FROM KAFKA

(

    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",

    "kafka_topic" = "my_topic",

    "kafka_partitions" = "0,1,2",

    "kafka_offsets" = "0,0,0"

);

JSON data format:

[

{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},

{"category":"22","author":"2avc","price":895,"timestamp":1589191487},

{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}

]

Note:

  • If the JSON data starts with an array and each object in the array is a record, strip_outer_array must be set to true to flatten the array.

  • If the JSON data starts with an array and each object in the array is a record, the root node used when setting jsonpaths is actually an object in the array.

  • Specify the root node of the JSON data with json_root.

CREATE ROUTINE LOAD example_db.test1 ON example_tbl

COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))

PROPERTIES

(

    "desired_concurrent_number"="3",

    "max_batch_interval" = "20",

    "strict_mode" = "false",

    "format" = "json",

    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",

    "strip_outer_array" = "true",

    "json_root" = "$.RECORDS"

)

FROM KAFKA

(

    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",

    "kafka_topic" = "my_topic",

    "kafka_partitions" = "0,1,2",

    "kafka_offsets" = "0,0,0"

);

JSON data format:

{

"RECORDS":[

{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},

{"category":"22","author":"2avc","price":895,"timestamp":1589191487},

{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}

]

}