StarRocks supports multiple data models (refer to Table Design chapter) for different business scenarios. The data import function aims to clean and transform the raw data according to the specific model and load it into StarRocks.
StarRocks supports a variety of import methods. Users can choose the most suitable method based on the data size and import frequency. This section introduces the basic concepts, principles, system configuration, suitable scenarios of different import methods, as well as best practices and frequently asked questions.
Note: It is recommended to read this whole section first, and then dive into the details of your selected import method.
Choose the import methods based on your data source.
- Offline data import. If the data source is Hive/HDFS, Broker Load is recommended. If there are many data tables, consider using Hive external table for direct query.The performance may be worse than
Broker Load, but it doesn’t require data relocation. If a table has a large data volume, or needs the global data dictionary for precise deduplication, choose Spark Load.
- Real-time data import. It is recommended to import log and binlog data to StarRocks via Routine load after they are synchronized to Kafka. StarRocks has a standard Flink-connector to facilitate the use of Flink jobs.
- Write to StarRocks programmatically. It is recommended to use Stream Load.
- Text file import. It is recommended to use
- Mysql data import. It is recommend to use MySQL external table to import (
insert into new_table select * from external_table)
- Other data sources to import. It is recommended to use DataX import. StarRocks provides DataX-StarRocks-writer *StarRocks internal import. It is recommended to use insert into tablename select inside StarRocks, which can work with an external scheduler for simple ETL processing.
- Import job: The import job reads the source data and performs cleaning and transformation, then imports the data into the StarRocks system. Once the import is complete, the data is ready to be queried.
- Label: Each import job has a unique label that identifies them. The label can be specified by the user or generated automatically by the system. The Label is unique within a database, i.e., it can be used for only one successful import job. The label cannot be used again unless the corresponding import job is failed. This mechanism ensures that the data corresponding to each label is imported At-Most-Once.
- Atomicity: All import methods in StarRocks provide atomicity guarantee, i.e. data within the same import job is either all valid or none;there is no data being partially imported. The valid data does not include data that is filtered due to data quality issues such as type conversion errors. See the data quality issues listed in the FAQ section.
- MySQL Protocol/HTTP Protocol: StarRocks provides two protocol interfaces: MySQL protocol and HTTP protocol. Some imports use the MySQL protocol interface to submit jobs, and others use the HTTP protocol interface instead.
- Broker Load: Reads data from external data sources (e.g. HDFS) and imports it to StarRocks. The Broker uses its own computational resources to pre-process the data for import.
- Spark Load: StarRocks completes the import by reading an intermediate file that has been pre-processed and generated by an external resource such as Spark. This is an asynchronous import method – users need to initiate the import through the MySQL protocol and view the import result using the command.
- FE: Frontend, the node where StarRocks stores metadata and performs scheduling. It is mainly responsible for generating import execution plans and scheduling import jobs.
- BE: Backend, the computing and storage node of StarRocks. It is mainly responsible for the ETL and storage of data in the import process.
- Tablet: Logical partitioning of StarRocks tables. A table can be divided into multiple tablets according to partitioning and bucketing rules (refer to [Data Distribution](section Data Distribution.md)).
Import execution flow.
An import job is divided into five main phases.
Not required. This phase is when the user submits the import job and waits for the FE to schedule an execution.
This step is included in
Not required. This phase performs pre-processing of data, including cleaning, partitioning, sorting, aggregation, and etc.
This step is included in
Spark Load, which uses external computing resources to complete ETL.
Data is first cleaned and converted in this stage, and then sent to the BE for processing. Now all data is imported and waiting to take effect, the status of the import job is
After all data has taken effect, the status of the job becomes
FINISHEDis the final phase, after which the data can be queried.
The job can be cancelled at any time before its status changes to
Data import format：
- Integer (TINYINT, SMALLINT, INT, BIGINT, LARGEINT):
1, 1000, 1234
- Floating-point (FLOAT, DOUBLE, DECIMAL):
1.1, 0.23, .356
- Date (DATE, DATETIME):
2017-10-03, 2017-06-13 12:34:03
- String (CHAR, VARCHAR):
I am a student, a
- NULL value:
Introduction of import methods
StarRocks provides five import methods to support different data sources (such as HDFS, Kafka, local files, etc.) and different sync rules (asynchronous or synchronous).
All import methods support CSV data format. The
Broker Load also supports parquet and ORC data formats.
Broker Loadaccesses and reads the external data source through the Broker process and then creates an import job to StarRocks using the MySQL protocol. The submitted job is executed asynchronously and the user can view the import results with the
Broker Loadis suitable for source data in storage systems that are accessible to the Broker process (e.g. HDFS). Supported data volume is up to tens to hundreds of GB of data.
To save computing resources and improve the performance of importing large data volumes,
Spark Loadenables pre-processing of data with external Spark resources.
Spark Loadis an asynchronous import method that requires to use the MySQL protocol to initiate import jobs. Users can view the import results with the
Spark Loadis suitable for initial migration of large data volumes (up to TB level) to StarRocks. The data needs to be stored in a Spark-accessible storage system (e.g. HDFS).
Stream Loadis a synchronous import method. Users can send a request via the HTTP protocol to import a local file or data stream into StarRocks and wait for the system to return the results to determine whether the import was successful.
Stream Loadis suitable for importing local files or importing data from a data stream through a program.
Routine Load automates data import from a specified data source. Users can submit a Routine Import job via the MySQL protocol, generating a resident thread that reads data from a data source (such as Kafka) and imports it into StarRocks without interruption.
Similar to the
Insertstatement in MySQL, StarRocks provides
INSERT INTO tbl SELECT ... ;to read data from a StarRocks table and import it to another table, and
INSERT INTO tbl VALUES(...) ;to insert a single piece of data.
Synchronous and Asynchronous
StarRocks currently has two types of import methods: synchronous and asynchronous.
Note: If an external program accesses StarRocks' import function, you need to determine which type of import method is used first, and then determine the access method.
Synchronous import means that StarRocks executes synchronously while the user creates the import job and returns the result after the execution is finished.
The import methods of synchronous type are:
Stream Loadand Insert.
- The user (external system) creates the import job.
- StarRocks returns the import result.
- The user (external system) views the import result. If the import result is a failure, the import job can be created again.
The asynchronous import method means that after the user creates the import job, StarRocks returns a confirmation message. The message does not mean that the data has been imported successfully. The import job will be executed asynchronously, and the user needs to send the view command to get the status. If the creation fails, the user should read the error message to determine whether the job needs to be created again.
The asynchronous types of import methods include
Broker Load and
Step1: The user (external system) creates the import job. Step2: StarRocks returns the result of the creation job. Step3: If the job succeeds, go to step 4. If it fails, repeat from step 1. Step4: The user (external system) polls to see the job status (FINISHED or CANCELLED).
When the source data is stored in HDFS and the data volume is tens of GB to hundreds of GB, the
Broker Loadmethod can be used. Make sure that the deployed Broker has access to the HDFS data source. The import job is executed asynchronously, and users can view the result by
When the source data is stored in HDFS and the data volume reaches the terabyte level, the
Spark Loadmethod can be used. Make sure that the deployed Spark has access to the HDFS data source. The import job is executed asynchronously, and users can view the result by
For other external data sources, as long as the Broker or Spark can read the data source, the corresponding method can also be used to import data.
Local File Import
When the source data is stored in local files and the data volume is less than 10GB, the
Stream Loadmethod can be used to import data quickly. The HTTP protocol is used to create the import job. The job is executed synchronously, and the return value of the HTTP request will indicate the import result.
When the data comes from a streaming data source such as Kafka and needs to be imported to StarRocks in real time, the
ROUTINE LOADmethod can be used. Users need to create routine import jobs via the MySQL protocol and StarRocks can continuously read and import data from Kafka.
Insert Into Import
When testing and processing temporary data, the
INSERT INTROmethod can be used to write data to StarRocks tables. Users can use
INSERT INTO tbl SELECT ...;statement to read data from a StarRocks table and imports it into another table, and use
INSERT INTO tbl VALUES(...) ;statement to insert a single piece of data into a specific table.
To prevent imports from taking up too much memory and causing system OOM, users can set parameters to limit the memory consumed by import jobs. Different import methods limit memory in slightly different ways, see each import method for details.
An import job is usually distributed across multiple BEs. The memory parameters limit the memory usage of an import job on a single BE, not on the entire cluster.
Also, each BE sets the total memory limit that can be used for the import job. See the section "4.1.5 General System Configuration" for details. This configuration limits the overall memory usage for all import jobs running on that BE.
A stricter memory limit may affect the efficiency of the import, as the import process may frequently write data from memory to disk when the memory limit is reached. However, an overly lenient memory limit may lead to system OOM when the import concurrency is high. In conclusion, the memory parameters need to be set properly based on your requirements and use cases.
General System Configuration
This section explains the system configuration that is available for all import methods.
The following system configurations belong to FE and can be modified by FE's configuration file (
max_load_timeout_second and min_load_timeout_second
You can set the maximum and minimum range of values for the import timeout, both in seconds. The default maximum timeout is 3 days, and the minimum timeout is 1 second. This parameter is common to all types of import jobs.
The default value for the maximum number of import jobs that can be held in the waiting queue is 100. If the number of PENDING import jobs in the FE reaches this value, new import requests will be rejected. This configuration is valid only for asynchronous imports.
The default value for the maximum number of running import jobs in each database (regardless of import type) is 100. When the number of running import jobs exceeds this value subsequent imports will not be executed. If it is a synchronous job, the job will be rejected; if it is an asynchronous job, the job will wait in the queue.
This parameter determines how long the records of completed (FINISHED or CANCELLED) import jobs can be kept in the StarRocks. The default is 3 days. This parameter is common for all types of import jobs.
The following belongs to the system configuration of BE, which can be modified by the BE configuration file (
Imported data is first written to a memory block on the BE, and then written to disk only if this memory block reaches its threshold (100MB by default). Too small a threshold may result in a large number of small files on the BE. This threshold can be increased to reduce the number of files. However, too large a threshold may lead to RPC timeouts, in which case you can adjust the value of this parameter based on your business needs.
During the import process, StarRocks opens a writer for each tablet to receive and write data. This parameter specifies the timeout for the writer to wait. The default value is 600 seconds. If the Writer does not receive any data within the time specified by the parameter, the writer will be automatically destroyed. If the system processing speed is slow, the writer may not receive the next batch of data any soon, resulting in an import error
TabletWriter add batch with unknown id.
These two parameters are maximum memory and maximum memory percentage, respectively, limiting the maximum amount of memory that can be used for import jobs on a single BE. The system will take the smaller value of the two parameters as the memory usage limit.
The percentage of the total memory limit for a BE is 30 by default. (The total memory limit
mem_limit defaults to 80%, indicating the percentage to its physical memory). That is, assuming the physical memory is M, the default import memory limit is
M * 80% * 30%.
load_process_max_memory_limit_bytesis 100 GB by default.
System variable configurations
You can configure the following system variable:
The query timeout duration. Unit: seconds. Value range:
259200. Default value:
300. This variable will act on all query statements in the current connection, as well as INSERT statements.
When users import data to StarRocks, they usually use programmatic docking. The following are some guidelines.
- Choose the appropriate import method: choose the import method according to the data size, import frequency and data source. For example, if the original data is stored on HDFS, use
Broker Loadto import.
- Confirm the protocol of the import method: If you choose
Broker Load, the external system needs to be able to submit and view the import jobs regularly using the MySQL protocol.
- Confirm the type of import method: The import method can be synchronous or asynchronous. If the import is asynchronous, the external system must invoke the
show loadcommand to see whether the import is successful.
- Develop a Label generation strategy: The Label generation strategy must meet the principle of uniqueness and fixity for each batch of data.
- Ensure Exactly-Once: External system needs to ensure At-Least-Once of data import. StarRocks' Label mechanism can ensure At-Most-Once of data import, so that the overall data import can be guaranteed Exactly-Once.
Label Already Exists
This means that an import job with the same Label has been successfully imported or is being executed in the same database. Users need to check if there is a Label conflict between different import methods or if the job has been submitted repeatedly. The steps to check for Label duplicates are as follows.
- Since the import Label in the StarRocks system does not distinguish between import methods, there is an issue with other import methods using the same Label.
- Check if a
FINISHEDimport job with the same Label already exists by
SHOW LOAD WHERE LABEL = "xxx", where xxx is the Label string to be checked.
Error reported for data quality issue:
ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel
Error data can be viewed by URL in
SHOW LOAD. Common types of errors are:
convert csv string to INT failed. The error occurs when converting the string to the corresponding type, for example, when converting "abc" to a number.
the length of input is too long than schema.The length is incorrect, such as a fixed-length string exceeding the length set in the table, or an int exceeding 4 bytes.
actual column number is less than schema column number.The number of columns in a row of the import file is less than the specified number of columns after the specified delimiter, probably because the delimiter is incorrect.
The actual column number is more than schema column number.The number of columns in a row of the import file is bigger than the specified number of columns after the specified delimiter.
The frac part length longer than schema scale.The fractional part of a decimal column in the import file is longer than the specified length.
The int part length longer than schema precision.The length of the integer part of a decimal column in the imported file exceeds the specified length.
The length of decimal value is overflow.The length of a decimal column in the import file exceeds the specified length.
There is no corresponding partition for this key.The value of the partition column in a row of the import file is not within the partition range.