# [Preview] Continuously load data from Apache® Pulsar™
As of StarRocks version 2.5, Routine Load supports continuously loading data from Apache® Pulsar™. Pulsar is a distributed, open-source pub-sub messaging and streaming platform with a store-compute separation architecture. Loading data from Pulsar via Routine Load is similar to loading data from Apache Kafka. This topic uses CSV-formatted data as an example to show how to load data from Pulsar via Routine Load.
## Supported data file formats
Routine Load supports consuming CSV and JSON formatted data from a Pulsar cluster.
> **NOTE**
>
> For CSV-formatted data, StarRocks supports UTF-8-encoded strings of up to 50 bytes as column separators. Commonly used column separators include comma (,), tab, and pipe (|).
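As a hedged illustration of the JSON side (the job, table, and topic names below are hypothetical), a Routine Load job for JSON-formatted Pulsar messages would set the same `format` and `jsonpaths` properties that JSON Routine Load jobs use for Kafka:

```SQL
-- Hypothetical sketch: consume JSON-formatted messages from Pulsar.
-- "format" and "jsonpaths" are the standard Routine Load JSON properties;
-- the job, table, and topic names are placeholders.
CREATE ROUTINE LOAD load_test.routine_wiki_edit_json ON routine_wiki_edit
COLUMNS (order_id, price)
PROPERTIES
(
    "format" = "json",
    "jsonpaths" = "[\"$.order_id\",\"$.price\"]"
)
FROM PULSAR
(
    "pulsar_service_url" = "pulsar://localhost:6650",
    "pulsar_topic" = "persistent://tenant/namespace/topic-name",
    "pulsar_subscription" = "load-test"
);
```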
## Pulsar-related concepts
### Topic

Topics in Pulsar are named channels for transmitting messages from producers to consumers. Topics in Pulsar are divided into partitioned topics and non-partitioned topics.
- Partitioned topics are a special type of topic that are handled by multiple brokers, thus allowing for higher throughput. A partitioned topic is actually implemented as N internal topics, where N is the number of partitions.
- Non-partitioned topics are a normal type of topic that are served only by a single broker, which limits the maximum throughput of the topic.
### Message ID

The message ID of a message is assigned by BookKeeper instances as soon as the message is persistently stored. The message ID indicates a message's specific position in a ledger and is unique within a Pulsar cluster.
Pulsar allows consumers to specify an initial position through `consumer.seek(messageId)`. However, unlike the Kafka consumer offset, which is a long integer value, the message ID consists of four parts: `ledgerId:entryID:partition-index:batch-index`. Therefore, you cannot get the message ID directly from the message. As a result, at present, Routine Load does not support specifying an initial position when loading data from Pulsar; it only supports consuming data from the beginning or end of a partition.
### Subscription

A subscription is a named configuration rule that determines how messages are delivered to consumers. Pulsar also supports consumers simultaneously subscribing to multiple topics. A topic can have multiple subscriptions.

The type of a subscription is defined when a consumer connects to it, and the type can be changed by restarting all consumers with a different configuration. Four subscription types are available in Pulsar:
- `exclusive` (default): Only a single consumer is allowed to attach to the subscription. Only one consumer is allowed to consume messages.
- `shared`: Multiple consumers can attach to the same subscription. Messages are delivered in a round-robin distribution across consumers, and any given message is delivered to only one consumer.
- `failover`: Multiple consumers can attach to the same subscription. A master consumer is picked for a non-partitioned topic or for each partition of a partitioned topic and receives messages. When the master consumer disconnects, all (non-acknowledged and subsequent) messages are delivered to the next consumer in line.
- `key_shared`: Multiple consumers can attach to the same subscription. Messages are delivered in a distribution across consumers, and messages with the same key or same ordering key are delivered to only one consumer.
> **Note**
>
> Currently, Routine Load uses the exclusive type.
## Create a Routine Load job
The following example describes how to consume CSV-formatted messages in Pulsar and load the data into StarRocks by creating a Routine Load job. For detailed instructions and reference, see CREATE ROUTINE LOAD.
```SQL
CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price),
WHERE event_time > "2022-01-01 00:00:00"
PROPERTIES
(
    "desired_concurrent_number" = "1",
    "max_batch_interval" = "15000",
    "max_error_number" = "1000"
)
FROM PULSAR
(
    "pulsar_service_url" = "pulsar://localhost:6650",
    "pulsar_topic" = "persistent://tenant/namespace/topic-name",
    "pulsar_subscription" = "load-test",
    "pulsar_partitions" = "load-partition-0,load-partition-1",
    "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
    "property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);
```
When a Routine Load job is created to consume data from Pulsar, most input parameters except for `data_source_properties` are the same as when consuming data from Kafka. For descriptions of the parameters other than `data_source_properties`, see CREATE ROUTINE LOAD.

The parameters related to `data_source_properties` and their descriptions are as follows:
| Parameter | Required | Description |
| --- | --- | --- |
| `pulsar_service_url` | Yes | The URL that is used to connect to the Pulsar cluster. Format: `"pulsar://ip:port"` or `"pulsar://service:port"`.<br />Example: `"pulsar_service_url" = "pulsar://localhost:6650"` |
| `pulsar_topic` | Yes | Subscribed topic.<br />Example: `"pulsar_topic" = "persistent://tenant/namespace/topic-name"` |
| `pulsar_subscription` | Yes | Subscription configured for the topic.<br />Example: `"pulsar_subscription" = "my_subscription"` |
| `pulsar_partitions`, `pulsar_initial_positions` | No | `pulsar_partitions`: Subscribed partitions in the topic.<br />`pulsar_initial_positions`: Initial positions of the partitions specified by `pulsar_partitions`. The initial positions must correspond to the partitions in `pulsar_partitions`. Valid values:<br />`POSITION_EARLIEST` (default value): Subscription starts from the earliest available message in the partition.<br />`POSITION_LATEST`: Subscription starts from the latest available message in the partition.<br />Note:<br />If `pulsar_partitions` is not specified, all partitions of the topic are subscribed.<br />If both `pulsar_initial_positions` and `property.pulsar_default_initial_position` are specified, the `pulsar_initial_positions` value overrides the `property.pulsar_default_initial_position` value.<br />If neither `pulsar_partitions` nor `property.pulsar_default_initial_position` is specified, subscription starts from the latest available message in the partition.<br />Example:<br />`"pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST"` |
Routine Load supports the following custom parameters for Pulsar.
| Parameter | Required | Description |
| --- | --- | --- |
| `property.pulsar_default_initial_position` | No | The default initial positions when the topic's partitions are subscribed. The parameter takes effect when `pulsar_initial_positions` is not specified. Its valid values are the same as the valid values of `pulsar_initial_positions`.<br />Example: `"property.pulsar_default_initial_position" = "POSITION_EARLIEST"` |
| `property.auth.token` | No | If Pulsar enables authenticating clients using security tokens, you need the token string to verify your identity.<br />Example: `"property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"` |
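For illustration, the following minimal sketch (the job name `routine_wiki_edit_2` is hypothetical) subscribes to all partitions of the topic and starts each of them from the earliest message by setting the custom default position instead of listing per-partition positions:

```SQL
-- Hypothetical sketch: omit "pulsar_partitions" so that all partitions of
-- the topic are subscribed, and use the custom default-position parameter
-- instead of per-partition "pulsar_initial_positions".
CREATE ROUTINE LOAD load_test.routine_wiki_edit_2 ON routine_wiki_edit
COLUMNS TERMINATED BY ","
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM PULSAR
(
    "pulsar_service_url" = "pulsar://localhost:6650",
    "pulsar_topic" = "persistent://tenant/namespace/topic-name",
    "pulsar_subscription" = "load-test",
    -- No "pulsar_partitions": all partitions of the topic are subscribed.
    "property.pulsar_default_initial_position" = "POSITION_EARLIEST"
);
```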
## Check a load job and task
### Check a load job
Execute the SHOW ROUTINE LOAD statement to check the status of the load job `routine_wiki_edit_1`. StarRocks returns the execution state `State`, the statistical information (including the total rows consumed and the total rows loaded) `Statistics`, and the progress of the load job `progress`.
When you check a Routine Load job that consumes data from Pulsar, most returned parameters except for `progress` are the same as when consuming data from Kafka. `progress` refers to the backlog, that is, the number of unacknowledged messages in a partition.
```plaintext
MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
                  Id: 10142
                Name: routine_wiki_edit_1
          CreateTime: 2022-06-29 14:52:55
           PauseTime: 2022-06-29 17:33:53
             EndTime: NULL
              DbName: default_cluster:test_pulsar
           TableName: test1
               State: PAUSED
      DataSourceType: PULSAR
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://localhost:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
    CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
           Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
            Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
1 row in set (0.00 sec)
```
### Check a load task
Execute the SHOW ROUTINE LOAD TASK statement to check the load tasks of the load job `routine_wiki_edit_1`, such as how many tasks are running, the topic partitions that are consumed and the consumption progress `DataSourceProperties`, and the corresponding Coordinator BE node `BeId`.
```SQL
MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "routine_wiki_edit_1" \G
```
## Alter a load job
Before altering a load job, you must pause it by using the PAUSE ROUTINE LOAD statement. Then you can execute the ALTER ROUTINE LOAD statement. After altering the job, you can execute the RESUME ROUTINE LOAD statement to resume it, and check its status by using the SHOW ROUTINE LOAD statement.
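For illustration, here is a minimal sketch of that pause-alter-resume flow against the job created earlier. The new token value is a hypothetical placeholder, and the `FROM PULSAR` clause in ALTER ROUTINE LOAD is assumed to mirror the `FROM KAFKA` syntax documented for Kafka jobs:

```SQL
-- Pause the job before altering it.
PAUSE ROUTINE LOAD FOR routine_wiki_edit_1;

-- Modify a data-source property; "<new_token>" is a hypothetical placeholder.
ALTER ROUTINE LOAD FOR routine_wiki_edit_1
FROM PULSAR
(
    "property.auth.token" = "<new_token>"
);

-- Resume the job and check its status.
RESUME ROUTINE LOAD FOR routine_wiki_edit_1;
SHOW ROUTINE LOAD FOR routine_wiki_edit_1;
```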
When Routine Load is used to consume data from Pulsar, most parameters except for `data_source_properties` are the same as when consuming data from Kafka.
Take note of the following points:

- Among the `data_source_properties` related parameters, only `pulsar_partitions`, `pulsar_initial_positions`, and the custom Pulsar parameters `property.pulsar_default_initial_position` and `property.auth.token` can currently be modified. The parameters `pulsar_service_url`, `pulsar_topic`, and `pulsar_subscription` cannot be modified.
- If you need to modify the partitions to be consumed and the matched initial positions, you must make sure that you specified the partitions using `pulsar_partitions` when you created the Routine Load job, and only the initial positions `pulsar_initial_positions` of the specified partitions can be modified.
- If you specified only the topic `pulsar_topic` when creating the Routine Load job, but not the partitions `pulsar_partitions`, you can modify the starting position of all partitions under the topic via `property.pulsar_default_initial_position`, as shown in the sketch after this list.
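As a hedged illustration of that last point, the following sketch reuses the hypothetical job `routine_wiki_edit_2` from the earlier example (created with `pulsar_topic` only, no `pulsar_partitions`) and the same assumed `FROM PULSAR` clause:

```SQL
-- The job was created without "pulsar_partitions", so the starting position
-- of all partitions under the topic is changed through the custom
-- default-position parameter.
ALTER ROUTINE LOAD FOR routine_wiki_edit_2
FROM PULSAR
(
    "property.pulsar_default_initial_position" = "POSITION_LATEST"
);
```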