BROKER LOAD
Description
StarRocks provides the MySQL-based loading method Broker Load. After you submit a load job, StarRocks runs the job asynchronously. You can use SELECT * FROM information_schema.loads to query the job result. This feature is supported from v3.1 onwards. For background information, principles, supported data file formats, how to perform single-table loads and multi-table loads, and how to view job results, see Load data from HDFS and Load data from cloud storage.
You can load data into StarRocks tables only as a user who has the INSERT privilege on those tables. If you do not have the INSERT privilege, follow the instructions provided in GRANT to grant the INSERT privilege to the user that you use to connect to your StarRocks cluster. The syntax is GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity> }.
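For example, the following is a minimal sketch that grants the INSERT privilege on a hypothetical table lineorder in a hypothetical database test_db to a hypothetical user test_user:

```SQL
-- Hypothetical database, table, and user names for illustration.
GRANT INSERT ON TABLE lineorder IN DATABASE test_db TO USER 'test_user'@'%';
```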
Syntax
LOAD LABEL [<database_name>.]<label_name>
(
data_desc[, data_desc ...]
)
WITH BROKER
(
StorageCredentialParams
)
[PROPERTIES
(
opt_properties
)
]
Note that in StarRocks some literals are used as reserved keywords by the SQL language. Do not directly use these keywords in SQL statements. If you want to use such a keyword in an SQL statement, enclose it in a pair of backticks (`). See Keywords.
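For reference, the following is a minimal sketch of a Broker Load job that loads one CSV file from HDFS with simple authentication. The database test_db, table table1, label, file path, credentials, and property value are placeholders:

```SQL
-- A minimal sketch; the label, paths, and credentials below are placeholders.
LOAD LABEL test_db.label_example_20210411
(
    DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/20210411")
    INTO TABLE table1
    COLUMNS TERMINATED BY ","
    FORMAT AS "CSV"
)
WITH BROKER
(
    "hadoop.security.authentication" = "simple",
    "username" = "<hdfs_username>",
    "password" = "<hdfs_password>"
)
PROPERTIES
(
    -- Optional job property: maximum job duration in seconds (assumed value).
    "timeout" = "3600"
);
```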
Parameters
database_name and label_name
label_name specifies the label of the load job. database_name optionally specifies the name of the database to which the destination table belongs.
Each load job has a label that is unique across the entire database. You can use the label of a load job to view the execution status of the load job and prevent repeatedly loading the same data. When a load job enters the FINISHED state, its label cannot be reused. Only the label of a load job that has entered the CANCELLED state can be reused. In most cases, the label of a load job is reused to retry that load job and load the same data, thereby implementing Exactly-Once semantics.
For label naming conventions, see System limits.
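For example, assuming a job was submitted with the hypothetical label label_example_20210411 to the database test_db, you can check its status as follows:

```SQL
-- Query the asynchronous job result by label (label and database names are placeholders).
SELECT * FROM information_schema.loads
WHERE LABEL = 'label_example_20210411' AND DATABASE_NAME = 'test_db';
```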
data_desc
The description of a batch of data to be loaded. Each data_desc descriptor declares information such as the data source, ETL functions, destination StarRocks table, and destination partitions.
Broker Load supports loading multiple data files at a time. In one load job, you can use multiple data_desc descriptors to declare multiple data files you want to load, or use one data_desc descriptor to declare one file path from which you want to load all the data files in it (see the sketch after the syntax below). Broker Load also ensures the transactional atomicity of each load job that loads multiple data files: the loading of all data files in one load job either succeeds or fails as a whole. It never happens that the loading of some data files succeeds while the loading of the others fails.
data_desc supports the following syntax:
DATA INFILE ("<file_path>"[, "<file_path>" ...])
[NEGATIVE]
INTO TABLE <table_name>
[PARTITION (<partition1_name>[, <partition2_name> ...])]
[TEMPORARY PARTITION (<temporary_partition1_name>[, <temporary_partition2_name> ...])]
[COLUMNS TERMINATED BY "<column_separator>"]
[ROWS TERMINATED BY "<row_separator>"]
[FORMAT AS "CSV | Parquet | ORC"]
[(format_type_options)]
[(column_list)]
[COLUMNS FROM PATH AS (<partition_field_name>[, <partition_field_name> ...])]
[SET <k1=f1(v1)>[, <k2=f2(v2)> ...]]
[WHERE predicate]
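For example, the following sketch uses two data_desc descriptors in one job, so the two files are loaded atomically: either both loads succeed or the whole job fails. The label, file paths, tables, and credentials are placeholders:

```SQL
LOAD LABEL test_db.label_two_files
(
    -- First data_desc descriptor.
    DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/data/input/file1.csv")
    INTO TABLE table1
    COLUMNS TERMINATED BY ",",
    -- Second data_desc descriptor; both loads succeed or the job fails as a whole.
    DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/data/input/file2.csv")
    INTO TABLE table2
    COLUMNS TERMINATED BY ","
)
WITH BROKER
(
    "username" = "<hdfs_username>",
    "password" = "<hdfs_password>"
);
```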
data_desc must include the following parameters:
-
file_path

Specifies the save path of one or more data files you want to load.

You can specify this parameter as the save path of one data file. For example, you can specify this parameter as "hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/20210411" to load a data file named 20210411 from the path /user/data/tablename on the HDFS server.

You can also specify this parameter as the save path of multiple data files by using the wildcards ?, *, [], {}, or ^. See Wildcard reference. For example, you can specify this parameter as "hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/*/*" or "hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/dt=202104*/*" to load the data files from all partitions or only the 202104 partitions in the path /user/data/tablename on the HDFS server (see the sketch after this list of required parameters).

NOTE

Wildcards can also be used to specify intermediate paths.

In the preceding examples, the hdfs_host and hdfs_port parameters are described as follows:

- hdfs_host: the IP address of the NameNode host in the HDFS cluster.
- hdfs_port: the FS port of the NameNode host in the HDFS cluster. The default port number is 9000.
NOTICE

- Broker Load supports accessing AWS S3 according to the S3 or S3A protocol. Therefore, when you load data from AWS S3, you can include s3:// or s3a:// as the prefix in the S3 URI that you pass as the file path.
- Broker Load supports accessing Google GCS only according to the gs protocol. Therefore, when you load data from Google GCS, you must include gs:// as the prefix in the GCS URI that you pass as the file path.
- When you load data from Blob Storage, you must use the wasb or wasbs protocol to access your data:
  - If your storage account allows access over HTTP, use the wasb protocol and write the file path as wasb://<container_name>@<storage_account_name>.blob.core.windows.net/<path>/<file_name>/*.
  - If your storage account allows access over HTTPS, use the wasbs protocol and write the file path as wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/<path>/<file_name>/*.
- When you load data from Data Lake Storage Gen2, you must use the abfs or abfss protocol to access your data:
  - If your storage account allows access over HTTP, use the abfs protocol and write the file path as abfs://<container_name>@<storage_account_name>.dfs.core.windows.net/<file_name>.
  - If your storage account allows access over HTTPS, use the abfss protocol and write the file path as abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<file_name>.
- When you load data from Data Lake Storage Gen1, you must use the adl protocol to access your data and write the file path as adl://<data_lake_storage_gen1_name>.azuredatalakestore.net/<path>/<file_name>.
-
INTO TABLE
Specifies the name of the destination StarRocks table.
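As mentioned above for file_path, wildcards let one data_desc descriptor cover many files. The following sketch (hypothetical path and table) loads every file under the 202104 partitions:

```SQL
-- Hypothetical wildcard path; loads all files under the dt=202104* partitions.
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/dt=202104*/*")
INTO TABLE table1
COLUMNS TERMINATED BY "\\t"
FORMAT AS "CSV"
```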
data_desc can also optionally include the following parameters:
-
NEGATIVE
Revokes the loading of a specific batch of data. To achieve this, you need to load the same batch of data with the NEGATIVE keyword specified.

NOTE

This parameter is valid only when the StarRocks table is an Aggregate table and all its value columns are computed by the sum function.

-
PARTITION
Specifies the partitions into which you want to load data. By default, if you do not specify this parameter, the source data will be loaded into all partitions of the StarRocks table.
-
TEMPORARY PARTITION
Specifies the name of the temporary partition into which you want to load data. You can specify multiple temporary partitions, which must be separated by commas (,).
-
COLUMNS TERMINATED BY
Specifies the column separator used in the data file. If you do not specify this parameter, it defaults to \t, indicating tab. The column separator you specify using this parameter must be the same as the column separator that is actually used in the data file. Otherwise, the load job will fail due to inadequate data quality, and its State will be CANCELLED.

Broker Load jobs are submitted according to the MySQL protocol. StarRocks and MySQL both escape characters in the load requests. Therefore, if the column separator is an invisible character such as tab, you must add a backslash (\) preceding the column separator. For example, you must input \\t if the column separator is \t, and you must input \\n if the column separator is \n. Apache Hive™ files use \x01 as their column separator, so you must input \\x01 if the data file is from Hive.

NOTE

- For CSV data, you can use a UTF-8 string, such as a comma (,), tab, or pipe (|), whose length does not exceed 50 bytes as a text delimiter.
- Null values are denoted by using \N. For example, a data file consists of three columns, and a record from that data file holds data in the first and third columns but no data in the second column. In this situation, you need to use \N in the second column to denote a null value. This means the record must be compiled as a,\N,b instead of a,,b. a,,b denotes that the second column of the record holds an empty string.
-
ROWS TERMINATED BY
Specifies the row separator used in the data file. If you do not specify this parameter, it defaults to \n, indicating line break. The row separator you specify using this parameter must be the same as the row separator that is actually used in the data file. Otherwise, the load job will fail due to inadequate data quality, and its State will be CANCELLED. This parameter is supported from v2.5.4 onwards.

For the usage notes about the row separator, see the usage notes for the preceding COLUMNS TERMINATED BY parameter.

-
FORMAT AS
Specifies the format of the data file. Valid values: CSV, Parquet, and ORC. If you do not specify this parameter, StarRocks determines the data file format based on the filename extension (.csv, .parquet, or .orc) specified in the file_path parameter.

-
format_type_options
Specifies CSV format options when FORMAT AS is set to CSV. Syntax:

(
    key = value
    key = value
    ...
)

NOTE

format_type_options is supported in v3.0 and later.

The following options are supported:

- skip_header: Specifies whether to skip the first rows of the data file when the data file is in CSV format. Type: INTEGER. Default value: 0.
  In some CSV-formatted data files, the first rows at the beginning are used to define metadata such as column names and column data types. By setting the skip_header parameter, you can enable StarRocks to skip the first rows of the data file during data loading. For example, if you set this parameter to 1, StarRocks skips the first row of the data file during data loading.
  The first rows at the beginning of the data file must be separated by using the row separator that you specify in the load statement.
- trim_space: Specifies whether to remove spaces preceding and following column separators from the data file when the data file is in CSV format. Type: BOOLEAN. Default value: false.
  For some databases, spaces are added to column separators when you export data as a CSV-formatted data file. Such spaces are called leading spaces or trailing spaces depending on their locations. By setting the trim_space parameter, you can enable StarRocks to remove such unnecessary spaces during data loading.
  Note that StarRocks does not remove the spaces (including leading spaces and trailing spaces) within a field wrapped in a pair of enclose-specified characters. For example, the following field values use pipe (|) as the column separator and double quotation marks (") as the enclose-specified character:
  |"Love StarRocks"|
  |" Love StarRocks "|
  | "Love StarRocks" |
  If you set trim_space to true, StarRocks processes the preceding field values as follows:
  |"Love StarRocks"|
  |" Love StarRocks "|
  |"Love StarRocks"|
- enclose: Specifies the character that is used to wrap the field values in the data file according to RFC4180 when the data file is in CSV format. Type: single-byte character. Default value: NONE. The most prevalent characters are single quotation mark (') and double quotation mark (").
  All special characters (including row separators and column separators) wrapped by using the enclose-specified character are considered normal symbols. StarRocks can do more than RFC4180 as it allows you to specify any single-byte character as the enclose-specified character.
  If a field value contains an enclose-specified character, you can use the same character to escape that enclose-specified character. For example, you set enclose to ", and a field value is a "quoted" c. In this case, you can enter the field value as "a ""quoted"" c" into the data file.
- escape: Specifies the character that is used to escape various special characters, such as row separators, column separators, escape characters, and enclose-specified characters, which are then considered by StarRocks to be common characters and are parsed as part of the field values in which they reside. Type: single-byte character. Default value: NONE. The most prevalent character is slash (\), which must be written as double slashes (\\) in SQL statements.
  NOTE
  The character specified by escape is applied to both inside and outside of each pair of enclose-specified characters.
  Two examples are as follows:
  - When you set enclose to " and escape to \, StarRocks parses "say \"Hello world\"" into say "Hello world".
  - Assume that the column separator is comma (,). When you set escape to \, StarRocks parses a, b\, c into two separate field values: a and b, c.
-
column_list
Specifies the column mapping between the data file and the StarRocks table. Syntax: (<column_name>[, <column_name> ...]). The columns declared in column_list are mapped by name onto the StarRocks table columns.

NOTE

If the columns of the data file are mapped in sequence onto the columns of the StarRocks table, you do not need to specify column_list.

If you want to skip a specific column of the data file, you only need to temporarily name that column as different from any of the StarRocks table columns. For more information, see Transform data at loading.
-
COLUMNS FROM PATH AS
Extracts the information about one or more partition fields from the file path you specify. This parameter is valid only when the file path contains partition fields.
For example, if the data file is stored in the path /path/col_name=col_value/file1, in which col_name is a partition field and can be mapped onto a column of the StarRocks table, you can specify this parameter as col_name. As such, StarRocks extracts col_value values from the path and loads them into the StarRocks table column onto which col_name is mapped.

NOTE

This parameter is available only when you load data from HDFS.
-
SET
Specifies one or more functions that you want to use to convert a column of the data file. Examples:

- The StarRocks table consists of three columns, which are col1, col2, and col3 in sequence. The data file consists of four columns, among which the first two columns are mapped in sequence onto col1 and col2 of the StarRocks table and the sum of the last two columns is mapped onto col3 of the StarRocks table. In this case, you need to specify column_list as (col1,col2,tmp_col3,tmp_col4) and specify (col3=tmp_col3+tmp_col4) in the SET clause to implement data conversion (see the combined sketch after this list).
- The StarRocks table consists of three columns, which are year, month, and day in sequence. The data file consists of only one column that accommodates date and time values in yyyy-mm-dd hh:mm:ss format. In this case, you need to specify column_list as (tmp_time) and specify (year = year(tmp_time), month = month(tmp_time), day = day(tmp_time)) in the SET clause to implement data conversion.
-
WHERE
Specifies the conditions based on which you want to filter the source data. StarRocks loads only the source data that meets the filter conditions specified in the WHERE clause.
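Putting several of the preceding optional parameters together, the following data_desc sketch (hypothetical file, columns, and table) skips one header row, trims spaces, maps four file columns onto the table, derives col3 from the last two file columns, and keeps only rows whose col2 value is positive:

```SQL
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/data/tablename/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
-- CSV format options (assumed v3.0 or later): skip one header row and trim spaces.
(
    skip_header = 1
    trim_space = TRUE
)
-- Column mapping: tmp_col3 and tmp_col4 are temporary names for the last two file columns.
(col1, col2, tmp_col3, tmp_col4)
-- Derive the destination column col3 and filter the source rows.
SET (col3 = tmp_col3 + tmp_col4)
WHERE col2 > 0
```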
WITH BROKER
In v2.3 and earlier, input WITH BROKER "<broker_name>"
to specify the broker you want to use. From v2.5 onwards, you no longer need to specify a broker, but you still need to retain the WITH BROKER
keyword.
StorageCredentialParams
The authentication information used by StarRocks to access your storage system.
HDFS
Open-source HDFS supports two authentication methods: simple authentication and Kerberos authentication. Broker Load uses simple authentication by default. Open-source HDFS also supports configuring an HA mechanism for the NameNode. If you choose open-source HDFS as your storage system, you can specify the authentication configuration and HA configuration as follows:
-
Authentication configuration
-
If you use simple authentication, configure StorageCredentialParams as follows:

"hadoop.security.authentication" = "simple",
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"

The following table describes the parameters in StorageCredentialParams.

| Parameter | Description |
| --- | --- |
| hadoop.security.authentication | The authentication method. Valid values: simple and kerberos. Default value: simple. simple represents simple authentication, meaning no authentication, and kerberos represents Kerberos authentication. |
| username | The username of the account that you want to use to access the NameNode of the HDFS cluster. |
| password | The password of the account that you want to use to access the NameNode of the HDFS cluster. |

-
If you use Kerberos authentication, configure StorageCredentialParams as follows:

"hadoop.security.authentication" = "kerberos",
"kerberos_principal" = "nn/zelda1@ZELDA.COM",
"kerberos_keytab" = "/keytab/hive.keytab",
"kerberos_keytab_content" = "YWFhYWFh"

The following table describes the parameters in StorageCredentialParams.

| Parameter | Description |
| --- | --- |
| hadoop.security.authentication | The authentication method. Valid values: simple and kerberos. Default value: simple. simple represents simple authentication, meaning no authentication, and kerberos represents Kerberos authentication. |
| kerberos_principal | The Kerberos principal to be authenticated. Each principal consists of the following three parts to ensure that it is unique across the HDFS cluster: username or servicename: the name of the principal. instance: the name of the server that hosts the node to be authenticated in the HDFS cluster. The server name helps ensure that the principal is unique, for example, when the HDFS cluster consists of multiple DataNodes that each are independently authenticated. realm: the name of the realm. The realm name must be capitalized. Example: nn/zelda1@ZELDA.COM. |
| kerberos_keytab | The save path of the Kerberos keytab file. |
| kerberos_keytab_content | The Base64-encoded content of the Kerberos keytab file. You can choose to specify either kerberos_keytab or kerberos_keytab_content. |
-
HA configuration
You can configure an HA mechanism for the NameNode of the HDFS cluster. This way, if the NameNode is switched over to another node, StarRocks can automatically identify the new node that serves as the NameNode. This includes the following scenarios:
-
If you load data from a single HDFS cluster that has one Kerberos user configured, both broker-based loading and broker-free loading are supported.
-
To perform broker-based loading, make sure that at least one independent broker group is deployed, and place the hdfs-site.xml file in the {deploy}/conf path on the broker node that serves the HDFS cluster. StarRocks will add the {deploy}/conf path to the environment variable CLASSPATH upon broker startup, allowing the brokers to read information about the HDFS cluster nodes.

-

To perform broker-free loading, place the hdfs-site.xml file in the {deploy}/conf paths of each FE node and each BE or CN node.
-
-
If you load data from a single HDFS cluster that has multiple Kerberos users configured, only broker-based loading is supported. Make sure that at least one independent broker group is deployed, and place the hdfs-site.xml file in the {deploy}/conf path on the broker node that serves the HDFS cluster. StarRocks will add the {deploy}/conf path to the environment variable CLASSPATH upon broker startup, allowing the brokers to read information about the HDFS cluster nodes.

-
If you load data from multiple HDFS clusters (regardless of whether one or multiple Kerberos users are configured), only broker-based loading is supported. Make sure that at least one independent broker group is deployed for each of these HDFS clusters, and take one of the following actions to enable the brokers to read information about the HDFS cluster nodes:
-
Place the hdfs-site.xml file in the {deploy}/conf path on the broker node that serves each HDFS cluster. StarRocks will add the {deploy}/conf path to the environment variable CLASSPATH upon broker startup, allowing the brokers to read information about the nodes in that HDFS cluster.

-

Add the following HA configuration at job creation:

"dfs.nameservices" = "ha_cluster",
"dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
"dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
"dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
"dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"

The following table describes the parameters in the HA configuration.

| Parameter | Description |
| --- | --- |
| dfs.nameservices | The name of the HDFS cluster. |
| dfs.ha.namenodes.XXX | The name of the NameNode in the HDFS cluster. If you specify multiple NameNode names, separate them with commas (,). XXX is the HDFS cluster name that you have specified in dfs.nameservices. |
| dfs.namenode.rpc-address.XXX.NN | The RPC address of the NameNode in the HDFS cluster. NN is the NameNode name that you have specified in dfs.ha.namenodes.XXX. |
| dfs.client.failover.proxy.provider | The provider of the NameNode to which the client will connect. Default value: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider. |
-
NOTE
You can use the SHOW BROKER statement to check for brokers that are deployed in your StarRocks cluster.
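For example, a WITH BROKER clause that combines the Kerberos authentication parameters with the HA configuration shown above might look as follows. The principal, keytab path, hosts, and ports are placeholders:

```SQL
WITH BROKER
(
    -- Kerberos authentication (placeholder principal and keytab path).
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "nn/zelda1@ZELDA.COM",
    "kerberos_keytab" = "/keytab/hive.keytab",
    -- NameNode HA configuration (placeholder hosts and ports).
    "dfs.nameservices" = "ha_cluster",
    "dfs.ha.namenodes.ha_cluster" = "ha_n1,ha_n2",
    "dfs.namenode.rpc-address.ha_cluster.ha_n1" = "<hdfs_host>:<hdfs_port>",
    "dfs.namenode.rpc-address.ha_cluster.ha_n2" = "<hdfs_host>:<hdfs_port>",
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)
```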
AWS S3
If you choose AWS S3 as your storage system, take one of the following actions:
-

To choose the instance profile-based authentication method, configure StorageCredentialParams as follows:

"aws.s3.use_instance_profile" = "true",
"aws.s3.region" = "<aws_s3_region>"

-

To choose the assumed role-based authentication method, configure StorageCredentialParams as follows:

"aws.s3.use_instance_profile" = "true",
"aws.s3.iam_role_arn" = "<iam_role_arn>",
"aws.s3.region" = "<aws_s3_region>"

-

To choose the IAM user-based authentication method, configure StorageCredentialParams as follows:

"aws.s3.use_instance_profile" = "false",
"aws.s3.access_key" = "<iam_user_access_key>",
"aws.s3.secret_key" = "<iam_user_secret_key>",
"aws.s3.region" = "<aws_s3_region>"
The following table describes the parameters you need to configure in StorageCredentialParams.
Parameter | Required | Description |
---|---|---|
aws.s3.use_instance_profile | Yes | Specifies whether to enable the credential methods instance profile and assumed role. Valid values: true and false . Default value: false . |
aws.s3.iam_role_arn | No | The ARN of the IAM role that has privileges on your AWS S3 bucket. If you choose assumed role as the credential method for accessing AWS S3, you must specify this parameter. |
aws.s3.region | Yes | The region in which your AWS S3 bucket resides. Example: us-west-1 . |
aws.s3.access_key | No | The access key of your IAM user. If you choose IAM user as the credential method for accessing AWS S3, you must specify this parameter. |
aws.s3.secret_key | No | The secret key of your IAM user. If you choose IAM user as the credential method for accessing AWS S3, you must specify this parameter. |
For information about how to choose an authentication method for accessing AWS S3 and how to configure an access control policy in AWS IAM Console, see Authentication parameters for accessing AWS S3.
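For example, the following sketch loads a Parquet file from a hypothetical S3 bucket with IAM user-based authentication. The label, bucket, path, keys, and region are placeholders:

```SQL
LOAD LABEL test_db.label_s3_example
(
    -- The s3a:// prefix is also accepted, as noted above.
    DATA INFILE("s3://bucket_s3/input/file1.parquet")
    INTO TABLE table1
    FORMAT AS "Parquet"
)
WITH BROKER
(
    "aws.s3.use_instance_profile" = "false",
    "aws.s3.access_key" = "<iam_user_access_key>",
    "aws.s3.secret_key" = "<iam_user_secret_key>",
    "aws.s3.region" = "us-west-1"
);
```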
Google GCS
If you choose Google GCS as your storage system, take one of the following actions:
-

To choose the VM-based authentication method, configure StorageCredentialParams as follows:

"gcp.gcs.use_compute_engine_service_account" = "true"

The following table describes the parameters you need to configure in StorageCredentialParams.

| Parameter | Default value | Value example | Description |
| --- | --- | --- | --- |
| gcp.gcs.use_compute_engine_service_account | false | true | Specifies whether to directly use the service account that is bound to your Compute Engine. |

-

To choose the service account-based authentication method, configure StorageCredentialParams as follows:

"gcp.gcs.service_account_email" = "<google_service_account_email>",
"gcp.gcs.service_account_private_key_id" = "<google_service_private_key_id>",
"gcp.gcs.service_account_private_key" = "<google_service_private_key>"

The following table describes the parameters you need to configure in StorageCredentialParams.

| Parameter | Default value | Value example | Description |
| --- | --- | --- | --- |
| gcp.gcs.service_account_email | "" | "user@hello.iam.gserviceaccount.com" | The email address in the JSON file generated at the creation of the service account. |
| gcp.gcs.service_account_private_key_id | "" | "61d257bd8479547cb3e04f0b9b6b9ca07af3b7ea" | The private key ID in the JSON file generated at the creation of the service account. |
| gcp.gcs.service_account_private_key | "" | "-----BEGIN PRIVATE KEY----xxxx-----END PRIVATE KEY-----\n" | The private key in the JSON file generated at the creation of the service account. |

-
To choose the impersonation-based authentication method, configure StorageCredentialParams as follows:

-

Make a VM instance impersonate a service account:

"gcp.gcs.use_compute_engine_service_account" = "true",
"gcp.gcs.impersonation_service_account" = "<assumed_google_service_account_email>"

The following table describes the parameters you need to configure in StorageCredentialParams.

| Parameter | Default value | Value example | Description |
| --- | --- | --- | --- |
| gcp.gcs.use_compute_engine_service_account | false | true | Specifies whether to directly use the service account that is bound to your Compute Engine. |
| gcp.gcs.impersonation_service_account | "" | "hello" | The service account that you want to impersonate. |

-

Make a service account (named as the meta service account) impersonate another service account (named as the data service account):

"gcp.gcs.service_account_email" = "<meta_google_service_account_email>",
"gcp.gcs.service_account_private_key_id" = "<meta_google_service_account_private_key_id>",
"gcp.gcs.service_account_private_key" = "<meta_google_service_account_private_key>",
"gcp.gcs.impersonation_service_account" = "<data_google_service_account_email>"

The following table describes the parameters you need to configure in StorageCredentialParams.

| Parameter | Default value | Value example | Description |
| --- | --- | --- | --- |
| gcp.gcs.service_account_email | "" | "user@hello.iam.gserviceaccount.com" | The email address in the JSON file generated at the creation of the meta service account. |
| gcp.gcs.service_account_private_key_id | "" | "61d257bd8479547cb3e04f0b9b6b9ca07af3b7ea" | The private key ID in the JSON file generated at the creation of the meta service account. |
| gcp.gcs.service_account_private_key | "" | "-----BEGIN PRIVATE KEY----xxxx-----END PRIVATE KEY-----\n" | The private key in the JSON file generated at the creation of the meta service account. |
| gcp.gcs.impersonation_service_account | "" | "hello" | The data service account that you want to impersonate. |
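For example, the following sketch loads an ORC file from a hypothetical GCS bucket with service account-based authentication. The label, bucket, path, and credentials are placeholders:

```SQL
LOAD LABEL test_db.label_gcs_example
(
    -- GCS paths must use the gs:// prefix.
    DATA INFILE("gs://bucket_gcs/input/file1.orc")
    INTO TABLE table1
    FORMAT AS "ORC"
)
WITH BROKER
(
    "gcp.gcs.service_account_email" = "<google_service_account_email>",
    "gcp.gcs.service_account_private_key_id" = "<google_service_private_key_id>",
    "gcp.gcs.service_account_private_key" = "<google_service_private_key>"
);
```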
Other S3-compatible storage system
If you choose another S3-compatible storage system, such as MinIO, configure StorageCredentialParams as follows:
"aws.s3.enable_ssl" = "false",
"aws.s3.enable_path_style_access" = "true",
"aws.s3.endpoint" = "<s3_endpoint>",
"aws.s3.access_key" = "<iam_user_access_key>",
"aws.s3.secret_key" = "<iam_user_secret_key>"
The following table describes the parameters you need to configure in StorageCredentialParams.
Parameter | Required | Description |
---|---|---|
aws.s3.enable_ssl | Yes | Specifies whether to enable SSL connection. Valid values: true and false . Default value: true . |
aws.s3.enable_path_style_access | Yes | Specifies whether to enable path-style URL access. Valid values: true and false . Default value: false . For MinIO, you must set the value to true . |
aws.s3.endpoint | Yes | The endpoint that is used to connect to your S3-compatible storage system instead of AWS S3. |
aws.s3.access_key | Yes | The access key of your IAM user. |
aws.s3.secret_key | Yes | The secret key of your IAM user. |
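For example, the following sketch loads a CSV file from a hypothetical MinIO bucket. The label, endpoint, bucket, path, and keys are placeholders:

```SQL
LOAD LABEL test_db.label_minio_example
(
    DATA INFILE("s3://bucket_minio/input/file1.csv")
    INTO TABLE table1
    COLUMNS TERMINATED BY ","
    FORMAT AS "CSV"
)
WITH BROKER
(
    -- Path-style access is required for MinIO, as noted in the table above.
    "aws.s3.enable_ssl" = "false",
    "aws.s3.enable_path_style_access" = "true",
    "aws.s3.endpoint" = "<s3_endpoint>",
    "aws.s3.access_key" = "<iam_user_access_key>",
    "aws.s3.secret_key" = "<iam_user_secret_key>"
);
```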