Introduction
You can import semi-structured data (for example, JSON) by using Stream Load or Routine Load.
Use Scenarios
- Stream Load: Use Stream Load to import JSON data stored in text files.
- Routine Load: Use Routine Load to import JSON data from Kafka.
Stream Load Import
Sample data:
{ "id": 123, "city" : "beijing"},
{ "id": 456, "city" : "shanghai"},
...
Example:
curl -v --location-trusted -u root: \
-H "format: json" -H "jsonpaths: [\"$.id\", \"$.city\"]" \
-T example.json \
http://FE_HOST:HTTP_PORT/api/DATABASE/TABLE/_stream_load
The format: json parameter specifies the format of the data to be imported. jsonpaths specifies the fields to extract from the JSON data and how they map to columns.
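For reference, here is a minimal sketch of a target table this example could load into. Only the column names id and city come from the sample data; the table name, column types, and distribution settings are assumptions:
-- hypothetical target table for the id/city sample above
CREATE TABLE `example_tbl` (
    `id` INT,
    `city` VARCHAR(64)
) ENGINE = OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 8;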
Related parameters:
- jsonpaths: Specify the JSON path used to extract the value for each column
- json_root: Specify the root element from which the JSON data is parsed
- strip_outer_array: Strip the outermost array structure and import each element as a separate row
- strict_mode: Apply strict filtering to column type conversions during import
When the JSON data schema and the StarRocks table schema do not match exactly, modify jsonpaths accordingly.
Sample data:
{"k1": 1, "k2": 2}
Import example:
curl -v --location-trusted -u root: \
-H "format: json" -H "jsonpaths: [\"$.k2\", \"$.k1\"]" \
-H "columns: k2, tmp_k1, k1 = tmp_k1 * 100" \
-T example.json \
http://127.0.0.1:8030/api/db1/tbl1/_stream_load
The ETL operation of multiplying k1 by 100 is performed during the import, and the columns are matched with the original data by jsonpaths.
The import results are as follows:
+------+------+
| k1   | k2   |
+------+------+
|  100 |    2 |
+------+------+
For missing columns, if the column definition is nullable, NULL is filled in; alternatively, a default value can be supplied with ifnull.
Sample data:
[
{"k1": 1, "k2": "a"},
{"k1": 2},
{"k1": 3, "k2": "c"},
]
Import Example-1:
curl -v --location-trusted -u root: \
-H "format: json" -H "strip_outer_array: true" \
-T example.json \
http://127.0.0.1:8030/api/db1/tbl1/_stream_load
The import results are as follows:
+------+------+
| k1   | k2   |
+------+------+
|    1 | a    |
|    2 | NULL |
|    3 | c    |
+------+------+
Import Example-2:
curl -v --location-trusted -u root: \
-H "format: json" -H "strip_outer_array: true" \
-H "jsonpaths: [\"$.k1\", \"$.k2\"]" \
-H "columns: k1, tmp_k2, k2 = ifnull(tmp_k2, 'x')" \
-T example.json \
http://127.0.0.1:8030/api/db1/tbl1/_stream_load
The import results are as follows:
+------+------+
| k1   | k2   |
+------+------+
|    1 | a    |
|    2 | x    |
|    3 | c    |
+------+------+
Routine Load Import
Similar to Stream Load, the message content of the Kafka data source is treated as a complete JSON object (a minimal job definition is sketched after the notes below).
- If a message contains multiple rows of data in array format, all rows are imported and Kafka's offset is incremented by only 1.
- If a JSON in array format represents multiple rows of data but fails to be parsed because of a JSON format error, the error count is incremented by only 1 (since parsing fails, StarRocks cannot determine how many rows the message actually contains and can only record the erroneous data as one row).
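For reference, below is a minimal sketch of a Routine Load job that consumes such JSON messages from Kafka. The database, table, job name, topic, and broker address are hypothetical placeholders, and the column names follow the Stream Load sample above:
-- minimal Routine Load job: parse Kafka messages as JSON and map $.id / $.city to columns
CREATE ROUTINE LOAD db1.example_json_load ON tbl1
COLUMNS (id, city)
PROPERTIES (
    "format" = "json",
    "jsonpaths" = "[\"$.id\", \"$.city\"]",
    "desired_concurrent_number" = "1"
)
FROM KAFKA (
    "kafka_broker_list" = "127.0.0.1:9092",
    "kafka_topic" = "example_topic"
);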
Use Canal to synchronize incremental binlog data from MySQL to StarRocks
Canal is an open-source MySQL binlog synchronization tool from Alibaba, through which MySQL data can be synchronized to Kafka. In Kafka, the data is produced in JSON format. The following demonstrates how to use Routine Load to consume the data in Kafka and achieve incremental, near real-time synchronization from MySQL.
- In MySQL we have a data table with the following table creation statement.
CREATE TABLE `query_record` (
`query_id` varchar(64) NOT NULL,
`conn_id` int(11) DEFAULT NULL,
`fe_host` varchar(32) DEFAULT NULL,
`user` varchar(32) DEFAULT NULL,
`start_time` datetime NOT NULL,
`end_time` datetime DEFAULT NULL,
`time_used` double DEFAULT NULL,
`state` varchar(16) NOT NULL,
`error_message` text,
`sql` text NOT NULL,
`database` varchar(128) NOT NULL,
`profile` longtext,
`plan` longtext,
PRIMARY KEY (`query_id`),
KEY `idx_start_time` (`start_time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8
- Prerequisite: Make sure binlog is enabled in MySQL and the format is ROW (you can verify this as shown after the snippet below).
[mysqld]
log-bin=mysql-bin # Enable binlog
binlog-format=ROW # Select ROW mode
server_id=1 # A unique server_id is required for MySQL replication; it must not duplicate Canal's slaveId
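You can verify these settings from a MySQL client, as mentioned above:
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW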
- Create a canal user and grant it the privileges of a MySQL replica (slave):
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
- Then download and install Canal.
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
mkdir /tmp/canal
tar zxvf canal.deployer-1.0.17.tar.gz -C /tmp/canal
- Modify the configuration (MySQL related).
$ vi conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
# position info; change to your own database information
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password; change to your own database information
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
# table regex
canal.instance.filter.regex = .*\\..*
# Set the Kafka topic for the table to be synchronized and the column used to hash data to Kafka partitions
canal.mq.dynamicTopic = databasename.query_record
canal.mq.partitionHash = databasename.query_record:query_id
- Modify the configuration (Kafka related).
$ vi /usr/local/canal/conf/canal.properties
# Available options: tcp(by default), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq Cluster Configuration: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 127.0.0.1:6667
canal.mq.retries = 0
# This value can be increased in flatMessage mode, but do not exceed the maximum size of the MQ message.
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# In flatMessage mode, increase this value; 50-200 is recommended.
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal's batch size, 50K by default. Do not exceed 1M due to Kafka's maximum message size limit (under 900K)
canal.mq.canalBatchSize = 50
# Timeout of `Canal get`, in milliseconds. Empty indicates unlimited timeout.
canal.mq.canalGetTimeout = 100
# Whether messages are delivered in flat JSON format (needed here so that the JSON messages shown below are produced to Kafka)
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# Whether Kafka message delivery uses transactions
canal.mq.transaction = false
- Start Canal.
bin/startup.sh
The corresponding synchronization log can be found in logs/example/example.log, and the messages in Kafka have the following format:
{
"data": [{
"query_id": "3c7ebee321e94773-b4d79cc3f08ca2ac",
"conn_id": "34434",
"fe_host": "172.26.34.139",
"user": "zhaoheng",
"start_time": "2020-10-19 20:40:10.578",
"end_time": "2020-10-19 20:40:10",
"time_used": "1.0",
"state": "FINISHED",
"error_message": "",
"sql": "COMMIT",
"database": "",
"profile": "",
"plan": ""
}, {
"query_id": "7ff2df7551d64f8e-804004341bfa63ad",
"conn_id": "34432",
"fe_host": "172.26.34.139",
"user": "zhaoheng",
"start_time": "2020-10-19 20:40:10.566",
"end_time": "2020-10-19 20:40:10",
"time_used": "0.0",
"state": "FINISHED",
"error_message": "",
"sql": "COMMIT",
"database": "",
"profile": "",
"plan": ""
}, {
"query_id": "3a4b35d1c1914748-be385f5067759134",
"conn_id": "34440",
"fe_host": "172.26.34.139",
"user": "zhaoheng",
"start_time": "2020-10-19 20:40:10.601",
"end_time": "1970-01-01 08:00:00",
"time_used": "-1.0",
"state": "RUNNING",
"error_message": "",
"sql": " SELECT SUM(length(lo_custkey)), SUM(length(c_custkey)) FROM lineorder_str INNER JOIN customer_str ON lo_custkey=c_custkey;",
"database": "ssb",
"profile": "",
"plan": ""
}],
"database": "center_service_lihailei",
"es": 1603111211000,
"id": 122,
"isDdl": false,
"mysqlType": {
"query_id": "varchar(64)",
"conn_id": "int(11)",
"fe_host": "varchar(32)",
"user": "varchar(32)",
"start_time": "datetime(3)",
"end_time": "datetime",
"time_used": "double",
"state": "varchar(16)",
"error_message": "text",
"sql": "text",
"database": "varchar(128)",
"profile": "longtext",
"plan": "longtext"
},
"old": null,
"pkNames": ["query_id"],
"sql": "",
"sqlType": {
"query_id": 12,
"conn_id": 4,
"fe_host": 12,
"user": 12,
"start_time": 93,
"end_time": 93,
"time_used": 8,
"state": 12,
"error_message": 2005,
"sql": 2005,
"database": 12,
"profile": 2005,
"plan": 2005
},
"table": "query_record",
"ts": 1603111212015,
"type": "INSERT"
}
Add json_root and strip_outer_array = true to the job properties so that the rows nested under the data field are imported.
CREATE ROUTINE LOAD manual.query_job ON query_record
COLUMNS (query_id,conn_id,fe_host,user,start_time,end_time,time_used,state,error_message,`sql`,`database`,profile,plan)
PROPERTIES (
"format" = "json",
"json_root" = "$.data",
"desired_concurrent_number" = "1",
"strip_outer_array" = "true",
"max_error_number" = "1000"
)
FROM KAFKA (
"kafka_broker_list"= "172.26.92.141:9092",
"kafka_topic" = "databasename.query_record"
);
This completes the near real-time synchronization of data from MySQL to StarRocks. You can view the status and error messages of the import job with SHOW ROUTINE LOAD.
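For example, to check the job created above (the output shows the job state and error information):
SHOW ROUTINE LOAD FOR manual.query_job;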