跳到主要内容
版本:Candidate-3.4

Java UDF

自 2.2.0 版本起,StarRocks 支持使用 Java 语言编写用户定义函数(User Defined Function,简称 UDF)。

自 3.0 版本起,StarRocks 支持 Global UDF,您只需要在相关的 SQL 语句(CREATE/SHOW/DROP)中加上 GLOBAL 关键字,该语句即可全局生效,无需逐个为每个数据库执行此语句。您可以根据业务场景开发自定义函数,扩展 StarRocks 的函数能力。

本文介绍如何编写和使用 UDF。

目前 StarRocks 支持的 UDF 包括用户自定义标量函数(Scalar UDF)、用户自定义聚合函数(User Defined Aggregation Function,UDAF)、用户自定义窗口函数(User Defined Window Function,UDWF)、用户自定义表格函数(User Defined Table Function,UDTF)。

前提条件

使用 StarRocks 的 Java UDF 功能前,您需要:

  • 安装 Apache Maven 以创建并编写相关 Java 项目。
  • 在服务器上安装 JDK 1.8。
  • 开启 UDF 功能。在 FE 配置文件 fe/conf/fe.conf 中设置配置项 enable_udftrue,并重启 FE 节点使配置项生效。详细操作以及配置项列表参考配置参数

开发并使用 UDF

您需要创建 Maven 项目并使用 Java 语言编写相应功能。

步骤一:创建 Maven 项目

  1. 创建 Maven 项目,项目的基本目录结构如下:

    project
    |--pom.xml
    |--src
    | |--main
    | | |--java
    | | |--resources
    | |--test
    |--target

步骤二:添加依赖

pom.xml 中添加如下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

步骤三:开发 UDF

您需要使用 Java 语言开发相应 UDF。

开发 Scalar UDF

Scalar UDF,即用户自定义标量函数,可以对单行数据进行操作,输出单行结果。当您在查询时使用 Scalar UDF,每行数据最终都会按行出现在结果集中。典型的标量函数包括 UPPERLOWERROUNDABS

以下示例以提取 JSON 数据功能为例进行说明。例如,业务场景中,JSON 数据中某个字段的值可能是 JSON 字符串而不是 JSON 对象,因此在提取 JSON 字符串时,SQL 语句需要嵌套调用 GET_JSON_STRING,即 GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")

为简化 SQL 语句,您可以开发一个 UDF,直接提取 JSON 字符串,例如:MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")

package com.starrocks.udf.sample;

import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
// JSONPath 库可以全部展开,即使某个字段的值是 JSON 格式的字符串
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}

用户自定义类必须实现如下方法:

说明

方法中请求参数和返回参数的数据类型,需要和步骤六中的 CREATE FUNCTION 语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

方法含义
TYPE1 evaluate(TYPE2, ...)evaluate 方法为 UDF 调用入口,必须是 public 成员方法。

开发 UDAF

UDAF,即用户自定义的聚合函数,对多行数据进行操作,输出单行结果。典型的聚合函数包括 SUMCOUNTMAXMIN,这些函数对于每个 GROUP BY 分组中多行数据进行聚合后,只输出一行结果。

以下示例以 MY_SUM_INT 函数为例进行说明。与内置函数 SUM(返回值为 BIGINT 类型)区别在于,MY_SUM_INT 函数支持传入参数和返回参数的类型为 INT。

package com.starrocks.udf.sample;

public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}

public State create() {
return new State();
}

public void destroy(State state) {
}

public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}
}

用户自定义类必须实现如下方法:

说明

方法中传入参数和返回参数的数据类型,需要和步骤六中的 CREATE FUNCTION 语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

方法含义
State create()创建 State。
void destroy(State)销毁 State。
void update(State, ...)更新 State 。其中第一个参数是 State,其余的参数是函数声明的输入参数,可以为 1 个或多个。
void serialize(State, ByteBuffer)序列化 State。
void merge(State, ByteBuffer)合并 State 和反序列化 State。
TYPE finalize(State)通过 State 获取函数的最终结果。

并且,开发 UDAF 函数时,您需要使用缓冲区类 java.nio.ByteBuffer 和局部变量 serializeLength,用于保存和表示中间结果,指定中间结果的序列化长度。

类和局部变量说明
java.nio.ByteBuffer()缓冲区类,用于保存中间结果。 并且,由于中间结果在不同执行节点间传输时,会进行序列化和反序列化,因此还需要使用 serializeLength 指定中间结果序列化后的长度。
serializeLength()中间结果序列化后的长度,单位为 Byte。 serializeLength 的数据类型固定为 INT。 例如,示例中 State { int counter = 0; public int serializeLength() { return 4; }} 包含对中间结果序列化后的说明,即,中间结果的数据类型为 INT,序列化长度为 4 Byte。您也可以按照业务需求进行调整,例如中间结果序列化后的数据类型 LONG,序列化长度为 8 Byte,则需要传入 State { long counter = 0; public int serializeLength() { return 8; }}

注意

java.nio.ByteBuffer 序列化相关事项:

  • 不支持依赖 ByteBuffer 的 remaining() 方法来反序列化 State。
  • 不支持对 ByteBuffer 调用 clear()方法。
  • serializeLength 需要与实际写入数据的长度保持一致,否则序列化和反序列化过程中会造成结果错误。

开发 UDWF

UDWF,即用户自定义窗口函数。跟普通聚合函数不同的是,窗口函数针对一组行(一个窗口)计算值,并为每行返回一个结果。一般情况下,窗口函数包含 OVER 子句,将数据行拆分成多个分组,窗口函数基于每一行数据所在的组(一个窗口)进行计算,并为每行返回一个结果。

以下示例以 MY_WINDOW_SUM_INT 函数为例进行说明。与内置函数 SUM(返回类型为 BIGINT)区别在于,MY_WINDOW_SUM_INT 函数支持传入参数和返回参数的类型为 INT。

package com.starrocks.udf.sample;

public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}

public State create() {
return new State();
}

public void destroy(State state) {

}

public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}

public void reset(State state) {
state.counter = 0;
}

public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}

用户自定义类必须实现 UDAF 所需要的方法(窗口函数是特殊聚合函数)、以及 windowUpdate() 方法。

说明 方法中请求参数和返回参数的数据类型,需要和步骤六中的 CREATE FUNCTION 语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

需要额外实现的方法

void windowUpdate(State state, int, int, int , int, ...)

方法的含义

更新窗口数据。窗口函数的详细说明,请参见窗口函数。输入每一行数据,都会获取到对应窗口信息来更新中间结果。

  • peer_group_start:是当前分区开始的位置。
    分区:OVER子句中 PARTITION BY 指定分区列, 分区列的值相同的行被视为在同一个分区内。
  • peer_group_end:当前分区结束的位置。
  • frame_start:当前窗口框架(window frame)起始位置。
    窗口框架:window frame 子句指定了运算范围,以当前行为准,前后若干行作为窗口函数运算的对象。例如 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING,表示运算范围为当前行和它前后各一行数据。
  • frame_end:当前窗口框架(window frame)结束位置。
  • inputs:表示一个窗口中输入的数据,为包装类数组。包装类需要对应输入数据的类型,本示例中输入数据类型为 INT,因此包装类数组为 Integer[]。

开发 UDTF

UDTF,即用户自定义表值函数,读入一行数据,输出多个值可视为一张表。表值函数常用于实现行转列。

说明 目前 UDTF 只支持返回多行单列。

以下示例以 MY_UDF_SPLIT 函数为例进行说明。MY_UDF_SPLIT 函数支持分隔符为空格,传入参数和返回参数的类型为 STRING。

package com.starrocks.udf.sample;

public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}

用户自定义类必须实现如下方法:

说明 方法中请求参数和返回参数的数据类型,需要和步骤六中的 CREATE FUNCTION 语句中声明的相同,且两者的类型映射关系需要符合类型映射关系

方法含义
TYPE[] process()process() 方法为 UDTF 调用入口,需要返回数组。

步骤四:打包 Java 项目

通过以下命令打包 Java 项目。

mvn package

target 目录下会生成两个文件:udf-1.0-SNAPSHOT.jarudf-1.0-SNAPSHOT-jar-with-dependencies.jar

步骤五:上传项目

将文件 udf-1.0-SNAPSHOT-jar-with-dependencies.jar 上传至 FE 和 BE 能访问的 HTTP 服务器,并且 HTTP 服务需要一直开启。

mvn deploy 

您可以通过 Python 创建一个简易的 HTTP 服务器,并将文件上传至该服务器中。

说明 步骤六中, FE 会对 UDF 所在 Jar 包进行校验并计算校验值,BE 会下载 UDF 所在 Jar 包并执行。

步骤六:在 StarRocks 中创建 UDF

StarRocks 内提供了两种 Namespace 的 UDF:一种是 Database 级 Namespace,一种是 Global 级 Namespace。

  • 如果您没有特殊的 UDF 可见性隔离需求,您可以直接选择创建 Global UDF。在引用 Global UDF 时,直接调用Function Name 即可,无需任何 Catalog 和 Database 作为前缀,访问更加便捷。
  • 如果您有特殊的 UDF 可见性隔离需求,或者需要在不同 Database下创建同名 UDF,那么你可以选择在 Database 内创建 UDF。此时,如果您的会话在某个 Database 内,您可以直接调用 Function Name 即可;如果您的会话在其他 Catalog 和 Database 下,那么您需要带上 Catalog 和 Database 前缀,例如: catalog.database.function

注意

创建 Global UDF 需要有 System 级的 CREATE GLOBAL FUNCTION 权限;创建数据库级别的 UDF 需要有数据库级的 CREATE FUNCTION 权限;使用 UDF 需要有对应 UDF 的 USAGE 权限。关于如何赋权,参见 GRANT

JAR 包上传完成后,您需要在 StarRocks 中,按需创建相对应的 UDF。如果创建 Global UDF,只需要在 SQL 语句中带上 GLOBAL 关键字即可。

语法

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name(arg_type [, ...])
RETURNS return_type
[PROPERTIES ("key" = "value" [, ...]) ]

参数说明

参数必选说明
GLOBAL如需创建全局 UDF,需指定该关键字。从 3.0 版本开始支持。
AGGREGATE如要创建 UDAF 和 UDWF,需指定该关键字。
TABLE如要创建 UDTF,需指定该关键字。
function_name函数名,可以包含数据库名称,比如,db1.my_func。如果 function_name 中包含了数据库名称,那么该 UDF 会创建在对应的数据库中,否则该 UDF 会创建在当前数据库。新函数名和参数不能与目标数据库中已有的函数相同,否则会创建失败;如只有函数名相同,参数不同,则可以创建成功。
arg_type函数的参数类型。具体支持的数据类型,请参见类型映射关系
return_type函数的返回值类型。具体支持的数据类型,请参见类型映射关系
properties函数相关属性。创建不同类型的 UDF 需配置不同的属性,详情和示例请参考以下示例。

创建 Scalar UDF

执行如下命令,在 StarRocks 中创建先前示例中的 Scalar UDF。

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
PROPERTIES (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);
参数描述
symbolUDF 所在项目的类名。格式为<package_name>.<class_name>
type用于标记所创建的 UDF 类型。取值为 StarrocksJar,表示基于 Java 的 UDF。
fileUDF 所在 Jar 包的 HTTP 路径。格式为http://<http_server_ip>:<http_server_port>/<jar_package_name>
isolation(可选)如需在 UDF 执行中共享函数实例并支持静态变量,请将其设置为 "shared"。

创建 UDAF

执行如下命令,在 StarRocks 中创建先前示例中的 UDAF。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES 里的参数说明与 创建 Scalar UDF 相同。

创建 UDWF

执行如下命令,在 StarRocks 中创建先前示例中的 UDWF。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
PROPERTIES
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic:所创建的函数是否为窗口函数,固定取值为 true。其他参数说明与 创建 Scalar UDF 相同。

创建 UDTF

执行如下命令,在 StarRocks 中创建先前示例中的 UDTF。

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES 里的参数说明与 创建 Scalar UDF 相同。

步骤七:使用 UDF

创建完成后,您可以测试使用您开发的 UDF。

使用 Scalar UDF

执行如下命令,使用先前示例中的 Scalar UDF 函数。

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

使用 UDAF

执行如下命令,使用先前示例中的 UDAF。

SELECT MY_SUM_INT(col1);

使用 UDWF

执行如下命令,使用先前示例中的 UDWF。

SELECT MY_WINDOW_SUM_INT(intcol) 
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

使用 UDTF

执行如下命令,使用先前示例中的 UDTF。

-- 假设存在表 t1,其列 a、b、c1 信息如下。
SELECT t1.a,t1.b,t1.c1 FROM t1;
> output:
1,2.1,"hello world"
2,2.2,"hello UDTF."

-- 使用 MY_UDF_SPLIT() 函数。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> output:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."

说明

  • 第一个 MY_UDF_SPLIT 为调用 MY_UDF_SPLIT 后生成的列别名。
  • 暂不支持使用 AS t2(f1) 的方式指定表格函数返回表的表别名和列别名。

查看 UDF 信息

运行以下命令查看 UDF 信息。

SHOW [GLOBAL] FUNCTIONS;

更多信息,请参见SHOW FUNCTIONS

删除 UDF

运行以下命令删除指定的 UDF。

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

更多信息,请参见DROP FUNCTION

类型映射关系

SQL TYPEJava TYPE
BOOLEANjava.lang.Boolean
TINYINTjava.lang.Byte
SMALLINTjava.lang.Short
INTjava.lang.Integer
BIGINTjava.lang.Long
FLOATjava.lang.Float
DOUBLEjava.lang.Double
STRING/VARCHARjava.lang.String

参数配置

JVM 参数配置:在 be/conf/be.conf 中配置如下参数,可以控制 JVM 的内存使用。如果使用 JDK 8,请配置 JAVA_OPTS。如果 JDK 版本高于 JDK 8,则配置 JAVA_OPTS_FOR_JDK_9_AND_LATER

JAVA_OPTS="-Xmx12G"

JAVA_OPTS_FOR_JDK_9_AND_LATER="-Xmx12G"

FAQ

Q:开发 UDF 时是否可以使用静态变量?不同 UDF 间的静态变量间否会互相影响?

A:支持在开发 UDF 时使用静态变量,且不同 UDF 间(即使类同名),静态变量是互相隔离的,不会互相影响。