# Load data using Spark connector (recommended)
StarRocks provides a self-developed connector named StarRocks Connector for Apache Spark™ (Spark connector for short) to help you load data into a StarRocks table by using Spark. The basic principle is to accumulate the data and then load it into StarRocks all at once through STREAM LOAD. The Spark connector is implemented based on Spark DataSource V2. A DataSource can be created by using Spark DataFrames or Spark SQL, and both batch and structured streaming modes are supported.
NOTICE
Only users with the SELECT and INSERT privileges on a StarRocks table can load data into this table. You can follow the instructions provided in GRANT to grant these privileges to a user.
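For example, a minimal sketch of granting these privileges to a hypothetical user `spark_loader` on the `test.score_board` table used later in this topic. The syntax below assumes StarRocks 3.x; check GRANT for the syntax that matches your StarRocks version.

```SQL
-- Hypothetical user; StarRocks 3.x privilege syntax is assumed here.
CREATE USER 'spark_loader'@'%' IDENTIFIED BY '<password>';
GRANT SELECT, INSERT ON TABLE test.score_board TO USER 'spark_loader'@'%';
```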
## Version requirements
| Spark connector | Spark | StarRocks | Java | Scala |
|---|---|---|---|---|
| 1.1.1 | 3.2, 3.3, or 3.4 | 2.5 and later | 8 | 2.12 |
| 1.1.0 | 3.2, 3.3, or 3.4 | 2.5 and later | 8 | 2.12 |
NOTICE
- Please see Upgrade Spark connector for behavior changes among different versions of the Spark connector.
- The Spark connector does not provide the MySQL JDBC driver since version 1.1.1, and you need to import the driver into the Spark classpath manually. You can find the driver on the MySQL site or Maven Central.
## Obtain Spark connector
You can obtain the Spark connector JAR file in the following ways:
- Directly download the compiled Spark Connector JAR file.
- Add the Spark connector as a dependency in your Maven project and then download the JAR file.
- Compile the source code of the Spark Connector into a JAR file by yourself.
The naming format of the Spark connector JAR file is `starrocks-spark-connector-${spark_version}_${scala_version}-${connector_version}.jar`.

For example, if you install Spark 3.2 and Scala 2.12 in your environment and you want to use Spark connector 1.1.0, you can use `starrocks-spark-connector-3.2_2.12-1.1.0.jar`.
NOTICE
In general, the latest version of the Spark connector only maintains compatibility with the three most recent versions of Spark.
### Download the compiled JAR file
Directly download the corresponding version of the Spark connector JAR from the Maven Central Repository.
### Maven Dependency
In your Maven project's `pom.xml` file, add the Spark connector as a dependency according to the following format. Replace `spark_version`, `scala_version`, and `connector_version` with the respective versions.

```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-spark-connector-${spark_version}_${scala_version}</artifactId>
    <version>${connector_version}</version>
</dependency>
```
For example, if the version of Spark in your environment is 3.2, the version of Scala is 2.12, and you choose Spark connector 1.1.0, you need to add the following dependency:

```xml
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-spark-connector-3.2_2.12</artifactId>
    <version>1.1.0</version>
</dependency>
```
### Compile by yourself
1. Download the Spark connector package.

2. Execute the following command to compile the source code of the Spark connector into a JAR file. Note that `spark_version` is replaced with the corresponding Spark version.

```Bash
sh build.sh <spark_version>
```

For example, if the Spark version in your environment is 3.2, you need to execute the following command:

```Bash
sh build.sh 3.2
```
3. Go to the `target/` directory to find the Spark connector JAR file, such as `starrocks-spark-connector-3.2_2.12-1.1.0-SNAPSHOT.jar`, generated upon compilation.

NOTE

The name of a Spark connector that has not been formally released contains the `SNAPSHOT` suffix.
## Parameters
| Parameter | Required | Default value | Description |
|---|---|---|---|
| `starrocks.fe.http.url` | YES | None | The HTTP URL of the FE in your StarRocks cluster. You can specify multiple URLs, which must be separated by a comma (,). Format: `<fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>`. Since version 1.1.1, you can also add the `http://` prefix to the URL, such as `http://<fe_host1>:<fe_http_port1>,http://<fe_host2>:<fe_http_port2>`. |
| `starrocks.fe.jdbc.url` | YES | None | The address that is used to connect to the MySQL server of the FE. Format: `jdbc:mysql://<fe_host>:<fe_query_port>`. |
| `starrocks.table.identifier` | YES | None | The name of the StarRocks table. Format: `<database_name>.<table_name>`. |
| `starrocks.user` | YES | None | The username of your StarRocks cluster account. The user needs the SELECT and INSERT privileges on the StarRocks table. |
| `starrocks.password` | YES | None | The password of your StarRocks cluster account. |
| `starrocks.write.label.prefix` | NO | spark- | The label prefix used by Stream Load. |
| `starrocks.write.enable.transaction-stream-load` | NO | TRUE | Whether to use the Stream Load transaction interface to load data. It requires StarRocks v2.5 or later. This feature can load more data in a transaction with less memory usage, and improve performance. NOTICE: Since 1.1.1, this parameter takes effect only when the value of `starrocks.write.max.retries` is non-positive, because the Stream Load transaction interface does not support retry. |
| `starrocks.write.buffer.size` | NO | 104857600 | The maximum size of data that can be accumulated in memory before being sent to StarRocks at a time. Setting this parameter to a larger value can improve loading performance but may increase loading latency. |
| `starrocks.write.buffer.rows` | NO | Integer.MAX_VALUE | Supported since version 1.1.1. The maximum number of rows that can be accumulated in memory before being sent to StarRocks at a time. |
| `starrocks.write.flush.interval.ms` | NO | 300000 | The interval at which data is sent to StarRocks. This parameter is used to control the loading latency. |
| `starrocks.write.max.retries` | NO | 3 | Supported since version 1.1.1. The number of times that the connector retries the Stream Load for the same batch of data if the load fails. NOTICE: Because the Stream Load transaction interface does not support retry, if this parameter is positive, the connector always uses the Stream Load interface and ignores the value of `starrocks.write.enable.transaction-stream-load`. |
| `starrocks.write.retry.interval.ms` | NO | 10000 | Supported since version 1.1.1. The interval at which the connector retries the Stream Load for the same batch of data if the load fails. |
| `starrocks.columns` | NO | None | The StarRocks table columns into which you want to load data. You can specify multiple columns, which must be separated by commas (,), for example, `"col0,col1,col2"`. |
| `starrocks.column.types` | NO | None | Supported since version 1.1.1. Customize the column data types for Spark instead of using the defaults inferred from the StarRocks table and the default mapping. The parameter value is a schema in DDL format, the same as the output of Spark `StructType#toDDL`, such as `col0 INT, col1 STRING, col2 BIGINT`. Note that you only need to specify columns that need customization. One use case is to load data into columns of BITMAP or HLL type. |
| `starrocks.write.properties.*` | NO | None | The parameters that are used to control Stream Load behavior. For example, the parameter `starrocks.write.properties.format` specifies the format of the data to be loaded, such as CSV or JSON. For a list of supported parameters and their descriptions, see STREAM LOAD. |
| `starrocks.write.properties.format` | NO | CSV | The file format based on which the Spark connector transforms each batch of data before the data is sent to StarRocks. Valid values: CSV and JSON. |
| `starrocks.write.properties.row_delimiter` | NO | \n | The row delimiter for CSV-formatted data. |
| `starrocks.write.properties.column_separator` | NO | \t | The column separator for CSV-formatted data. |
| `starrocks.write.num.partitions` | NO | None | The number of partitions into which Spark can write data in parallel. When the data volume is small, you can reduce the number of partitions to lower the loading concurrency and frequency. The default value for this parameter is determined by Spark. However, this method may cause Spark Shuffle cost. |
| `starrocks.write.partition.columns` | NO | None | The partitioning columns in Spark. The parameter takes effect only when `starrocks.write.num.partitions` is specified. If this parameter is not specified, all columns being written are used for partitioning. |
| `starrocks.timezone` | NO | Default timezone of JVM | Supported since 1.1.1. The timezone used to convert Spark `TimestampType` to StarRocks `DATETIME`. The default is the timezone of the JVM returned by `ZoneId#systemDefault()`. The format can be a timezone name such as `Asia/Shanghai`, or a zone offset such as `+08:00`. |
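As an illustration of how these parameters are passed, the following is a minimal sketch that sends each batch as JSON and adjusts the buffer and flush settings. The values are illustrative only, and `df` and the `test.score_board` table are taken from the examples later in this topic.

```Scala
// A sketch of tuning load behavior via the write options documented above.
// All values here are illustrative; adjust them to your own environment.
df.write.format("starrocks")
  .option("starrocks.fe.http.url", "127.0.0.1:8030")
  .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
  .option("starrocks.table.identifier", "test.score_board")
  .option("starrocks.user", "root")
  .option("starrocks.password", "")
  .option("starrocks.write.properties.format", "json")   // send each batch as JSON instead of the default CSV
  .option("starrocks.write.buffer.size", "52428800")     // flush after ~50 MB is buffered
  .option("starrocks.write.flush.interval.ms", "60000")  // or at least once a minute
  .mode("append")
  .save()
```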
## Data type mapping between Spark and StarRocks
The default data type mapping is as follows:
| Spark data type | StarRocks data type |
|---|---|
| BooleanType | BOOLEAN |
| ByteType | TINYINT |
| ShortType | SMALLINT |
| IntegerType | INT |
| LongType | BIGINT |
| StringType | LARGEINT |
| FloatType | FLOAT |
| DoubleType | DOUBLE |
| DecimalType | DECIMAL |
| StringType | CHAR |
| StringType | VARCHAR |
| StringType | STRING |
| DateType | DATE |
| TimestampType | DATETIME |
| ArrayType | ARRAY |
NOTE:

The ARRAY mapping is supported since version 1.1.1. For detailed steps, see Load data into columns of ARRAY type.

You can also customize the data type mapping. For example, a StarRocks table contains BITMAP and HLL columns, but Spark does not support these two data types. You need to customize the corresponding data types in Spark. For detailed steps, see Load data into columns of BITMAP type and Load data into columns of HLL type. BITMAP and HLL are supported since version 1.1.1.
## Upgrade Spark connector
### Upgrade from version 1.1.0 to 1.1.1
- Since 1.1.1, the Spark connector does not provide `mysql-connector-java`, the official JDBC driver for MySQL, because of the limitations of the GPL license used by `mysql-connector-java`. However, the Spark connector still needs the MySQL JDBC driver to connect to StarRocks for the table metadata, so you need to add the driver to the Spark classpath manually. You can find the driver on the MySQL site or Maven Central.
- Since 1.1.1, the connector uses the Stream Load interface by default, rather than the Stream Load transaction interface used in version 1.1.0. If you still want to use the Stream Load transaction interface, you can set the option `starrocks.write.max.retries` to `0`. See the descriptions of `starrocks.write.enable.transaction-stream-load` and `starrocks.write.max.retries` for details.
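For example, a minimal sketch of keeping the Stream Load transaction interface after upgrading to 1.1.1; connection options are omitted, and this simply applies the two parameters described above:

```Scala
// Sketch only: retries must be disabled because the Stream Load transaction
// interface does not support retry (see starrocks.write.max.retries above).
df.write.format("starrocks")
  // ... connection options such as starrocks.fe.http.url go here ...
  .option("starrocks.write.enable.transaction-stream-load", "true") // default is already TRUE
  .option("starrocks.write.max.retries", "0")
  .mode("append")
  .save()
```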
## Examples
The following examples show how to use the Spark connector to load data into a StarRocks table with Spark DataFrames or Spark SQL. Spark DataFrames support both batch and structured streaming modes.
For more examples, see Spark Connector Examples.
### Preparations
#### Create a StarRocks table
Create a database `test` and create a Primary Key table `score_board`.
```SQL
CREATE DATABASE `test`;

CREATE TABLE `test`.`score_board`
(
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
```
#### Set up your Spark environment
Note that the following examples are run in Spark 3.2.4 and use `spark-shell`, `pyspark`, and `spark-sql`. Before running the examples, make sure to place the Spark connector JAR file in the `$SPARK_HOME/jars` directory.
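If you prefer not to copy files into `$SPARK_HOME/jars`, a common alternative is Spark's `--jars` option. The file name below is only an assumption based on the naming format described earlier; adjust it to the connector version you actually downloaded, and for connector 1.1.1 and later also add the MySQL JDBC driver to the classpath.

```Bash
# Sketch: start spark-shell with the connector JAR passed explicitly.
# Replace the JAR name/version with the one you actually use.
spark-shell --jars starrocks-spark-connector-3.2_2.12-1.1.0.jar

# The same option works for pyspark and spark-sql, for example:
# pyspark --jars starrocks-spark-connector-3.2_2.12-1.1.0.jar
```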
### Load data with Spark DataFrames
The following two examples explain how to load data with Spark DataFrames in batch or structured streaming mode.
#### Batch
Construct data in memory and load data into the StarRocks table.
1. You can write the Spark application using Scala or Python.

For Scala, run the following code snippet in `spark-shell`:
```Scala
// 1. Create a DataFrame from a sequence.
val data = Seq((1, "starrocks", 100), (2, "spark", 100))
val df = data.toDF("id", "name", "score")

// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according to your own environment.
df.write.format("starrocks")
    .option("starrocks.fe.http.url", "127.0.0.1:8030")
    .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
    .option("starrocks.table.identifier", "test.score_board")
    .option("starrocks.user", "root")
    .option("starrocks.password", "")
    .mode("append")
    .save()
```
For Python, run the following code snippet in `pyspark`:

```python
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("StarRocks Example") \
.getOrCreate()
# 1. Create a DataFrame from a sequence.
data = [(1, "starrocks", 100), (2, "spark", 100)]
df = spark.sparkContext.parallelize(data) \
.toDF(["id", "name", "score"])
# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according to your own environment.
df.write.format("starrocks") \
.option("starrocks.fe.http.url", "127.0.0.1:8030") \
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
.option("starrocks.table.identifier", "test.score_board") \
.option("starrocks.user", "root") \
.option("starrocks.password", "") \
.mode("append") \
.save()
```
2. Query data in the StarRocks table.
```sql
MySQL [test]> SELECT * FROM `score_board`;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 1 | starrocks | 100 |
| 2 | spark | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
```
#### Structured Streaming
Construct a streaming read of data from a CSV file and load data into the StarRocks table.
1. In the directory `csv-data`, create a CSV file `test.csv` with the following data:
```csv
3,starrocks,100
4,spark,100
```
2. You can write the Spark application using Scala or Python.
For Scala, run the following code snippet in `spark-shell`:
```Scala
import org.apache.spark.sql.types.StructType
// 1. Create a DataFrame from CSV.
val schema = (new StructType()
.add("id", "integer")
.add("name", "string")
.add("score", "integer")
)
val df = (spark.readStream
.option("sep", ",")
.schema(schema)
.format("csv")
// Replace it with your path to the directory "csv-data".
.load("/path/to/csv-data")
)
// 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
// You need to modify the options according to your own environment.
val query = (df.writeStream.format("starrocks")
.option("starrocks.fe.http.url", "127.0.0.1:8030")
.option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
.option("starrocks.table.identifier", "test.score_board")
.option("starrocks.user", "root")
.option("starrocks.password", "")
// replace it with your checkpoint directory
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("append")
.start()
)
```
For Python, run the following code snippet in `pyspark`:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
spark = SparkSession \
.builder \
.appName("StarRocks SS Example") \
.getOrCreate()
# 1. Create a DataFrame from CSV.
schema = StructType([ \
StructField("id", IntegerType()), \
StructField("name", StringType()), \
StructField("score", IntegerType()) \
])
df = spark.readStream \
    .option("sep", ",") \
    .schema(schema) \
    .format("csv") \
    .load("/path/to/csv-data")  # Replace it with your path to the directory "csv-data".

# 2. Write to StarRocks by configuring the format as "starrocks" and the following options.
# You need to modify the options according to your own environment.
# Replace "checkpointLocation" with your checkpoint directory.
query = df.writeStream.format("starrocks") \
    .option("starrocks.fe.http.url", "127.0.0.1:8030") \
    .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030") \
    .option("starrocks.table.identifier", "test.score_board") \
    .option("starrocks.user", "root") \
    .option("starrocks.password", "") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .outputMode("append") \
    .start()
```
3. Query data in the StarRocks table.
```SQL
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 4 | spark | 100 |
| 3 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.67 sec)
```
### Load data with Spark SQL
The following example explains how to load data with Spark SQL by using the `INSERT INTO` statement in the [Spark SQL CLI](https://spark.apache.org/docs/latest/sql-distributed-sql-engine-spark-sql-cli.html).
1. Execute the following SQL statement in `spark-sql`:
```SQL
-- 1. Create a table by configuring the data source as `starrocks` and the following options.
-- You need to modify the options according to your own environment.
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
"starrocks.fe.http.url"="127.0.0.1:8030",
"starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
"starrocks.table.identifier"="test.score_board",
"starrocks.user"="root",
"starrocks.password"=""
);
-- 2. Insert two rows into the table.
INSERT INTO `score_board` VALUES (5, "starrocks", 100), (6, "spark", 100);
```
2. Query data in the StarRocks table.
```SQL
MySQL [test]> select * from score_board;
+------+-----------+-------+
| id | name | score |
+------+-----------+-------+
| 6 | spark | 100 |
| 5 | starrocks | 100 |
+------+-----------+-------+
2 rows in set (0.00 sec)
```
## Best Practices
### Load data to Primary Key table
This section shows how to load data into a StarRocks Primary Key table to achieve partial updates and conditional updates.
See [Change data through loading](../loading/Load_to_Primary_Key_tables) for a detailed introduction to these features.
These examples use Spark SQL.
#### Preparations
Create a database `test` and create a Primary Key table `score_board` in StarRocks.
```SQL
CREATE DATABASE `test`;
CREATE TABLE `test`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
```
#### Partial updates
This example shows how to update only the data in the column `name` through loading:

1. Insert initial data into the StarRocks table in the MySQL client.

```SQL
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);

mysql> SELECT * FROM `score_board`;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | spark     |   100 |
+------+-----------+-------+
2 rows in set (0.02 sec)
```
2. Create a Spark table `score_board` in the Spark SQL client.

   - Set the option `starrocks.write.properties.partial_update` to `true`, which tells the connector to perform a partial update.
   - Set the option `starrocks.columns` to `"id,name"` to tell the connector which columns to write.

```SQL
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
   "starrocks.fe.http.url"="127.0.0.1:8030",
   "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
   "starrocks.table.identifier"="test.score_board",
   "starrocks.user"="root",
   "starrocks.password"="",
   "starrocks.write.properties.partial_update"="true",
   "starrocks.columns"="id,name"
);
```
3. Insert data into the table in the Spark SQL client, and only update the column `name`.

```SQL
INSERT INTO `score_board` VALUES (1, 'starrocks-update'), (2, 'spark-update');
```
4. Query the StarRocks table in the MySQL client.

   You can see that only the values for `name` change, and the values for `score` do not change.

```SQL
mysql> SELECT * FROM `score_board`;
+------+------------------+-------+
| id   | name             | score |
+------+------------------+-------+
|    1 | starrocks-update |   100 |
|    2 | spark-update     |   100 |
+------+------------------+-------+
2 rows in set (0.02 sec)
```
#### Conditional updates
This example shows how to do conditional updates according to the values of the column `score`. The update for an `id` takes effect only when the new value for `score` is greater than or equal to the old value.

1. Insert initial data into the StarRocks table in the MySQL client.

```SQL
mysql> INSERT INTO `score_board` VALUES (1, 'starrocks', 100), (2, 'spark', 100);

mysql> SELECT * FROM `score_board`;
+------+-----------+-------+
| id   | name      | score |
+------+-----------+-------+
|    1 | starrocks |   100 |
|    2 | spark     |   100 |
+------+-----------+-------+
2 rows in set (0.02 sec)
```
2. Create a Spark table `score_board` in the following way.

   - Set the option `starrocks.write.properties.merge_condition` to `score`, which tells the connector to use the column `score` as the condition.
   - Make sure that the Spark connector uses the Stream Load interface to load data, rather than the Stream Load transaction interface, because the latter does not support this feature.

```SQL
CREATE TABLE `score_board`
USING starrocks
OPTIONS(
   "starrocks.fe.http.url"="127.0.0.1:8030",
   "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
   "starrocks.table.identifier"="test.score_board",
   "starrocks.user"="root",
   "starrocks.password"="",
   "starrocks.write.properties.merge_condition"="score"
);
```
3. Insert data into the table in the Spark SQL client, updating the row whose `id` is 1 with a smaller score value, and the row whose `id` is 2 with a larger score value.

```SQL
INSERT INTO `score_board` VALUES (1, 'starrocks-update', 99), (2, 'spark-update', 101);
```
4. Query the StarRocks table in the MySQL client.

   You can see that only the row whose `id` is 2 changes, and the row whose `id` is 1 does not change.

```SQL
mysql> SELECT * FROM `score_board`;
+------+--------------+-------+
| id   | name         | score |
+------+--------------+-------+
|    1 | starrocks    |   100 |
|    2 | spark-update |   101 |
+------+--------------+-------+
2 rows in set (0.03 sec)
```
### Load data into columns of BITMAP type
`BITMAP` is often used to accelerate count distinct, such as counting UV. See Use Bitmap for exact Count Distinct. Here we take the counting of UV as an example to show how to load data into columns of the `BITMAP` type. `BITMAP` is supported since version 1.1.1.
1. Create a StarRocks Aggregate table.

   In the database `test`, create an Aggregate table `page_uv` where the column `visit_users` is defined as the `BITMAP` type and configured with the aggregate function `BITMAP_UNION`.

```SQL
CREATE TABLE `test`.`page_uv` (
  `page_id` INT NOT NULL COMMENT 'page ID',
  `visit_date` datetime NOT NULL COMMENT 'access time',
  `visit_users` BITMAP BITMAP_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
```
2. Create a Spark table.

   The schema of the Spark table is inferred from the StarRocks table, and Spark does not support the `BITMAP` type. So you need to customize the corresponding column data type in Spark, for example as `BIGINT`, by configuring the option `"starrocks.column.types"="visit_users BIGINT"`. When using Stream Load to ingest data, the connector uses the `to_bitmap` function to convert the data of `BIGINT` type into `BITMAP` type.

   Run the following DDL in `spark-sql`:

```SQL
CREATE TABLE `page_uv`
USING starrocks
OPTIONS(
   "starrocks.fe.http.url"="127.0.0.1:8030",
   "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
   "starrocks.table.identifier"="test.page_uv",
   "starrocks.user"="root",
   "starrocks.password"="",
   "starrocks.column.types"="visit_users BIGINT"
);
```
3. Load data into the StarRocks table.

   Run the following DML in `spark-sql`:

```SQL
INSERT INTO `page_uv` VALUES
   (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 13),
   (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23),
   (1, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 33),
   (1, CAST('2020-06-23 02:30:30' AS TIMESTAMP), 13),
   (2, CAST('2020-06-23 01:30:30' AS TIMESTAMP), 23);
```
4. Calculate page UVs from the StarRocks table.

```SQL
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `page_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
|       2 |                           1 |
|       1 |                           3 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
```
NOTICE:

The connector uses the `to_bitmap` function to convert data of the `TINYINT`, `SMALLINT`, `INTEGER`, and `BIGINT` types in Spark to the `BITMAP` type in StarRocks, and uses the `bitmap_hash` function for other Spark data types.
### Load data into columns of HLL type
`HLL` can be used for approximate count distinct. See Use HLL for approximate count distinct. Here we take the counting of UV as an example to show how to load data into columns of the `HLL` type. `HLL` is supported since version 1.1.1.
1. Create a StarRocks Aggregate table.

   In the database `test`, create an Aggregate table `hll_uv` where the column `visit_users` is defined as the `HLL` type and configured with the aggregate function `HLL_UNION`.

```SQL
CREATE TABLE `hll_uv` (
  `page_id` INT NOT NULL COMMENT 'page ID',
  `visit_date` datetime NOT NULL COMMENT 'access time',
  `visit_users` HLL HLL_UNION NOT NULL COMMENT 'user ID'
) ENGINE=OLAP
AGGREGATE KEY(`page_id`, `visit_date`)
DISTRIBUTED BY HASH(`page_id`);
```
2. Create a Spark table.

   The schema of the Spark table is inferred from the StarRocks table, and Spark does not support the `HLL` type. So you need to customize the corresponding column data type in Spark, for example as `BIGINT`, by configuring the option `"starrocks.column.types"="visit_users BIGINT"`. When using Stream Load to ingest data, the connector uses the `hll_hash` function to convert the data of `BIGINT` type into `HLL` type.

   Run the following DDL in `spark-sql`:

```SQL
CREATE TABLE `hll_uv`
USING starrocks
OPTIONS(
   "starrocks.fe.http.url"="127.0.0.1:8030",
   "starrocks.fe.jdbc.url"="jdbc:mysql://127.0.0.1:9030",
   "starrocks.table.identifier"="test.hll_uv",
   "starrocks.user"="root",
   "starrocks.password"="",
   "starrocks.column.types"="visit_users BIGINT"
);
```
3. Load data into the StarRocks table.

   Run the following DML in `spark-sql`:

```SQL
INSERT INTO `hll_uv` VALUES
   (3, CAST('2023-07-24 12:00:00' AS TIMESTAMP), 78),
   (4, CAST('2023-07-24 13:20:10' AS TIMESTAMP), 2),
   (3, CAST('2023-07-24 12:30:00' AS TIMESTAMP), 674);
```
4. Calculate page UVs from the StarRocks table.

```SQL
MySQL [test]> SELECT `page_id`, COUNT(DISTINCT `visit_users`) FROM `hll_uv` GROUP BY `page_id`;
+---------+-----------------------------+
| page_id | count(DISTINCT visit_users) |
+---------+-----------------------------+
|       4 |                           1 |
|       3 |                           2 |
+---------+-----------------------------+
2 rows in set (0.01 sec)
```
### Load data into columns of ARRAY type
The following example explains how to load data into columns of the `ARRAY` type.
1. Create a StarRocks table.

   In the database `test`, create a Primary Key table `array_tbl` that includes one `INT` column and two `ARRAY` columns.

```SQL
CREATE TABLE `array_tbl` (
  `id` INT NOT NULL,
  `a0` ARRAY<STRING>,
  `a1` ARRAY<ARRAY<INT>>
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`);
```
2. Write data to StarRocks.

   Because some versions of StarRocks do not provide the metadata of `ARRAY` columns, the connector cannot infer the corresponding Spark data type for these columns. However, you can explicitly specify the corresponding Spark data type of the column in the option `starrocks.column.types`. In this example, you can configure the option as `a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>`.

   Run the following codes in `spark-shell`:

```Scala
val data = Seq(
   (1, Seq("hello", "starrocks"), Seq(Seq(1, 2), Seq(3, 4))),
   (2, Seq("hello", "spark"), Seq(Seq(5, 6, 7), Seq(8, 9, 10)))
)
val df = data.toDF("id", "a0", "a1")

df.write
   .format("starrocks")
   .option("starrocks.fe.http.url", "127.0.0.1:8030")
   .option("starrocks.fe.jdbc.url", "jdbc:mysql://127.0.0.1:9030")
   .option("starrocks.table.identifier", "test.array_tbl")
   .option("starrocks.user", "root")
   .option("starrocks.password", "")
   .option("starrocks.column.types", "a0 ARRAY<STRING>,a1 ARRAY<ARRAY<INT>>")
   .mode("append")
   .save()
```
3. Query data in the StarRocks table.

```SQL
MySQL [test]> SELECT * FROM `array_tbl`;
+------+-----------------------+--------------------+
| id   | a0                    | a1                 |
+------+-----------------------+--------------------+
|    1 | ["hello","starrocks"] | [[1,2],[3,4]]      |
|    2 | ["hello","spark"]     | [[5,6,7],[8,9,10]] |
+------+-----------------------+--------------------+
2 rows in set (0.01 sec)
```