Introduction
You can import semi-structured data (for example, JSON) by using Stream Load or Routine Load.
Use Scenarios
- Stream Load: For JSON data stored in text files, use Stream Load to import it.
- Routine Load: For JSON data in Kafka, use Routine Load to import it.
Stream Load Import
Sample data:
{ "id": 123, "city" : "beijing"},
{ "id": 456, "city" : "shanghai"},
...
Example:
curl -v --location-trusted -u <username>:<password> \
-H "format: json" -H "jsonpaths: [\"$.id\", \"$.city\"]" \
-T example.json \
http://FE_HOST:HTTP_PORT/api/DATABASE/TABLE/_stream_load
The format: json parameter specifies the format of the imported data, and jsonpaths specifies the path used to extract each field from the JSON data.
Related parameters:
- jsonpaths: Selects the JSON path for each column
- json_root: Selects the root node from which the JSON data is parsed
- strip_outer_array: Strips the outermost array so that each element is imported as a separate row
- strict_mode: Strictly filters column type conversions during import
When the JSON data schema and the StarRocks table schema are not exactly the same, modify the jsonpaths.
Sample data:
{"k1": 1, "k2": 2}
Import example:
curl -v --location-trusted -u <username>:<password> \
-H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" \
-H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" \
-T example.json \
http://127.0.0.1:8030/api/db1/tbl1/_stream_load
During the import, an ETL operation multiplies k1 by 100, and the columns are matched with the original data through jsonpaths.
The import results are as follows:
+------+------+
| k1   | k2   |
+------+------+
|  100 |    2 |
+------+------+
If a column is missing from the JSON data and the column definition is nullable, NULL is filled in; alternatively, you can fill in a default value with the ifnull function.
Sample data:
[
{"k1": 1, "k2": "a"},
{"k1": 2},
{"k1": 3, "k2": "c"},
]
Import Example-1:
curl -v --location-trusted -u <username>:<password> \
-H "format: json" -H "strip_outer_array: true" \
-T example.json \
http://127.0.0.1:8030/api/db1/tbl1/_stream_load
The import results are as follows:
+------+------+
| k1   | k2   |
+------+------+
|    1 | a    |
|    2 | NULL |
|    3 | c    |
+------+------+
Import Example-2:
curl -v --location-trusted -u <username>:<password> \
-H "format: json" -H "strip_outer_array: true" \
-H "jsonpaths: [\"$.k1\", \"$.k2\"]" \
-H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" \
-T example.json \
http://127.0.0.1:8030/api/db1/tbl1/_stream_load
The import results are as follows:
+------+------+
| k1   | k2   |
+------+------+
|    1 | a    |
|    2 | x    |
|    3 | c    |
+------+------+
Routine Load Import
Similar to Stream Load, the message content from a Kafka data source is treated as a complete JSON document.
- If a message contains multiple rows of data in array format, all rows are imported and Kafka's offset is incremented by only 1.
- If a JSON array represents multiple rows of data but parsing fails because of a JSON format error, the error count is incremented by only 1 (since parsing failed, StarRocks cannot determine how many rows the message contains and records the erroneous data as one row).
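For reference, a Routine Load job that consumes such JSON messages could look like the sketch below. The job name, database db1, table tbl1, broker address, and topic name are placeholders rather than values taken from the examples above:
CREATE ROUTINE LOAD db1.example_json_load ON tbl1
COLUMNS(k1, k2)
PROPERTIES
(
    "format" = "json",                      -- treat each message as JSON
    "jsonpaths" = "[\"$.k1\",\"$.k2\"]",    -- map JSON fields to columns
    "strip_outer_array" = "true"            -- import one row per array element
)
FROM KAFKA
(
    "kafka_broker_list" = "broker_host:9092",
    "kafka_topic" = "example_topic"
);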
Use Canal to synchronize incremental data from MySQL to StarRocks via binlog
Canal is an open-source MySQL binlog synchronization tool from Alibaba, through which MySQL data can be synchronized to Kafka. In Kafka, the data is produced in JSON format. The following demonstrates how to use Routine Load to consume the data in Kafka and keep StarRocks incrementally synchronized with MySQL.
- In MySQL we have a data table with the following table creation statement.
CREATE TABLE `query_record` (
`query_id` varchar(64) NOT NULL,
`conn_id` int(11) DEFAULT NULL,
`fe_host` varchar(32) DEFAULT NULL,
`user` varchar(32) DEFAULT NULL,
`start_time` datetime NOT NULL,
`end_time` datetime DEFAULT NULL,
`time_used` double DEFAULT NULL,
`state` varchar(16) NOT NULL,
`error_message` text,
`sql` text NOT NULL,
`database` varchar(128) NOT NULL,
`profile` longtext,
`plan` longtext,
PRIMARY KEY (`query_id`),
KEY `idx_start_time` (`start_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8
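A target table with a matching schema also needs to exist in StarRocks. The definition below is only a sketch; the PRIMARY KEY model, column types, and bucket count are assumptions to adapt to your own deployment:
CREATE TABLE `query_record` (
    `query_id`      VARCHAR(64) NOT NULL,
    `conn_id`       INT,
    `fe_host`       VARCHAR(32),
    `user`          VARCHAR(32),
    `start_time`    DATETIME NOT NULL,
    `end_time`      DATETIME,
    `time_used`     DOUBLE,
    `state`         VARCHAR(16),
    `error_message` STRING,
    `sql`           STRING,
    `database`      VARCHAR(128),
    `profile`       STRING,
    `plan`          STRING
) ENGINE = OLAP
PRIMARY KEY(`query_id`)
DISTRIBUTED BY HASH(`query_id`) BUCKETS 8;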
- Prerequisite: Make sure MySQL has binlog enabled and the format is ROW.
[mysqld]
log-bin=mysql-bin # Enable binlog
binlog-format=ROW # Select ROW mode
server_id=1 # server_id must be defined for MySQL replication and must not duplicate Canal's slaveId
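You can verify that these settings took effect from a MySQL client, for example:
SHOW VARIABLES LIKE 'log_bin';        -- expected value: ON
SHOW VARIABLES LIKE 'binlog_format';  -- expected value: ROW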
- Create a user for Canal and grant it the privileges required to act as a MySQL replica:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
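If needed, confirm the grants for the new user with:
SHOW GRANTS FOR 'canal'@'%';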
- Then download and install Canal.
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
mkdir /tmp/canal
tar zxvf canal.deployer-1.0.17.tar.gz -C /tmp/canal
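Once Canal is configured to ship the binlog of query_record to a Kafka topic, each message wraps the changed rows in a data array. A Routine Load job roughly along the lines below could consume it; the job name, database name, broker address, and topic are assumptions, and because no jsonpaths is given, JSON keys are matched to StarRocks columns by name:
CREATE ROUTINE LOAD example_db.query_record_sync ON query_record
PROPERTIES
(
    "format" = "json",
    "json_root" = "$.data",          -- Canal puts the changed rows under the "data" field
    "strip_outer_array" = "true"     -- "data" is an array; import one row per element
)
FROM KAFKA
(
    "kafka_broker_list" = "broker_host:9092",  -- placeholder broker address
    "kafka_topic" = "query_record_topic"       -- placeholder topic written by Canal
);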