Java UDF
自 2.2.0 版本起,StarRocks 支持使用 Java 语言编写用户定义函数(User Defined Function,简称 UDF)。
自 3.0 版本起,StarRocks 支持 Global UDF,您只需要在相关的 SQL 语句(CREATE/SHOW/DROP)中加上 GLOBAL
关键字,该语句即可全局生效,无需逐个为每个数据库执行此语句。您可以根据业务场景开发自定义函数,扩展 StarRocks 的函数能力。
本文介绍如何编写和使用 UDF。
目前 StarRocks 支持的 UDF 包括用户自定义标量函数(Scalar UDF)、用户自定义聚合函数(User Defined Aggregation Function,UDAF)、用户自定义窗口函数(User Defined Window Function,UDWF)、用户自定义表格函数(User Defined Table Function,UDTF)。
前提条件
使用 StarRocks 的 Java UDF 功能前,您需要:
- 安装 Apache Maven 以创建并编写相关 Java 项目。
- 在服务器上安装 JDK 1.8。
- 开启 UDF 功能。在 FE 配置文件 fe/conf/fe.conf 中设置配置项
enable_udf
为true
,并重启 FE 节点使配置项生效。详细操作以及配置项列表参考配置参数。
开发并使用 UDF
您需要创建 Maven 项目并使用 Java 语言编写相应功能。
步骤一:创建 Maven 项目
-
创建 Maven 项目,项目的基本目录结构如下:
project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target
步骤二:添加依赖
在 pom.xml 中添加如下依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
步骤三:开发 UDF
您需要使用 Java 语言开发相应 UDF。
开发 Scalar UDF
Scalar UDF,即用户自定义标量函数,可以对单行数据进行操作,输出单行结果。当您在查询时使用 Scalar UDF,每行数据最终都会按行出现在结果集中。典型的标量函数包括 UPPER
、LOWER
、ROUND
、ABS
。
以下示例以提取 JSON 数据功能为例进行说明。例如,业务场景中,JSON 数据中某个字段的值可能是 JSON 字符串而不是 JSON 对象,因此在提取 JSON 字符串时,SQL 语句需要嵌套调用 GET_JSON_STRING
,即 GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")
。
为简化 SQL 语句,您可以开发一个 UDF,直接提取 JSON 字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")
。
package com.starrocks.udf.sample;
import com.alibaba.fastjson.JSONPath;
public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// JSONPath 库可以全部展开,即使某个字段的值是 JSON 格式的字符串
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}
用户自定义类必须实现如下方法:
说明
方法中请求参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
---|---|
TYPE1 evaluate(TYPE2, ...) | evaluate 方法为 UDF 调用入口,必须是 public 成员方法。 |
开发 UDAF
UDAF,即用户自定义的聚合函数,对多行数据进行操作,输出单行结果。典型的聚合函数包括 SUM
、COUNT
、MAX
、MIN
,这些函数对于每个 GROUP BY 分组中多行数据进行聚合后,只输出一行结果。
以下示例以 MY_SUM_INT
函数为例进行说明。与内置函数 SUM
(返回值为 BIGINT 类型)区别在于,MY_SUM_INT
函数支持传入参数和返回参数的类型为 INT。
package com.starrocks.udf.sample;
public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
}
用户自定义类必须实现如下方法:
说明
方法中传入参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
方法 | 含义 |
---|---|
State create() | 创建 State。 |
void destroy(State) | 销毁 State。 |
void update(State, ...) | 更新 State 。其中第一个参数是 State,其余的参数是函数声明的输入参数,可以为 1 个或多个。 |
void serialize(State, ByteBuffer) | 序列化 State。 |
void merge(State, ByteBuffer) | 合并 State 和反序列化 State。 |
TYPE finalize(State) | 通过 State 获取函数的最终结果。 |
并且,开发 UDAF 函数时,您需要使用缓冲区类 java.nio.ByteBuffer
和局部变量 serializeLength
,用于保存和表示中间结果,指定中间结果的序列化长度。
类和局部变量 | 说明 |
---|---|
java.nio.ByteBuffer() | 缓冲区类,用于保存中间结果。 并且,由于中间结果在不同执行节点间传输时,会进行序列化和反序列化,因此还需要使用 serializeLength 指定中间结果序列化后的长度。 |
serializeLength() | 中间结果序列化后的长度,单位为 Byte。 serializeLength 的数据类型固定为 INT。 例如,示例中 State { int counter = 0; public int serializeLength() { return 4; }} 包含对中间结果序列化后的说明,即,中间结果的数据类型为 INT,序列化长度为 4 Byte。您也可以按照业务需求进行调整,例如中间结果序列化后的数据类型 LONG,序列化长度为 8 Byte,则需要传入 State { long counter = 0; public int serializeLength() { return 8; }} 。 |
注意
java.nio.ByteBuffer
序列化相关事项:
- 不支持依赖 ByteBuffer 的 remaining() 方法来反序列化 State。
- 不支持对 ByteBuffer 调用 clear()方法。
serializeLength
需要与实际写入数据的长度保持一致,否则序列化和反序列化过程中会造成结果错误。
开发 UDWF
UDWF,即用户自定义窗口函数。跟普通聚合函数不同的是,窗口函数针对一组行(一个窗口)计算值,并为每行返回一个结果。一般情况下,窗口函数包含 OVER
子句,将数据行拆分成多个分组,窗口函数基于每一行数据所在的组(一个窗口)进行计算,并为每行返回一个结果。
以下示例以 MY_WINDOW_SUM_INT
函数为例进行说明。与内置函数 SUM
(返回类型为 BIGINT)区别在于,MY_WINDOW_SUM_INT
函数支持传入参数和返回参数的类型为 INT。
package com.starrocks.udf.sample;
public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}
public State create() {
return new State();
}
public void destroy(State state) {
}
public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}
public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}
public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}
public Integer finalize(State state) {
return state.counter;
}
public void reset(State state) {
state.counter = 0;
}
public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}
用户自定义类必须实现 UDAF 所需要的方法(窗口函数是特殊聚合函数)、以及 windowUpdate() 方法。
说明 方法中请求参数和返回参数的数据类型,需要和步骤六中的
CREATE FUNCTION
语句中声明的相同,且两者的类型映射关系需要符合类型映射关系。
需要额外实现的方法
void windowUpdate(State state, int, int, int , int, ...)
方法的含义
更新窗口数据。窗口函数的详细说明,请参见窗口函数。输入每一行数据,都会获取到对应窗口信息来更新中间结果。
- peer_group_start:是当前分区开始的位置。
分区:OVER子句中 PARTITION BY 指定分区列, 分区列的值相同的行被视为在同一个分区内。 - peer_group_end:当前分区结束的位置。
- frame_start:当前窗口框架(window frame)起始位置。
窗口框架:window frame 子句指定了运算范围,以当前行为准,前后若干行作为窗口函数运算的对象。例如 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING,表示运算范围为当前行和它前后各一行数据。 - frame_end:当前窗口框架(window frame)结束位置。
- inputs:表示一个窗口中输入的数据,为包装类数组。包装类需要对应输入数据的类型,本示例中输入数据类型为 INT,因此包装类数组为 Integer[]。
开发 UDTF
UDTF,即用户自定义表值函数,读入一行数据,输出多个值可视为一张表。表值函数常用于实现行转列。
说明 目前 UDTF 只支持返回多行单列。
以下示例以 MY_UDF_SPLIT
函数为例进行说明。MY_UDF_SPLIT
函数支持分隔符为空格,传入参数和返回参数的类型为 STRING。
package com.starrocks.udf.sample;
public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}
用户自定义类必须实现如下方法: