- StarRocks
- Introduction to StarRocks
- Quick Start
- Deployment
- Deployment overview
- Prepare
- Deploy
- Deploy classic StarRocks
- Deploy and use shared-data StarRocks
- Manage
- Table Design
- Data Loading
- Concepts
- Overview of data loading
- Load data from a local file system or a streaming data source using HTTP PUT
- Load data from HDFS or cloud storage
- Continuously load data from Apache KafkaĀ®
- Bulk load using Apache Sparkā¢
- Load data using INSERT
- Load data using Stream Load transaction interface
- Realtime synchronization from MySQL
- Continuously load data from Apache FlinkĀ®
- Change data through loading
- Transform data at loading
- Data Unloading
- Query Data Sources
- Query Acceleration
- Gather CBO statistics
- Synchronous materialized view
- Asynchronous materialized view
- Colocate Join
- Lateral Join
- Query Cache
- Index
- Computing the Number of Distinct Values
- Sorted streaming aggregate
- Administration
- Management
- Data recovery
- User Privilege and Authentication
- Performance Tuning
- Reference
- SQL Reference
- User Account Management
- Cluster Management
- ADD SQLBLACKLIST
- ADMIN CANCEL REPAIR TABLE
- ADMIN CHECK TABLET
- ADMIN REPAIR TABLE
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ALTER RESOURCE GROUP
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE FILE
- CREATE RESOURCE GROUP
- DELETE SQLBLACKLIST
- DROP FILE
- DROP RESOURCE GROUP
- EXPLAIN
- INSTALL PLUGIN
- KILL
- SET
- SHOW BACKENDS
- SHOW BROKER
- SHOW COMPUTE NODES
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW PLUGINS
- SHOW PROC
- SHOW PROCESSLIST
- SHOW RESOURCE GROUP
- SHOW SQLBLACKLIST
- SHOW TABLE STATUS
- SHOW VARIABLES
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER MATERIALIZED VIEW
- ALTER TABLE
- ALTER VIEW
- ALTER RESOURCE
- ANALYZE TABLE
- BACKUP
- CANCEL ALTER TABLE
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE ANALYZE
- CREATE DATABASE
- CREATE EXTERNAL CATALOG
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE AS SELECT
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- CREATE FUNCTION
- DROP ANALYZE
- DROP STATS
- DROP CATALOG
- DROP DATABASE
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- DROP FUNCTION
- HLL
- KILL ANALYZE
- RECOVER
- REFRESH EXTERNAL TABLE
- RESTORE
- SET CATALOG
- SHOW ANALYZE JOB
- SHOW ANALYZE STATUS
- SHOW META
- SHOW RESOURCES
- SHOW FUNCTION
- TRUNCATE TABLE
- USE
- DML
- ALTER LOAD
- ALTER ROUTINE LOAD
- BROKER LOAD
- CANCEL LOAD
- CANCEL EXPORT
- CANCEL REFRESH MATERIALIZED VIEW
- CREATE ROUTINE LOAD
- DELETE
- EXPORT
- GROUP BY
- INSERT
- PAUSE ROUTINE LOAD
- REFRESH MATERIALIZED VIEW
- RESUME ROUTINE LOAD
- SELECT
- SHOW ALTER TABLE
- SHOW ALTER MATERIALIZED VIEW
- SHOW BACKUP
- SHOW CATALOGS
- SHOW CREATE CATALOG
- SHOW CREATE MATERIALIZED VIEW
- SHOW CREATE TABLE
- SHOW CREATE VIEW
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW MATERIALIZED VIEWS
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD
- SHOW ROUTINE LOAD TASK
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- SUBMIT TASK
- UPDATE
- Auxiliary Commands
- Data Types
- Keywords
- AUTO_INCREMENT
- Function Reference
- Java UDFs
- Window functions
- Lambda expression
- Aggregate Functions
- array_agg
- avg
- any_value
- approx_count_distinct
- bitmap
- bitmap_agg
- count
- grouping
- grouping_id
- hll_empty
- hll_hash
- hll_raw_agg
- hll_union
- hll_union_agg
- max
- max_by
- min
- multi_distinct_sum
- multi_distinct_count
- percentile_approx
- percentile_cont
- percentile_disc
- retention
- stddev
- stddev_samp
- sum
- variance, variance_pop, var_pop
- var_samp
- window_funnel
- Array Functions
- array_agg
- array_append
- array_avg
- array_concat
- array_contains
- array_contains_all
- array_cum_sum
- array_difference
- array_distinct
- array_filter
- array_intersect
- array_join
- array_length
- array_map
- array_max
- array_min
- array_position
- array_remove
- array_slice
- array_sort
- array_sortby
- array_sum
- arrays_overlap
- array_to_bitmap
- cardinality
- element_at
- reverse
- unnest
- Bit Functions
- Bitmap Functions
- base64_to_bitmap
- bitmap_agg
- bitmap_and
- bitmap_andnot
- bitmap_contains
- bitmap_count
- bitmap_from_string
- bitmap_empty
- bitmap_has_any
- bitmap_hash
- bitmap_intersect
- bitmap_max
- bitmap_min
- bitmap_or
- bitmap_remove
- bitmap_to_array
- bitmap_to_base64
- bitmap_to_string
- bitmap_union
- bitmap_union_count
- bitmap_union_int
- bitmap_xor
- intersect_count
- sub_bitmap
- to_bitmap
- JSON Functions
- Overview of JSON functions and operators
- JSON operators
- JSON constructor functions
- JSON query and processing functions
- Map Functions
- Binary Functions
- Conditional Functions
- Cryptographic Functions
- Date Functions
- add_months
- adddate
- convert_tz
- current_date
- current_time
- current_timestamp
- date
- date_add
- date_format
- date_slice
- date_sub, subdate
- date_trunc
- datediff
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- days_add
- days_diff
- days_sub
- from_days
- from_unixtime
- hour
- hours_add
- hours_diff
- hours_sub
- microseconds_add
- microseconds_sub
- minute
- minutes_add
- minutes_diff
- minutes_sub
- month
- monthname
- months_add
- months_diff
- months_sub
- now
- quarter
- second
- seconds_add
- seconds_diff
- seconds_sub
- str_to_date
- str2date
- time_slice
- time_to_sec
- timediff
- timestamp
- timestampadd
- timestampdiff
- to_date
- to_days
- unix_timestamp
- utc_timestamp
- week
- week_iso
- weekofyear
- weeks_add
- weeks_diff
- weeks_sub
- year
- years_add
- years_diff
- years_sub
- Geographic Functions
- Math Functions
- String Functions
- append_trailing_char_if_absent
- ascii
- char
- char_length
- character_length
- concat
- concat_ws
- ends_with
- find_in_set
- group_concat
- hex
- hex_decode_binary
- hex_decode_string
- instr
- lcase
- left
- length
- locate
- lower
- lpad
- ltrim
- money_format
- null_or_empty
- parse_url
- repeat
- replace
- reverse
- right
- rpad
- rtrim
- space
- split
- split_part
- starts_with
- strleft
- strright
- substring
- trim
- ucase
- unhex
- upper
- Pattern Matching Functions
- Percentile Functions
- Scalar Functions
- Utility Functions
- cast function
- hash function
- System variables
- User-defined variables
- Error code
- System limits
- SQL Reference
- FAQ
- Benchmark
- Developers
- Contribute to StarRocks
- Code Style Guides
- Use the debuginfo file for debugging
- Development Environment
- Trace Tools
- Integration
Load data from a local file system or a streaming data source using HTTP PUT
StarRocks provides the loading method HTTP-based Stream Load to help you load data from a local file system or a streaming data source.
Stream Load runs in synchronous loading mode. After you submit a load job, StarRocks synchronously runs the job, and returns the result of the job after the job finishes. You can determine whether the job is successful based on the job result.
Stream Load is suitable for the following business scenarios:
Load a local data file.
In most cases, we recommend that you use curl to submit a load job, which is run to load the data of a local data file into StarRocks.
Load streaming data.
In most cases, we recommend that you use programs such as Apache FlinkĀ® to submit a load job, within which a series of tasks can be generated to continuously load streaming data in real time into StarRocks.
Stream Load supports data transformation at data loading and supports data changes made by UPSERT and DELETE operations during data loading. For more information, see Transform data at loading and Change data through loading.
NOTE
After you load data into a StarRocks table by using Stream Load, the data of the materialized views that are created on that table is also updated.
Supported data file formats
Stream Load supports the following data file formats:
CSV
JSON
You can use the streaming_load_max_mb
parameter to specify the maximum size of each data file you want to load. The default maximum size is 10 GB. We recommend that you retain the default value of this parameter. For more information, see the "Parameter configurations" section of this topic.
NOTE
For CSV data, take note of the following points:
- You can use a UTF-8 string, such as a comma (,), tab, or pipe (|), whose length does not exceed 50 bytes as a text delimiter.
- Null values are denoted by using
\N
. For example, a data file consists of three columns, and a record from that data file holds data in the first and third columns but no data in the second column. In this situation, you need to use\N
in the second column to denote a null value. This means the record must be compiled asa,\N,b
instead ofa,,b
.a,,b
denotes that the second column of the record holds an empty string.
Limits
Stream Load does not support loading the data of a CSV file that contains a JSON-formatted column.
How it works
You can submit a load request on your client to an FE according to HTTP, and the FE then uses an HTTP redirect to forward the load request to a specific BE. You can also directly submit a load request on your client to a BE of your choice.
NOTE
If you submit load requests to an FE, the FE uses a polling mechanism to decide which BE will serve as a coordinator to receive and process the load requests. The polling mechanism helps achieve load balancing within your StarRocks cluster. Therefore, we recommend that you send load requests to an FE.
The BE that receives the load request runs as the Coordinator BE to split data based on the used schema into portions and assign each portion of the data to the other involved BEs. After the load finishes, the Coordinator BE returns the result of the load job to your client. Note that if you stop the Coordinator BE during the load, the load job fails.
The following figure shows the workflow of a Stream Load job.
Load a local data file
Create a load job
This section uses curl as an example to describe how to load the data of a CSV or JSON file from your local file system into StarRocks. For detailed syntax and parameter descriptions, see STREAM LOAD.
Note that in StarRocks some literals are used as reserved keywords by the SQL language. Do not directly use these keywords in SQL statements. If you want to use such a keyword in an SQL statement, enclose it in a pair of backticks (`). See Keywords.
Load CSV data
Data examples
In your StarRocks database
test_db
, create a table namedtable1
that uses the Primary Key table. The table consists of three columns:id
,name
, andscore
, of whichid
is the primary key.MySQL [test_db]> CREATE TABLE `table1` ( `id` int(11) NOT NULL COMMENT "user ID", `name` varchar(65533) NULL COMMENT "user name", `score` int(11) NOT NULL COMMENT "user score" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10;
In your local file system, create a CSV file named
example1.csv
. The file consists of three columns, which represent the user ID, user name, and user score in sequence.1,Lily,23 2,Rose,23 3,Alice,24 4,Julia,25
Load data
Run the following command to load the data of example1.csv
into table1
:
curl --location-trusted -u <username>:<password> -H "label:123" \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "Expect:100-continue" \
-H "columns: id, name, score" \
-T example1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
example1.csv
consists of three columns, which are separated by commas (,) and can be mapped in sequence onto the id
, name
, and score
columns of table1
. Therefore, you need to use the column_separator
parameter to specify the comma (,) as the column separator. You also need to use the columns
parameter to temporarily name the three columns of example1.csv
as id
, name
, and score
, which are mapped in sequence onto the three columns of table1
.
Query data
After the load is complete, query the data of table1
to verify that the load is successful:
MySQL [test_db]> SELECT * FROM table1;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Lily | 23 |
| 2 | Rose | 23 |
| 3 | Alice | 24 |
| 4 | Julia | 25 |
+------+-------+-------+
4 rows in set (0.00 sec)
Load JSON data
Data examples
In your StarRocks database
test_db
, create a table namedtable2
that uses the Primary Key table. The table consists of two columns:id
andcity
, of whichid
is the primary key.MySQL [test_db]> CREATE TABLE `table2` ( `id` int(11) NOT NULL COMMENT "city ID", `city` varchar(65533) NULL COMMENT "city name" ) ENGINE=OLAP PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 10;
In your local file system, create a JSON file named
example2.json
. The file consists of two columns, which represent city ID and city name in sequence.{"name": "Beijing", "code": 2}
Load data
Run the following command to load the data of example2.json
into table2
:
curl -v --location-trusted -u <username>:<password> -H "strict_mode: true" \
-H "Expect:100-continue" \
-H "format: json" -H "jsonpaths: [\"$.name\", \"$.code\"]" \
-H "columns: city,tmp_id, id = tmp_id * 100" \
-T example2.json -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load
example2.json
consists of two keys, name
and code
, which are mapped onto the id
and city
columns of table2
, as shown in the following figure.
The mappings shown in the preceding figure are described as follows:
StarRocks extracts the
name
andcode
keys ofexample2.json
and maps them onto thename
andcode
fields declared in thejsonpaths
parameter.StarRocks extracts the
name
andcode
fields declared in thejsonpaths
parameter and maps them in sequence onto thecity
andtmp_id
fields declared in thecolumns
parameter.StarRocks extracts the
city
andtmp_id
fields declared in thecolumns
parameter and maps them by name onto thecity
andid
columns oftable2
.
NOTE
In the preceding example, the value of
code
inexample2.json
is multiplied by 100 before it is loaded into theid
column oftable2
.
For detailed mappings between jsonpaths
, columns
, and the columns of the StarRocks table, see the "Column mappings" section in STREAM LOAD.
Query data
After the load is complete, query the data of table2
to verify that the load is successful:
MySQL [test_db]> SELECT * FROM table2;
+------+--------+
| id | city |
+------+--------+
| 200 | Beijing|
+------+--------+
4 rows in set (0.01 sec)
View a load job
After a load job is complete, StarRocks returns the result of the job in JSON format. For more information, see the "Return value" section in STREAM LOAD.
Stream Load does not allow you to query the result of a load job by using the SHOW LOAD statement.
Cancel a load job
Stream Load does not allow you to cancel a load job. If a load job times out or encounters errors, StarRocks automatically cancels the job.
Load streaming data
Stream Load allows you to load streaming data into StarRocks in real time by using programs. For more information, see the following topics:
For information about how to run Stream Load jobs by using Flink, see Load data by using flink-connector-starrocks.
For information about how to run Stream Load jobs by using Java programs, visit https://github.com/StarRocks/demo/MiscDemo/stream_load.
For information about how to run Stream Load jobs by using Apache Sparkā¢, see 01_sparkStreaming2StarRocks.
Parameter configurations
This section describes some system parameters that you need to configure if you choose the loading method Stream Load. These parameter configurations take effect on all Stream Load jobs.
streaming_load_max_mb
: the maximum size of each data file you want to load. The default maximum size is 10 GB. For more information, see BE configuration items.We recommend that you do not load more than 10 GB of data at a time. If the size of a data file exceeds 10 GB, we recommend that you split the data file into small files that each are less than 10 GB in size and then load these files one by one. If you cannot split a data file greater than 10 GB, you can increase the value of this parameter based on the file size.
After you increase the value of this parameter, the new value can take effect only after you restart the BEs of your StarRocks cluster. Additionally, system performance may deteriorate, and the costs of retries in the event of load failures also increase.
NOTE
When you load the data of a JSON file, take note of the following points:
The size of each JSON object in the file cannot exceed 4 GB. If any JSON object in the file exceeds 4 GB, StarRocks throws an error "This parser can't support a document that big."
By default, the JSON body in an HTTP request cannot exceed 100 MB. If the JSON body exceeds 100 MB, StarRocks throws an error "The size of this batch exceed the max size [104857600] of json type data data [8617627793]. Set ignore_json_size to skip check, although it may lead huge memory consuming." To prevent this error, you can add
"ignore_json_size:true"
in the HTTP request header to ignore the check on the JSON body size.
stream_load_default_timeout_second
: the timeout period of each load job. The default timeout period is 600 seconds. For more information, see FE configuration items.If many of the load jobs that you create time out, you can increase the value of this parameter based on the calculation result that you obtain from the following formula:
Timeout period of each load job > Amount of data to be loaded/Average loading speed
For example, if the size of the data file that you want to load is 10 GB and the average loading speed of your StarRocks cluster is 100 MB/s, set the timeout period to more than 100 seconds.
NOTE
Average loading speed in the preceding formula is the average loading speed of your StarRocks cluster. It varies depending on the disk I/O and the number of BEs in your StarRocks cluster.
Stream Load also provides the
timeout
parameter, which allows you to specify the timeout period of an individual load job. For more information, see STREAM LOAD.
Usage notes
If a field is missing for a record in the data file you want to load and the column onto which the field is mapped in your StarRocks table is defined as NOT NULL
, StarRocks automatically fills a NULL
value in the mapping column of your StarRocks table during the load of the record. You can also use the ifnull()
function to specify the default value that you want to fill.
For example, if the field that represents city ID in the preceding example2.json
file is missing and you want to fill an x
value in the mapping column of table2
, you can specify "columns: city, tmp_id, id = ifnull(tmp_id, 'x')"
.