Overview of data loading
Data loading is the process of cleansing and transforming raw data from various data sources based on your business requirements and loading the resulting data into StarRocks to facilitate blazing-fast data analytics.
You can load data into StarRocks by running load jobs. Each load job has a unique label that is specified by the user or automatically generated by StarRocks to identify the job. Each label can be used only for one load job. After a load job is complete, its label cannot be reused for any other load jobs. Only the labels of failed load jobs can be reused. This mechanism helps ensure that the data associated with a specific label can be loaded only once, thus implementing At-Most-Once semantics.
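For example, a Broker Load job can specify its label as follows; the database, table, HDFS path, and broker name are hypothetical placeholders:

```sql
-- The label "label_orders_20230101" uniquely identifies this load job.
LOAD LABEL example_db.label_orders_20230101
(
    DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/data/orders.csv")
    INTO TABLE orders
    COLUMNS TERMINATED BY ","
)
WITH BROKER "my_broker";
-- Re-submitting a job with this label after it succeeds fails with a
-- "Label already exists" error, which enforces at-most-once loading.
```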
All the loading methods provided by StarRocks can guarantee atomicity. Atomicity means that the qualified data within a load job must be all successfully loaded or none of the qualified data is successfully loaded. It never happens that some of the qualified data is loaded while the other data is not. Note that the qualified data does not include the data that is filtered out due to quality issues such as data type conversion errors.
StarRocks supports two communication protocols that can be used to submit load jobs: MySQL and HTTP. For more information about the protocol supported by each loading method, see the "Loading methods" section of this topic.
Supported data types
StarRocks supports loading data of all data types. You only need to take note of the limits on the loading of a few specific data types. For more information, see Data types.
Loading modes
StarRocks supports two loading modes: synchronous loading mode and asynchronous loading mode.
NOTE
If you load data by using external programs, choose the loading mode that best suits your business requirements before you decide on a loading method.
Synchronous loading
In synchronous loading mode, after you submit a load job, StarRocks synchronously runs the job to load data, and returns the result of the job after the job finishes. You can check whether the job is successful based on the job result.
StarRocks provides two loading methods that support synchronous loading: Stream Load and INSERT.
The process of synchronous loading is as follows (a minimal sketch appears after the list):

1. Create a load job.
2. View the job result returned by StarRocks.
3. Check whether the job is successful based on the job result. If the job result indicates a load failure, you can retry the job.
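As a sketch of this flow, INSERT returns the load result in the same session; the database, table, and rows are hypothetical placeholders:

```sql
-- Synchronous loading: the statement blocks until the load finishes,
-- and the returned status indicates success or failure.
INSERT INTO example_db.orders (order_id, amount)
VALUES (1, 9.99), (2, 19.99);
```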
Asynchronous loading
In asynchronous loading mode, after you submit a load job, StarRocks immediately returns the job creation result.
If the result indicates a job creation success, StarRocks asynchronously runs the job. However, that does not mean that the data has been successfully loaded. You must use statements or commands to check the status of the job. Then, you can determine whether the data is successfully loaded based on the job status.
If the result indicates a job creation failure, you can determine whether you need to retry the job based on the failure information.
StarRocks provides three loading methods that support asynchronous loading: Broker Load, Routine Load, and Spark Load.
The process of asynchronous loading is as follows (a status-check sketch appears after the list):

1. Create a load job.
2. View the job creation result returned by StarRocks and determine whether the job is successfully created.
   a. If the job creation succeeds, go to Step 3.
   b. If the job creation fails, return to Step 1.
3. Use statements or commands to check the status of the job until the job status shows FINISHED or CANCELLED.
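For example, an asynchronous job can be polled with SHOW LOAD; the database and label are hypothetical placeholders:

```sql
-- The State column progresses through PENDING, ETL, and LOADING,
-- and ends in FINISHED or CANCELLED.
SHOW LOAD FROM example_db WHERE LABEL = "label_orders_20230101";
```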
The workflow of a Broker Load or Spark Load job consists of five stages: PENDING, ETL, LOADING, FINISHED, and CANCELLED.

The workflow is described as follows:
PENDING
The job is in queue waiting to be scheduled by an FE.
ETL
The FE pre-processes the data, including cleansing, partitioning, sorting, and aggregation.
NOTE
If the job is a Broker Load job, this stage is skipped.
LOADING
The FE cleanses and transforms the data, and then sends the data to the BEs. After all data is loaded, the data is in queue waiting to take effect. At this time, the status of the job remains LOADING.
FINISHED
When all data takes effect, the status of the job becomes FINISHED. At this time, the data can be queried. FINISHED is a final job state.
CANCELLED
Before the status of the job becomes FINISHED, you can cancel the job at any time. Additionally, StarRocks can automatically cancel the job in case of load errors. After the job is canceled, the status of the job becomes CANCELLED. CANCELLED is also a final job state.
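A job that has not reached a final state can be canceled with CANCEL LOAD; the database and label below are hypothetical placeholders:

```sql
-- Cancel a load job before it reaches the FINISHED state.
CANCEL LOAD FROM example_db WHERE LABEL = "label_orders_20230101";
```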
The workflow of a Routine Load job is described as follows (a job-creation sketch appears after the list):

1. The job is submitted to an FE from a MySQL client.
2. The FE splits the job into multiple tasks. Each task loads data from multiple partitions.
3. The FE distributes the tasks to specified BEs.
4. The BEs execute the tasks, and report to the FE after they finish the tasks.
5. The FE generates subsequent tasks, retries failed tasks if there are any, or suspends task scheduling based on the reports from the BEs.
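A job-creation sketch; the Kafka broker address, topic, and table are hypothetical placeholders:

```sql
-- Continuously consume a Kafka topic into the orders table.
CREATE ROUTINE LOAD example_db.orders_routine_job ON orders
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "desired_concurrent_number" = "3"
)
FROM KAFKA
(
    "kafka_broker_list" = "<kafka_host>:9092",
    "kafka_topic"       = "orders_topic"
);
```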
Loading methods
StarRocks provides five loading methods to help you load data in various business scenarios: Stream Load, Broker Load, Routine Load, Spark Load, and INSERT.
| Loading method | Protocol | Business scenario | Data volume per load job | Data source | Data file format | Loading mode |
| --- | --- | --- | --- | --- | --- | --- |
| Stream Load | HTTP | Load data files from local file systems or load data streams by using programs. | 10 GB or less | Local file systems, data streams | CSV, JSON | Synchronous |
| Broker Load | MySQL | Load data from HDFS or cloud storage. | Dozens of GB to hundreds of GB | HDFS, cloud storage | CSV, Parquet, ORC | Asynchronous |
| Routine Load | MySQL | Load data in real time from Apache Kafka®. | MBs to GBs of data as mini-batches | Kafka | CSV, JSON | Asynchronous |
| Spark Load | MySQL | Migrate large amounts of data from HDFS or Hive by using Apache Spark™ clusters. | Dozens of GB to TBs | HDFS, Hive | CSV, Parquet, ORC | Asynchronous |
| INSERT INTO SELECT | MySQL | Load data from external tables or between StarRocks tables. | Not fixed (The data volume varies based on the memory size.) | StarRocks tables, external tables | StarRocks tables | Synchronous |
| INSERT INTO VALUES | MySQL | Insert small amounts of data as individual records. | In small quantities | Programs, ETL tools | SQL | Synchronous |
You can determine the loading method of your choice based on your business scenario, data volume, data source, data file format, and loading frequency. Additionally, take note of the following points when you select a loading method:
If you load data from Kafka and the data requires multi-table joins and extract, transform and load (ETL), you can use Apache Flink® to pre-process the data and then use the flink-connector-starrocks plug-in to perform a Stream Load job to load the data into StarRocks.
If you load data from Hive, you can use Broker Load or Spark Load to load the data. However, we recommend that you create an external Hive table and then use the INSERT INTO SELECT statement to load the data from the external table into StarRocks.
If you load data from MySQL databases, you can use starrockswriter to load the data. However, we recommend that you create an external MySQL table and then use the INSERT INTO SELECT statement to load the data from the external table into StarRocks.
If you load data from other data sources such as Oracle and PostgreSQL, we recommend that you use starrockswriter.
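For instance, the Hive route looks roughly like the following; the resource, database, table, and column names are hypothetical, and the sketch assumes a Hive resource named `my_hive_resource` has already been created with CREATE RESOURCE:

```sql
-- Map a Hive table into StarRocks as an external table.
CREATE EXTERNAL TABLE example_db.hive_orders
(
    order_id BIGINT,
    amount   DOUBLE
)
ENGINE = HIVE
PROPERTIES
(
    "resource" = "my_hive_resource",
    "database" = "hive_db",
    "table"    = "orders"
);

-- Pull the external data into a native StarRocks table.
INSERT INTO example_db.orders
SELECT order_id, amount FROM example_db.hive_orders;
```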
The following figure provides an overview of various data sources supported by StarRocks and the loading methods that you can use to load data from these data sources.
Memory limits
StarRocks provides parameters for you to limit the memory usage for each load job, thereby reducing memory consumption, especially in high concurrency scenarios. However, do not specify an excessively low memory usage limit. If the memory usage limit is excessively low, data may be frequently flushed from memory to disk because the memory usage for load jobs reaches the specified limit. We recommend that you specify a proper memory usage limit based on your business scenario.
The parameters that are used to limit memory usage vary for each loading method. For more information, see Stream Load, Broker Load, Routine Load, Spark Load, and INSERT. Note that a load job usually runs on multiple BEs. Therefore, the parameters limit the memory usage of each load job on each involved BE rather than the total memory usage of the load job on all involved BEs.
StarRocks also provides parameters for you to limit the total memory usage of all load jobs that run on each individual BE. For more information, see the "System configurations" section of this topic.
Usage notes
Automatically fill in the destination column while loading
When you load data, you can choose not to load the data from a specific field of your data file:
- If you have specified the `DEFAULT` keyword for the destination StarRocks table column that maps the source field when you created the StarRocks table, StarRocks automatically fills the specified default value into the destination column.

  Stream Load, Broker Load, Routine Load, and INSERT support `DEFAULT current_timestamp`, `DEFAULT <default_value>`, and `DEFAULT (<expression>)`. Spark Load supports only `DEFAULT current_timestamp` and `DEFAULT <default_value>`.

  NOTE

  `DEFAULT (<expression>)` supports only the functions `uuid()` and `uuid_numeric()`.

- If you did not specify the `DEFAULT` keyword for the destination StarRocks table column that maps the source field when you created the StarRocks table, StarRocks automatically fills `NULL` into the destination column.

  NOTE

  If the destination column is defined as `NOT NULL`, the load fails.

For Stream Load, Broker Load, Routine Load, and Spark Load, you can also specify the value you want to fill into the destination column by using the parameter that is used to specify column mapping.

For information about the usage of `NOT NULL` and `DEFAULT`, see CREATE TABLE.
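The sketch below shows a table definition whose non-key columns can be auto-filled during loading; the database, table, and columns are hypothetical:

```sql
CREATE TABLE example_db.orders
(
    order_id   BIGINT      NOT NULL,
    amount     DOUBLE      DEFAULT "0",
    created_at DATETIME    DEFAULT current_timestamp,
    trace_id   VARCHAR(64) DEFAULT (uuid())
)
DUPLICATE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 8;
-- A load job that omits amount, created_at, or trace_id gets the declared
-- defaults; omitting order_id fails because the column is NOT NULL.
```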
Set write quorum for data loading
If your StarRocks cluster has multiple data replicas, you can set a different write quorum for each table, that is, how many replicas must return loading success before StarRocks determines that the loading task is successful. You can specify the write quorum by adding the property `write_quorum` when you CREATE TABLE, or add this property to an existing table using ALTER TABLE. This property is supported from v2.5.
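A quick sketch, assuming a table named `example_db.orders` already exists; `MAJORITY` is the default value, and `ONE` and `ALL` are the other valid values:

```sql
-- Consider a load successful as soon as one replica reports success.
ALTER TABLE example_db.orders SET ("write_quorum" = "ONE");
```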
System configurations
This section describes some parameter configurations that are applicable to all of the loading methods provided by StarRocks.
FE configurations
You can configure the following parameters in the configuration file fe.conf of each FE:
- `max_load_timeout_second` and `min_load_timeout_second`

  These parameters specify the maximum timeout period and minimum timeout period of each load job. The timeout periods are measured in seconds. The default maximum timeout period spans 3 days, and the default minimum timeout period spans 1 second. The maximum timeout period and minimum timeout period that you specify must fall within the range of 1 second to 3 days. These parameters are valid for both synchronous load jobs and asynchronous load jobs.
- `desired_max_waiting_jobs`

  This parameter specifies the maximum number of load jobs that can be held waiting in queue. The default value is 100 in v2.4 and earlier, and 1024 in v2.5 and later. When the number of load jobs in the PENDING state on an FE reaches the maximum number that you specify, the FE rejects new load requests. This parameter is valid only for asynchronous load jobs.
- `max_running_txn_num_per_db`

  This parameter specifies the maximum number of ongoing load transactions that are allowed in each database of your StarRocks cluster. A load job can contain one or more transactions. The default value is 100. When the number of load transactions running in a database reaches the maximum number that you specify, the subsequent load jobs that you submit are not scheduled. In this situation, if you submit a synchronous load job, the job is rejected. If you submit an asynchronous load job, the job is held waiting in queue.
NOTE
StarRocks counts all load jobs together and does not distinguish between synchronous load jobs and asynchronous load jobs.
- `label_keep_max_second`

  This parameter specifies the retention period of the history records for load jobs that have finished and are in the FINISHED or CANCELLED state. The default retention period spans 3 days. This parameter is valid for both synchronous load jobs and asynchronous load jobs.
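Some FE parameters can also be adjusted at runtime with ADMIN SET FRONTEND CONFIG; the sketch below assumes `desired_max_waiting_jobs` is mutable at runtime and uses an illustrative value:

```sql
-- Takes effect only on the current FE and is not persisted;
-- edit fe.conf to make the change permanent.
ADMIN SET FRONTEND CONFIG ("desired_max_waiting_jobs" = "2048");
ADMIN SHOW FRONTEND CONFIG LIKE "desired_max_waiting_jobs";
```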
BE configurations
You can configure the following parameters in the configuration file be.conf of each BE:
- `push_write_mbytes_per_sec`

  This parameter specifies the maximum write speed per tablet. The default value is 10 MB/s. In real-world business scenarios, the maximum write speed usually ranges from 10 MB/s to 30 MB/s depending on the schema used. You can adjust the value of this parameter to control the data loading speed.
- `write_buffer_size`

  This parameter specifies the maximum memory block size. The default size is 100 MB. The loaded data is first written to a memory block on the BE. When the amount of data that is loaded reaches the maximum memory block size that you specify, the data is flushed to disk. You must specify a proper maximum memory block size based on your business scenario.

  - If the maximum memory block size is exceedingly small, a large number of small files may be generated on the BE. In this case, query performance degrades. You can increase the maximum memory block size to reduce the number of files generated.
  - If the maximum memory block size is exceedingly large, remote procedure calls (RPCs) may time out. In this case, you can adjust the value of this parameter together with the value of the `tablet_writer_rpc_timeout_sec` parameter.
- `streaming_load_rpc_max_alive_time_sec`

  The waiting timeout period for each Writer process. The default value is 600 seconds. During the data loading process, StarRocks starts a Writer process to receive data from and write data to each tablet. If a Writer process does not receive any data within the waiting timeout period that you specify, StarRocks stops the Writer process. When your StarRocks cluster processes data at low speeds, a Writer process may not receive the next batch of data within a long period of time and therefore reports a "TabletWriter add batch with unknown id" error. In this case, you can increase the value of this parameter.
- `load_process_max_memory_limit_bytes` and `load_process_max_memory_limit_percent`

  These parameters specify the maximum amount of memory that can be consumed for all load jobs on each individual BE. StarRocks identifies the smaller memory consumption among the values of the two parameters as the final memory consumption that is allowed.

  - `load_process_max_memory_limit_bytes`: specifies the maximum memory size. The default maximum memory size is 100 GB.
  - `load_process_max_memory_limit_percent`: specifies the maximum memory usage. The default value is 30%. This parameter differs from the `mem_limit` parameter: the `mem_limit` parameter specifies the total maximum memory usage of your StarRocks cluster, and the default value is 90% x 90%.

  If the memory capacity of the machine on which the BE resides is M, the maximum amount of memory that can be consumed for load jobs is calculated as `M x 90% x 90% x 30%`.
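For example, under the default settings on a BE machine with 128 GB of memory (a hypothetical figure), load jobs may consume at most 128 GB x 90% x 90% x 30% ≈ 31.1 GB. Because this is smaller than the 100 GB cap set by `load_process_max_memory_limit_bytes`, 31.1 GB is the effective limit.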
Troubleshooting
For more information, see FAQ about data loading.