Version: Candidate-3.4

Apache Iceberg Lakehouse

Overview

  • Deploy Object Storage, Apache Spark, Iceberg catalog, and StarRocks using Docker compose
  • Load New York City Green Taxi data for the month of May 2023 into the Iceberg data lake
  • Configure StarRocks to access the Iceberg catalog
  • Query the data with StarRocks where the data sits

DLA

In addition to efficient analytics of local data, StarRocks can work as the compute engine to analyze data stored in data lakes such as Apache Hudi, Apache Iceberg, and Delta Lake. One of the key features of StarRocks is its external catalog, which acts as the linkage to an externally maintained metastore. This functionality provides users with the capability to query external data sources seamlessly, eliminating the need for data migration. As such, users can analyze data from different systems such as HDFS and Amazon S3, in various file formats such as Parquet, ORC, and CSV, etc.

The preceding figure shows a data lake analytics scenario where StarRocks is responsible for data computing and analysis, and the data lake is responsible for data storage, organization, and maintenance. Data lakes allow users to store data in open storage formats and use flexible schemas to produce reports on "single source of truth" for various BI, AI, ad-hoc, and reporting use cases. StarRocks fully leverages the advantages of its vectorization engine and CBO, significantly improving the performance of data lake analytics.

Prerequisites

Docker

  • Docker
  • 5 GB RAM assigned to Docker
  • 20 GB free disk space assigned to Docker

SQL client

You can use the SQL client provided in the Docker environment, or use one on your system. Many MySQL-compatible clients will work, and this guide covers the configuration of DBeaver and MySQL Workbench.

curl

curl is used to download the datasets. Check to see if you have it installed by running curl or curl.exe at your OS prompt. If curl is not installed, get curl here.
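If you would rather script the check than type the command by hand, the following is a minimal sketch using only the Python standard library. The function name find_curl is made up for this illustration; it only looks for curl or curl.exe on your PATH and does not run either one.

```python
import shutil


def find_curl():
    """Return the full path to curl (or curl.exe), or None if neither is on PATH."""
    for name in ("curl", "curl.exe"):
        path = shutil.which(name)
        if path:
            return path
    return None


location = find_curl()
print(f"curl found at {location}" if location else "curl not found on PATH")
```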


StarRocks terminology

FE

Frontend (FE) nodes are responsible for metadata management, client connection management, query planning, and query scheduling. Each FE stores and maintains a complete copy of the metadata in its memory, so every FE can provide the same services.

BE

Backend (BE) nodes are responsible for both data storage and executing query plans in shared-nothing deployments. When an external catalog (like the Iceberg catalog used in this guide) is used, the BE nodes can cache the data from the external catalog to accelerate queries.


The environment

There are six containers (services) used in this guide, and all are deployed with Docker compose. The services and their responsibilities are:

Service         Responsibilities
starrocks-fe    Metadata management, client connections, query plans and scheduling
starrocks-be    Running query plans
rest            Providing the Iceberg catalog (metadata service)
spark-iceberg   An Apache Spark environment to run PySpark in
mc              MinIO configuration (MinIO command line client)
minio           MinIO Object Storage

Download the Docker configuration and NYC Green Taxi data

To provide an environment with the necessary containers, StarRocks provides a Docker compose file. Download the compose file and dataset with curl.

The Docker compose file:

mkdir iceberg
cd iceberg
curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/iceberg/docker-compose.yml

And the dataset:

curl -O https://raw.githubusercontent.com/StarRocks/demo/master/documentation-samples/iceberg/datasets/green_tripdata_2023-05.parquet

Start the environment in Docker

tip

Run this command, and any other docker compose commands, from the directory containing the docker-compose.yml file.

docker compose up -d
[+] Building 0.0s (0/0)                     docker:desktop-linux
[+] Running 6/6
✔ Container iceberg-rest Started 0.0s
✔ Container minio Started 0.0s
✔ Container starrocks-fe Started 0.0s
✔ Container mc Started 0.0s
✔ Container spark-iceberg Started 0.0s
✔ Container starrocks-be Started

Check the status of the environment

Check the progress of the services. It should take around 30 seconds for the FE and BE to become healthy.

Run docker compose ps until the FE and BE show a status of healthy. The rest of the services do not have healthcheck configurations, but you will be interacting with them and will know whether or not they are working:

tip

If you have jq installed and prefer a shorter list from docker compose ps try:

docker compose ps --format json | jq '{Service: .Service, State: .State, Status: .Status}'
docker compose ps
SERVICE         CREATED         STATUS                   PORTS
rest            4 minutes ago   Up 4 minutes             0.0.0.0:8181->8181/tcp
mc              4 minutes ago   Up 4 minutes
minio           4 minutes ago   Up 4 minutes             0.0.0.0:9000-9001->9000-9001/tcp
spark-iceberg   4 minutes ago   Up 4 minutes             0.0.0.0:8080->8080/tcp, 0.0.0.0:8888->8888/tcp, 0.0.0.0:10000-10001->10000-10001/tcp
starrocks-be    4 minutes ago   Up 4 minutes (healthy)   0.0.0.0:8040->8040/tcp
starrocks-fe    4 minutes ago   Up 4 minutes (healthy)   0.0.0.0:8030->8030/tcp, 0.0.0.0:9020->9020/tcp, 0.0.0.0:9030->9030/tcp
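
The health check can also be scripted. The sketch below is one possible approach, not part of the StarRocks tooling: it parses the text produced by docker compose ps --format json and reports which of the two StarRocks services do not yet show (healthy) in their Status field. The function names are invented for this example, and the parser accepts either a JSON array or one JSON object per line because the output shape can differ between Docker Compose versions.

```python
import json


def parse_compose_ps(output):
    """Parse `docker compose ps --format json` text into a list of dicts.

    Accepts a single JSON array or one JSON object per line.
    """
    text = output.strip()
    if not text:
        return []
    if text.startswith("["):
        return json.loads(text)
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def not_healthy(output, required=("starrocks-fe", "starrocks-be")):
    """Return the required services whose Status does not contain '(healthy)'."""
    status = {row.get("Service"): row.get("Status", "") for row in parse_compose_ps(output)}
    return [name for name in required if "(healthy)" not in status.get(name, "")]


# Hand-written sample in the one-object-per-line shape (not captured output).
sample = (
    '{"Service": "starrocks-fe", "State": "running", "Status": "Up 4 minutes (healthy)"}\n'
    '{"Service": "starrocks-be", "State": "running", "Status": "Up 5 seconds (health: starting)"}\n'
)
print(not_healthy(sample))  # prints ['starrocks-be']
```

To use it against the running environment, pass in the standard output of docker compose ps --format json (for example, captured with subprocess.run) and repeat until the returned list is empty.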

PySpark

There are several ways to interact with Iceberg; this guide uses PySpark. If you are not familiar with PySpark, there is documentation linked from the More Information section, but every command that you need to run is provided below.

Green Taxi dataset

Copy the data to the spark-iceberg container. This command will copy the dataset file to the /opt/spark/ directory in the spark-iceberg service:

docker compose \
cp green_tripdata_2023-05.parquet spark-iceberg:/opt/spark/

Launch PySpark

This command will connect to the spark-iceberg service and run the command pyspark:

docker compose exec -it spark-iceberg pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.9.18 (main, Nov 1 2023 11:04:44)
Spark context Web UI available at http://6ad5cb0e6335:4041
Spark context available as 'sc' (master = local[*], app id = local-1701967093057).
SparkSession available as 'spark'.
>>>

Read the dataset into a dataframe

A dataframe is part of Spark SQL, and provides a data structure similar to a database table or a spreadsheet.

The Green Taxi data is provided by the NYC Taxi and Limousine Commission in Parquet format. Load the file from the /opt/spark directory, then inspect the first few records by selecting the first few columns of the first three rows of data. Run these commands in the pyspark session. The commands:

  • Read the dataset file from disk into a dataframe named df
  • Display the schema of the Parquet file
df = spark.read.parquet("/opt/spark/green_tripdata_2023-05.parquet")
df.printSchema()
root
|-- VendorID: integer (nullable = true)
|-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
|-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
|-- store_and_fwd_flag: string (nullable = true)
|-- RatecodeID: long (nullable = true)
|-- PULocationID: integer (nullable = true)
|-- DOLocationID: integer (nullable = true)
|-- passenger_count: long (nullable = true)
|-- trip_distance: double (nullable = true)
|-- fare_amount: double (nullable = true)
|-- extra: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- tip_amount: double (nullable = true)
|-- tolls_amount: double (nullable = true)
|-- ehail_fee: double (nullable = true)
|-- improvement_surcharge: double (nullable = true)
|-- total_amount: double (nullable = true)
|-- payment_type: long (nullable = true)
|-- trip_type: long (nullable = true)
|-- congestion_surcharge: double (nullable = true)

>>>

Examine the first few (seven) columns of the first few (three) rows of data:

df.select(df.columns[:7]).show(3)
+--------+--------------------+---------------------+------------------+----------+------------+------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|
+--------+--------------------+---------------------+------------------+----------+------------+------------+
| 2| 2023-05-01 00:52:10| 2023-05-01 01:05:26| N| 1| 244| 213|
| 2| 2023-05-01 00:29:49| 2023-05-01 00:50:11| N| 1| 33| 100|
| 2| 2023-05-01 00:25:19| 2023-05-01 00:32:12| N| 1| 244| 244|
+--------+--------------------+---------------------+------------------+----------+------------+------------+
only showing top 3 rows

Write to a table

The table created in this step will be in the catalog that will be made available in StarRocks in the next step.

  • Catalog: demo
  • Database: nyc
  • Table: greentaxis
df.writeTo("demo.nyc.greentaxis").create()
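
Before leaving PySpark, you may want to confirm that the table was written. The following is a small sketch rather than a required step: summarize_table is a helper name made up for this example, and it assumes the spark session object that the pyspark shell provides. It reads the table back through the catalog and returns the row count and column names.

```python
def summarize_table(spark, table_name="demo.nyc.greentaxis"):
    """Return (row_count, column_names) for a table visible to this Spark session."""
    table = spark.table(table_name)  # look the table up through the catalog
    return table.count(), table.columns


# In the pyspark session from this guide, `spark` already exists:
#   rows, columns = summarize_table(spark)
#   print(rows, columns[:3])
```

If the call raises an error saying the table cannot be found, the writeTo step above probably did not complete.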

Configure StarRocks to access the Iceberg Catalog

You can exit from PySpark now, or you can open a new terminal to run the SQL commands. If you do open a new terminal, change into the iceberg directory that contains the docker-compose.yml file before continuing.

Connect to StarRocks with a SQL client

SQL Clients

These three clients have been tested with this tutorial. You only need one:

  • mysql CLI: You can run this from the Docker environment or your machine.
  • DBeaver is available as a community version and a Pro version.
  • MySQL Workbench

Configuring the client

The easiest way to use the mysql CLI is to run it from the StarRocks container starrocks-fe:

docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
tip

All docker compose commands must be run from the directory containing the docker-compose.yml file.

If you would like to install the mysql CLI expand mysql client install below:

mysql client install
  • macOS: If you use Homebrew and do not need MySQL Server run brew install mysql to install the CLI.
  • Linux: Check your repository system for the mysql client. For example, yum install mariadb.
  • Microsoft Windows: Install the MySQL Community Server and run the provided client, or run mysql from WSL.

You can now exit the PySpark session and connect to StarRocks.

tip

Run this command from the directory containing the docker-compose.yml file.

If you are using a client other than the mysql CLI, open that now.

docker compose exec starrocks-fe \
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
StarRocks >

Create an external catalog

The external catalog is the configuration that allows StarRocks to operate on the Iceberg data as if it were in StarRocks databases and tables. The individual configuration properties are detailed after the command.

CREATE EXTERNAL CATALOG 'iceberg'
COMMENT "External catalog to Apache Iceberg on MinIO"
PROPERTIES
(
"type"="iceberg",
"iceberg.catalog.type"="rest",
"iceberg.catalog.uri"="http://iceberg-rest:8181",
"iceberg.catalog.warehouse"="warehouse",
"aws.s3.access_key"="admin",
"aws.s3.secret_key"="password",
"aws.s3.endpoint"="http://minio:9000",
"aws.s3.enable_path_style_access"="true",
"client.factory"="com.starrocks.connector.iceberg.IcebergAwsClientFactory"
);

PROPERTIES

Property                          Description
type                              In this example the type is iceberg. Other options include Hive, Hudi, Delta Lake, and JDBC.
iceberg.catalog.type              In this example rest is used. Tabular provides the Docker image used and Tabular uses REST.
iceberg.catalog.uri               The REST server endpoint.
iceberg.catalog.warehouse         The identifier of the Iceberg catalog. In this case the warehouse name specified in the compose file is warehouse.
aws.s3.access_key                 The MinIO access key. In this case the key is set in the compose file to admin.
aws.s3.secret_key                 The MinIO secret key. In this case the password is set in the compose file to password.
aws.s3.endpoint                   The MinIO endpoint.
aws.s3.enable_path_style_access   Required when using MinIO for Object Storage. MinIO expects this format: http://host:port/<bucket_name>/<key_name>
client.factory                    Setting this property to com.starrocks.connector.iceberg.IcebergAwsClientFactory causes the aws.s3.access_key and aws.s3.secret_key parameters to be used for authentication.
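
To make the aws.s3.enable_path_style_access setting more concrete, here is a small illustrative sketch of the two URL shapes an S3 client can produce. The function s3_object_url and the object key used below are hypothetical and exist only for this example; no StarRocks or MinIO API is involved. Path-style addressing keeps the bucket in the URL path, while virtual-hosted style moves it into the hostname, which in general requires a resolvable hostname per bucket.

```python
def s3_object_url(endpoint, bucket, key, path_style=True):
    """Build an object URL in path style or virtual-hosted style."""
    scheme, _, host = endpoint.partition("://")
    host = host.rstrip("/")
    if path_style:
        # http://host:port/<bucket_name>/<key_name>
        return f"{scheme}://{host}/{bucket}/{key}"
    # http://<bucket_name>.host:port/<key_name>
    return f"{scheme}://{bucket}.{host}/{key}"


# The key below is a made-up example, not a real file in the warehouse bucket.
print(s3_object_url("http://minio:9000", "warehouse", "nyc/example.parquet"))
# prints http://minio:9000/warehouse/nyc/example.parquet
print(s3_object_url("http://minio:9000", "warehouse", "nyc/example.parquet", path_style=False))
# prints http://warehouse.minio:9000/nyc/example.parquet
```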
SHOW CATALOGS;
+-----------------+----------+------------------------------------------------------------------+
| Catalog | Type | Comment |
+-----------------+----------+------------------------------------------------------------------+
| default_catalog | Internal | An internal catalog contains this cluster's self-managed tables. |
| iceberg | Iceberg | External catalog to Apache Iceberg on MinIO |
+-----------------+----------+------------------------------------------------------------------+
2 rows in set (0.03 sec)
SET CATALOG iceberg;
SHOW DATABASES;
tip

The database that you see was created in your PySpark session. When you added the CATALOG iceberg, the database nyc became visible in StarRocks.

+--------------------+
| Database |
+--------------------+
| information_schema |
| nyc |
+--------------------+
2 rows in set (0.07 sec)
USE nyc;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
SHOW TABLES;
+---------------+
| Tables_in_nyc |
+---------------+
| greentaxis |
+---------------+
1 rows in set (0.05 sec)
DESCRIBE greentaxis;
tip

Compare the schema that StarRocks uses with the output of df.printSchema() from the earlier PySpark session. For example, the Spark timestamp_ntz columns are represented as DATETIME in StarRocks, and long columns as BIGINT.

+-----------------------+---------------------+------+-------+---------+-------+---------+
| Field | Type | Null | Key | Default | Extra | Comment |
+-----------------------+---------------------+------+-------+---------+-------+---------+
| VendorID | INT | Yes | false | NULL | | NULL |
| lpep_pickup_datetime | DATETIME | Yes | false | NULL | | NULL |
| lpep_dropoff_datetime | DATETIME | Yes | false | NULL | | NULL |
| store_and_fwd_flag | VARCHAR(1073741824) | Yes | false | NULL | | NULL |
| RatecodeID | BIGINT | Yes | false | NULL | | NULL |
| PULocationID | INT | Yes | false | NULL | | NULL |
| DOLocationID | INT | Yes | false | NULL | | NULL |
| passenger_count | BIGINT | Yes | false | NULL | | NULL |
| trip_distance | DOUBLE | Yes | false | NULL | | NULL |
| fare_amount | DOUBLE | Yes | false | NULL | | NULL |
| extra | DOUBLE | Yes | false | NULL | | NULL |
| mta_tax | DOUBLE | Yes | false | NULL | | NULL |
| tip_amount | DOUBLE | Yes | false | NULL | | NULL |
| tolls_amount | DOUBLE | Yes | false | NULL | | NULL |
| ehail_fee | DOUBLE | Yes | false | NULL | | NULL |
| improvement_surcharge | DOUBLE | Yes | false | NULL | | NULL |
| total_amount | DOUBLE | Yes | false | NULL | | NULL |
| payment_type | BIGINT | Yes | false | NULL | | NULL |
| trip_type | BIGINT | Yes | false | NULL | | NULL |
| congestion_surcharge | DOUBLE | Yes | false | NULL | | NULL |
+-----------------------+---------------------+------+-------+---------+-------+---------+
20 rows in set (0.03 sec)
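
The comparison suggested in the tip can be written down as a lookup table. The sketch below records only the type pairs that appear in the two outputs shown in this guide (df.printSchema() and DESCRIBE greentaxis); it is not a complete Spark-to-StarRocks mapping, and the names OBSERVED_TYPE_MAP and mismatches are invented for this example.

```python
# Type pairs observed in this guide only: Spark (printSchema) -> StarRocks (DESCRIBE).
OBSERVED_TYPE_MAP = {
    "integer": "INT",
    "long": "BIGINT",
    "double": "DOUBLE",
    "string": "VARCHAR(1073741824)",
    "timestamp_ntz": "DATETIME",
}


def mismatches(spark_schema, starrocks_schema):
    """Return columns whose StarRocks type differs from the observed mapping."""
    return [
        column
        for column, spark_type in spark_schema.items()
        if OBSERVED_TYPE_MAP.get(spark_type) != starrocks_schema.get(column)
    ]


# A few columns copied from the two outputs in this guide.
spark_schema = {
    "VendorID": "integer",
    "lpep_pickup_datetime": "timestamp_ntz",
    "store_and_fwd_flag": "string",
    "RatecodeID": "long",
    "trip_distance": "double",
}
starrocks_schema = {
    "VendorID": "INT",
    "lpep_pickup_datetime": "DATETIME",
    "store_and_fwd_flag": "VARCHAR(1073741824)",
    "RatecodeID": "BIGINT",
    "trip_distance": "DOUBLE",
}
print(mismatches(spark_schema, starrocks_schema))  # prints []
```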
tip

Some of the SQL queries in the StarRocks documentation end with \G instead of a semicolon. The \G causes the mysql CLI to render the query results vertically.

Many SQL clients do not interpret vertical formatting output, so you should replace \G with ; if you are not using the mysql CLI.

Query with StarRocks

Verify pickup datetime format

SELECT lpep_pickup_datetime FROM greentaxis LIMIT 10;
+----------------------+
| lpep_pickup_datetime |
+----------------------+
| 2023-05-01 00:52:10 |
| 2023-05-01 00:29:49 |
| 2023-05-01 00:25:19 |
| 2023-05-01 00:07:06 |
| 2023-05-01 00:43:31 |
| 2023-05-01 00:51:54 |
| 2023-05-01 00:27:46 |
| 2023-05-01 00:27:14 |
| 2023-05-01 00:24:14 |
| 2023-05-01 00:46:55 |
+----------------------+
10 rows in set (0.07 sec)

Find the busy hours

This query counts the trips for each hour of the day and shows that the busiest hour is 18:00.

SELECT COUNT(*) AS trips,
hour(lpep_pickup_datetime) AS hour_of_day
FROM greentaxis
GROUP BY hour_of_day
ORDER BY trips DESC;
+-------+-------------+
| trips | hour_of_day |
+-------+-------------+
| 5381 | 18 |
| 5253 | 17 |
| 5091 | 16 |
| 4736 | 15 |
| 4393 | 14 |
| 4275 | 19 |
| 3893 | 12 |
| 3816 | 11 |
| 3685 | 13 |
| 3616 | 9 |
| 3530 | 10 |
| 3361 | 20 |
| 3315 | 8 |
| 2917 | 21 |
| 2680 | 7 |
| 2322 | 22 |
| 1735 | 23 |
| 1202 | 6 |
| 1189 | 0 |
| 806 | 1 |
| 606 | 2 |
| 513 | 3 |
| 451 | 5 |
| 408 | 4 |
+-------+-------------+
24 rows in set (0.08 sec)
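
For readers who think more easily in Python than in SQL, the sketch below mirrors what the query does: extract the hour from each pickup timestamp, count the trips per hour, and sort by the count in descending order. The function name and the sample timestamps are made up for illustration and are not taken from the dataset.

```python
from collections import Counter
from datetime import datetime


def trips_per_hour(pickups):
    """Return (trips, hour_of_day) rows sorted by trips, highest first."""
    hours = (datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").hour for ts in pickups)
    counts = Counter(hours)  # GROUP BY hour_of_day with COUNT(*)
    rows = [(trips, hour) for hour, trips in counts.items()]
    return sorted(rows, key=lambda row: row[0], reverse=True)  # ORDER BY trips DESC


# Hypothetical sample timestamps, not real rows from the Green Taxi data.
sample_pickups = [
    "2023-05-01 00:52:10",
    "2023-05-01 18:05:00",
    "2023-05-01 18:40:00",
    "2023-05-01 17:15:00",
    "2023-05-01 18:59:59",
    "2023-05-01 17:45:00",
]
print(trips_per_hour(sample_pickups))  # prints [(3, 18), (2, 17), (1, 0)]
```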

Summary

This tutorial introduced the StarRocks external catalog and showed that you can query your data where it sits by using the Iceberg REST catalog. Many other integrations are available using Hive, Hudi, Delta Lake, and JDBC catalogs.

In this tutorial you:

  • Deployed StarRocks and an Iceberg/PySpark/MinIO environment in Docker
  • Configured a StarRocks external catalog to provide access to the Iceberg catalog
  • Loaded taxi data provided by New York City into the Iceberg data lake
  • Queried the data with SQL in StarRocks without copying the data from the data lake

More information

StarRocks Catalogs

Apache Iceberg documentation and Quickstart (includes PySpark)

The Green Taxi Trip Records dataset is provided by New York City subject to these terms of use and privacy policy.