Broker Load
StarRocks supports importing data from external storage systems such as Apache HDFS and Amazon S3. The supported file formats include CSV, ORC, and Parquet. Broker Load is suitable for data volumes in the range of tens to hundreds of GB.
In Broker Load, StarRocks reads data from the corresponding data source (e.g. HDFS, S3) through the deployed broker program, and uses its own computing resources to pre-process and import the data. This is an asynchronous import method: the user creates the import job via the MySQL protocol and checks the import result with a separate command.
This section introduces broker load basics, examples, best practices, and frequently asked questions.
Terminology Explanation
Broker: Broker is a standalone stateless process that encapsulates the file system interface and provides StarRocks with the ability to read files from remote storage systems.
Plan: Import execution plan. BE executes an import execution plan to import data into the StarRocks system.
Fundamentals
After the user submits the import job, the FE generates and distributes the corresponding plan to multiple BEs based on the data volume. Each BE executes a part of the import job. Once all the BEs finish importing, the FE will determine whether the import is successful or not.
The following diagram shows the main flow of broker load.
Supported Remote File Systems
- Apache HDFS
- Amazon S3
- Alibaba Cloud OSS
- Tencent COS
Import Example
Broker deployment
Broker Load needs a broker to access remote storage, so a broker must be deployed first.
You can refer to the manual deployment instructions (Deploy Broker).
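Once the broker process is running, you can confirm that the FE has registered it before submitting load jobs. A minimal sketch; the broker name used in the later examples ('broker') must match one of the names returned here:

~~~sql
-- List the brokers known to the FE, including their hosts and liveness status.
SHOW BROKER;
~~~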
Create import job
Syntax:
~~~sql
LOAD LABEL db_name.label_name
    (data_desc, ...)
WITH BROKER broker_name broker_properties
[PROPERTIES (key1=value1, ... )]

data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY column_separator]
    [FORMAT AS file_type]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (colx, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]

broker_properties:
    (key2=value2, ...)
~~~
Example 1: Import from Apache HDFS

~~~sql
LOAD LABEL db1.label1
(
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1, tmp_c2)
    SET
    (
        id=tmp_c2,
        name=tmp_c1
    ),

    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    WHERE col1 > 1
)
WITH BROKER 'broker'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "3600"
);
~~~
Example 2: Import from Alibaba Cloud OSS

~~~sql
LOAD LABEL example_db.label12
(
    DATA INFILE("oss://my_bucket/input/file.csv")
    INTO TABLE `my_table`
    (k1, k2, k3)
)
WITH BROKER my_broker
(
    "fs.oss.accessKeyId" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
    "fs.oss.accessKeySecret" = "yyyyyyyyyyyyyyyyyyyy",
    "fs.oss.endpoint" = "oss-cn-zhangjiakou-internal.aliyuncs.com"
);
~~~
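Amazon S3 is also listed as a supported source but has no example above, so here is a hedged sketch. It assumes the broker reads S3 through the standard Hadoop S3A properties (fs.s3a.access.key, fs.s3a.secret.key, fs.s3a.endpoint); verify the exact property names against your broker version. The bucket, table, and credentials are hypothetical:

~~~sql
LOAD LABEL example_db.label_s3
(
    DATA INFILE("s3a://my_bucket/input/file.csv")
    INTO TABLE `my_table`
    (k1, k2, k3)
)
WITH BROKER my_broker
(
    -- Hypothetical values; replace with your own credentials and region endpoint.
    "fs.s3a.access.key" = "xxxxxxxx",
    "fs.s3a.secret.key" = "yyyyyyyy",
    "fs.s3a.endpoint" = "s3.us-east-1.amazonaws.com"
);
~~~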
Here we introduce the parameters shown in the command.
Label:
The label of the import job. Each import job has a label that is unique within the database. The label is a user-defined name in the import command. It allows the user to view the execution status of the corresponding import job, and it can be used to prevent the same data from being imported repeatedly. When the status of the import job is FINISHED, the corresponding label cannot be used again. When the status of the import job is CANCELLED, the label can be used again.
Data Description Parameters:
Data description parameters refer to the parameters of the data_desc section in the statement. The data_desc statement declares the data source address, ETL functions, target table, and partitions involved in this import job. The data description parameters are described in detail below.
- Multi-Table Import
Broker Load supports importing into multiple tables in one import job, which is achieved by declaring multiple tables with multiple data_desc clauses. Each data_desc specifies the address of the data source belonging to that table, and multiple file_paths can be declared for importing into the same table. Broker Load guarantees the atomicity of the import across all tables in the job.
- file_path
The file path can point to a single file, or use * to match all files in a directory. Intermediate directories can also be matched with wildcards.
The wildcards that can be used are ? * [] {} ^; for detailed usage rules, see the wildcard reference.
For example:
hdfs://hdfs_host:hdfs_port/user/data/tablename/*/* matches all files in all partitions under tablename.
hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104*/* matches all files in all April 2021 partitions under tablename.
- negative
data_desc also allows you to reverse imported data. This function is applicable when the aggregate columns of the data table are all of the SUM type. If you want to undo a batch of imported data, you can import the same batch again with the negative parameter. StarRocks will automatically negate the values of the aggregate columns, thereby offsetting the previously imported data.
- partition
data_desc allows you to specify the partitions of the table into which the data is imported. Data that does not belong to the specified partitions will not be imported and will be counted as "wrong data". For data that you neither want to import nor want counted as "wrong data", use the where predicate to filter it out.
- column separator
Specifies the column separator in the import file. The default is \t.
If the separator is an invisible character, add \x as a prefix and use hexadecimal to represent it. For example, the Hive file separator \x01 is specified as "\x01".
- file type
Specify the type of the imported file (e.g. parquet, orc, csv). The default is csv.
The parquet type can also be recognized by the file suffix .parquet or .parq.
- COLUMNS FROM PATH AS
Extracts the partition fields from the file path.
Example: If the imported file is /path/col_name=col_value/dt=20210101/file1, and col_name and dt are columns in the table, the following statement imports col_value and 20210101 into the columns col_name and dt respectively.
~~~sql
...
(col1, col2)
COLUMNS FROM PATH AS (col_name, dt)
~~~
- set column mapping
The SET statement in data_desc sets the conversion function for a column and supports all equivalent functions. This statement is needed when the columns of the original data do not correspond to the columns in the table.
- where predicate
The WHERE statement in data_desc filters the data that has already been transformed. The filtered-out data is not counted toward the tolerance rate. If multiple conditions on the same table are declared in multiple data_desc clauses, they are merged with AND.
A combined sketch illustrating several of these data description parameters follows this list.
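The sketch below combines a wildcard file path, COLUMNS FROM PATH AS, a SET column mapping, and a WHERE predicate in a single data_desc. It is illustrative only: the paths, the table tablename, and the columns (k1, k2, tmp_time, dt, event_time) are hypothetical, and str_to_date is used as an example conversion function.

~~~sql
LOAD LABEL example_db.label_combined
(
    -- The wildcards match every file under every dt=202104* partition directory.
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/tablename/dt=202104*/*")
    INTO TABLE tablename
    COLUMNS TERMINATED BY ","
    (k1, k2, tmp_time)
    -- Fill the dt column from the directory name in the file path.
    COLUMNS FROM PATH AS (dt)
    -- Convert the raw string column into the target column with a function.
    SET (event_time = str_to_date(tmp_time, '%Y-%m-%d %H:%i:%s'))
    -- Rows filtered out here are not counted against max_filter_ratio.
    WHERE k1 > 0
)
WITH BROKER 'broker'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
);
~~~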
Import job parameters:
Import job parameters are parameters that belong to the opt_properties section and are applied to the entire import job. See the following for parameter details.
- timeout
The timeout (in seconds) for the import job. You can set the timeout for each import in opt_properties. The import job will be CANCELLED if it does not complete within the set time limit. The default timeout for Broker Load is 4 hours.
Note: Normally, users do not need to set this parameter unless the import cannot be completed within the default time.
The recommended timeout is calculated as follows:
Timeout > (Total file size (MB) * Number of tables to be imported and their rollup tables) / (10 * Import concurrency)
The import concurrency is described in the import system configuration at the end of this document; 10 is the current default speed limit for BE import (i.e. 10 MB/s).
For example, for 1 GB of data to be imported into a table that has 2 rollup tables, with an import concurrency of 3, the minimum timeout is (1 * 1024 * 3) / (10 * 3) = 102 seconds.
Since each StarRocks cluster has its own machine environment and runs different concurrent query workloads, the slowest import speed of a cluster needs to be estimated by the user based on the speed of historical import jobs.
- max_filter_ratio
The maximum tolerated error ratio of the import job, with a default value of 0 and a range of 0 to 1. When the error ratio of the import job exceeds this value, the job fails.
Users can set this value greater than 0 to allow the import to succeed despite a small number of erroneous rows.
The error ratio is calculated as:
dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)
dpp.abnorm.ALL is the number of rows whose data quality is not up to standard, such as type mismatches, column-count mismatches, and length mismatches.
dpp.norm.ALL is the number of correct rows in the import. Users can use the SHOW LOAD command to check the amount of correct data for the import job.
Number of rows in the original file = dpp.abnorm.ALL + dpp.norm.ALL
- exec_mem_limit
Import memory limit (in bytes), with a default value of 2GB.
- strict_mode
The strict_mode of Broker Load can be turned on by setting "strict_mode" = "true". It is off by default.
Strict mode strictly filters column type conversions during the import. The strict filtering policy is as follows:
1. For column type conversion, wrong data is filtered out under strict mode. Here, wrong data refers to values that are not null in the original file but become null after conversion.
However, this policy does not apply to the following scenarios:
- Strict mode has no effect on an imported column that is generated by a conversion function.
- For an imported column whose target type carries a range restriction, a value that can be converted to the target type but falls outside the range is not filtered by strict mode. For example, if the target column type is decimal(1,0) and the original value is 10, the type conversion succeeds but the range restriction does not; strict mode has no effect on this import.
A sketch showing how these job-level parameters are passed in PROPERTIES follows this list.
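This is a minimal, illustrative sketch; the label, path, table, and concrete parameter values are hypothetical:

~~~sql
LOAD LABEL example_db.label_props
(
    DATA INFILE("hdfs://hdfs_host:hdfs_port/user/data/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
)
WITH BROKER 'broker'
(
    "username" = "hdfs_username",
    "password" = "hdfs_password"
)
PROPERTIES
(
    "timeout" = "7200",              -- seconds; the default is 4 hours
    "max_filter_ratio" = "0.1",      -- tolerate up to 10% erroneous rows
    "exec_mem_limit" = "4294967296", -- 4 GB, in bytes
    "strict_mode" = "true"           -- filter rows that become null after conversion
);
~~~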
Checking import job status
Broker Load is asynchronous; users can specify the label in the SHOW LOAD command to check the execution status of the corresponding job. Note that the SHOW LOAD command can only be used to view asynchronously imported load jobs. Synchronous load jobs, such as Stream Load, cannot be viewed with the SHOW LOAD command.
Example:
~~~sql
mysql> show load where label = 'label1'\G
*************************** 1. row ***************************
JobId: 76391
Label: label1
State: FINISHED
Progress: ETL:N/A; LOAD:100%
Type: BROKER
EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
TaskInfo: cluster:N/A; timeout(s):10800; max_filter_ratio:5.0E-5
ErrorMsg: N/A
CreateTime: 2019-07-27 11:46:42
EtlStartTime: 2019-07-27 11:46:44
EtlFinishTime: 2019-07-27 11:46:44
LoadStartTime: 2019-07-27 11:46:44
LoadFinishTime: 2019-07-27 11:50:16
URL: http://192.168.1.1:8040/api/_load_error_log?file=__shard_4/error_log_insert_stmt_4bb00753932c491a-a6da6e2725415317_4bb00753932c491a_a6da6e2725415317
JobDetails: {"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
~~~
The following describes the parameters returned by the SHOW LOAD command.
- JobId: The unique ID of the import job. The JobId differs for each import job and is automatically generated by the system. Unlike labels, a JobId is never reused, while a label can be reused after an import job fails.
- Label: Identifier of the imported job.
- State: The current state of the import job. Two main states occur during a Broker Load: PENDING and LOADING. PENDING indicates the import job is waiting to be executed, while LOADING indicates it is being executed. An import job has two final states, CANCELLED and FINISHED, both of which mean the job has completed. CANCELLED indicates the import failed, while FINISHED indicates it succeeded.
indicates an import success. - Progress: The progress description of the import job. There are two types of progress –
ETL
andLOAD
, both of which correspond to the two phases of the import process.ETL
is fixed toN/A
since the broker load does not have this stage presently, while the progress range ofload
is 0~100%.Load progress = (the number of tables currently completed import / the total number of tables designed for this import job) * 100%
- The load progress will be 99% after all data have been imported and changed to 100% after the import takes effect.
Note: The import progress is not linear, so if the progress does not change for a period of time, it does not mean that the import is not executing.
- Type: The type of the import job. For Broker Load the value is BROKER.
- EtlInfo: Shows metrics of the imported data volume, such as unselected.rows, dpp.norm.ALL, and dpp.abnorm.ALL. The first metric tells how many rows were filtered out by the where condition, and the last two are used to verify that the error rate of the current import job does not exceed max_filter_ratio. The sum of the three metrics is the total number of rows in the original data.
- TaskInfo: Shows the parameters of the current import job, that is, the parameters specified by the user when creating the Broker Load job, including cluster, timeout, and max_filter_ratio.
- ErrorMsg: If the status of the import job is CANCELLED, ErrorMsg shows the reason for the failure, consisting of a type and a msg. N/A is displayed if the import job succeeded.
The meaning of the values of type:
USER-CANCEL: the job was cancelled by the user.
ETL-RUN-FAIL: the job failed in the ETL phase.
ETL-QUALITY-UNSATISFIED: the data quality check failed, i.e. the error rate exceeded max_filter_ratio.
LOAD-RUN-FAIL: the job failed in the LOADING phase.
TIMEOUT: the job did not complete within the timeout period.
UNKNOWN: unknown import error.
- CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime: The import creation time, ETL phase start time, ETL phase completion time, LOADING phase start time, and completion time of the entire import job.
Broker Load has no ETL phase, so EtlStartTime, EtlFinishTime, and LoadStartTime are set to the same value.
If the import job stays at CreateTime for a long time and LoadStartTime is N/A, import jobs are currently heavily backlogged and the user should reduce the frequency of import submissions.
LoadFinishTime - CreateTime = time consumed by the entire import job
LoadFinishTime - LoadStartTime = execution time of the Broker Load job = time consumed by the entire import job - wait time of the import job
- URL: Sample error data for the import job, which can be obtained by accessing the URL address. When no error data exists for this import, the URL field is N/A.
- JobDetails: Shows the detailed running status of the job, including the number of imported files, the total size in bytes, the number of subtasks, the number of raw rows processed, the IDs of the BE nodes running subtasks, and the IDs of the BE nodes with unfinished subtasks.
{"Unfinished backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"ScannedRows":2390016,"TaskNumber":1,"All backends":{"9c3441027ff948a0-8287923329a2b6a7":[10002]},"FileNumber":1,"FileSize":1073741824}
The number of raw rows processed is updated every 5 seconds. It only shows the current progress and does not represent the final total number of rows processed; the total is reported in EtlInfo.
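Besides looking up a single label, SHOW LOAD also accepts filtering and ordering clauses, which is handy for spotting backlogged or failed jobs. A small sketch; the database name and limits are arbitrary:

~~~sql
-- Most recent load jobs that are currently running.
SHOW LOAD FROM example_db WHERE STATE = "LOADING" ORDER BY CreateTime DESC LIMIT 10;

-- Recently failed jobs, to inspect their ErrorMsg and URL fields.
SHOW LOAD FROM example_db WHERE STATE = "CANCELLED" ORDER BY CreateTime DESC LIMIT 10;
~~~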
Cancel import job
When the status of a Broker Load job is neither CANCELLED nor FINISHED, it can be cancelled manually by the user. To cancel, specify the label of the import job.
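For example, a minimal sketch that cancels the job created in Example 1 above (database db1, label label1):

~~~sql
-- Cancel the unfinished Broker Load job identified by its label.
CANCEL LOAD FROM db1 WHERE LABEL = "label1";
~~~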
Related Configuration
Parallelism
A job can be split into one or more subtasks, and the subtasks are executed in parallel. The splitting is determined by the DataDescription clauses in the LOAD statement. For example:
- When multiple DataDescriptions correspond to importing different tables, each is split into its own subtask.
- When multiple DataDescriptions correspond to importing different partitions of the same table, each is also split into its own subtask.
Each subtask is further split into one or more instances, which are evenly distributed to BEs for parallel execution. The splitting is determined by the following FE configurations:
- min_bytes_per_broker_scanner: the minimum amount of data processed by a single instance, with a default value of 64 MB.
- max_broker_concurrency: the maximum number of concurrent instances for a single job, with a default value of 100.
- load_parallel_instance_num: the number of concurrent instances on a single BE, with a default value of 1.
Total number of instances = min(total import file size / minimum amount of data handled by a single instance, maximum number of concurrent instances for a single job, number of concurrent instances on a single BE * number of BEs)
In general, a job has only one DataDescription and is split into only one subtask. The subtask is split into a number of instances equal to the number of BEs, and the instances are then assigned to the BEs for parallel execution.
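The sketch below shows how these FE configuration items could be inspected and adjusted at runtime using the ADMIN commands from the SQL reference. The values are examples only, and changes made this way are dynamic rather than persisted to the FE configuration file:

~~~sql
-- Inspect the current values of the import-related FE configuration items.
ADMIN SHOW FRONTEND CONFIG LIKE "max_broker_concurrency";
ADMIN SHOW FRONTEND CONFIG LIKE "min_bytes_per_broker_scanner";

-- Example: raise the per-job instance cap for large imports.
ADMIN SET FRONTEND CONFIG ("max_broker_concurrency" = "200");
~~~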
Frequently Asked Questions
Q: Error reported for a data quality issue: ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel.
A: Please refer to Import Overview/FAQ.
Q: Import error: failed to send batch or TabletWriter add batch with unknown id.
A: Modify query_timeout and streaming_load_rpc_max_alive_time_sec appropriately. Please refer to Import Overview/Common System Configuration/BE Configuration.
Q: Import error: LOAD-RUN-FAIL; msg:Invalid Column Name:xxx
A: If the data is in Parquet or ORC format, keep the column names in the file header consistent with those in the StarRocks table, for example:
~~~sql
(tmp_c1,tmp_c2)
SET
(
id=tmp_c2,
name=tmp_c1
)
~~~
This means that the columns named (tmp_c1, tmp_c2) in the Parquet or ORC file are mapped to the (id, name) columns in the StarRocks table. If SET is not declared, the columns are imported in order.
Note: If you use ORC files generated directly by some versions of Hive, the table header in the ORC file is not Hive metadata but (_col0, _col1, _col2, ...), which may lead to an Invalid Column Name error. In that case, use SET for the mapping.
Q: Other errors.
A: For other issues, such as jobs that take an unusually long time to complete, you can go to log/be.INFO on the BE node and search for kafka error to find the specific reason.
Q: How to configure Hadoop HA?
A: The following configuration is used to access an HDFS cluster deployed in HA mode.
dfs.nameservices: custom name of the HDFS service, e.g. "dfs.nameservices" = "my_ha".
dfs.ha.namenodes.xxx: custom names of the namenodes, where xxx is the nameservice name defined above; multiple names are separated by commas, e.g. "dfs.ha.namenodes.my_ha" = "my_nn".
dfs.namenode.rpc-address.xxx.nn: the RPC address of a namenode, where nn is one of the namenode names configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port".
dfs.client.failover.proxy.provider: the provider the client uses to connect to the namenode, with a default value of org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.
For example:

~~~sql
(
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my-namenode1, my-namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
~~~
HA mode can be combined with the previous two authentication methods for cluster access. For example, accessing HA HDFS through simple authentication:
~~~sql
(
    "username" = "user",
    "password" = "passwd",
    "dfs.nameservices" = "my-ha",
    "dfs.ha.namenodes.my-ha" = "my_namenode1, my_namenode2",
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port",
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
~~~
The HDFS cluster configuration can also be written into the hdfs-site.xml file. In that case, when using the broker process to read data from the HDFS cluster, you only need to fill in the cluster's file path and authentication information.
Q: How to configure Hadoop ViewFS (federation)?
A: Copy the ViewFS-related configuration files core-site.xml and hdfs-site.xml to broker/conf.
If there is a custom FileSystem, copy the related jar files to broker/lib.