编辑

Spark StarRocks Connector

Spark StarRocks Connector 可以支持通过 Spark 读取 StarRocks 中存储的数据。

  • 当前版本只支持从StarRocks中读取数据。
  • 可以将StarRocks表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在StarRocks端完成数据过滤,减少数据传输量。

版本要求

ConnectorSparkStarRocksJavaScala
1.0.02.x1.18+82.11
1.0.03.x1.18+82.12

编译部署

Spark StarRocks Connector

使用示例

Spark Demo

SQL

CREATE TEMPORARY VIEW spark_starrocks
USING starrocks
OPTIONS(
  "table.identifier" = "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME",
  "fenodes" = "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT",
  "user" = "$YOUR_STARROCKS_USERNAME",
  "password" = "$YOUR_STARROCKS_PASSWORD"
);

SELECT * FROM spark_starrocks;

DataFrame

val starrocksSparkDF = spark.read.format("starrocks")
  .option("starrocks.table.identifier", "$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME")
  .option("starrocks.fenodes", "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT")
  .option("user", "$YOUR_STARROCKS_USERNAME")
  .option("password", "$YOUR_STARROCKS_PASSWORD")
  .load()

starrocksSparkDF.show(5)

RDD

import com.starrocks.connector.spark._
val starrocksSparkRDD = sc.starrocksRDD(
  tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
  cfg = Some(Map(
    "starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT",
    "starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
    "starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
  ))
)

starrocksSparkRDD.collect()

配置

通用配置项

KeyDefault ValueComment
starrocks.fenodes--StarRocks FE http 地址,支持多个地址,使用逗号分隔
starrocks.table.identifier--StarRocks 表名,如:db1.tbl1
starrocks.request.retries3向StarRocks发送请求的重试次数
starrocks.request.connect.timeout.ms30000向StarRocks发送请求的连接超时时间
starrocks.request.read.timeout.ms30000向StarRocks发送请求的读取超时时间
starrocks.request.query.timeout.s3600查询StarRocks的超时时间,默认值为1小时,-1表示无超时限制
starrocks.request.tablet.sizeInteger.MAX_VALUE一个RDD Partition对应的StarRocks Tablet个数。此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对StarRocks造成更大的压力。
starrocks.batch.size1024一次从BE读取数据的最大行数。增大此数值可减少Spark与StarRocks之间建立连接的次数。从而减轻网络延迟所带来的的额外时间开销。
starrocks.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
starrocks.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到spark-starrocks-connector迭代所需的RowBatch
starrocks.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当starrocks.deserialize.arrow.async为true时生效
starrocks.filter.query--过滤读取数据的表达式,此表达式透传给 StarRocks。StarRocks 使用此表达式完成源端数据过滤。

SQL 和 Dataframe 专有配置

KeyDefault ValueComment
user--访问StarRocks的用户名
password--访问StarRocks的密码
starrocks.filter.query.in.max.count100谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。

RDD 专有配置

KeyDefault ValueComment
starrocks.request.auth.user--访问StarRocks的用户名
starrocks.request.auth.password--访问StarRocks的密码
starrocks.read.field--读取StarRocks表的列名列表,多列之间使用逗号分隔

StarRocks 和 Spark 列类型映射关系

StarRocks TypeSpark Type
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
LARGEINTDataTypes.StringType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DECIMALDecimalType
DATEDataTypes.StringType
DATETIMEDataTypes.StringType
CHARDataTypes.StringType
VARCHARDataTypes.StringType
ARRAYUnsupported datatype
HLLUnsupported datatype
BITMAPUnsupported datatype
  • 注:Connector中,将DATEDATETIME映射为String。由于StarRocks底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 String 类型直接返回对应的时间可读文本。