• Byzer 内置常用 UDF
  • Byzer 支持使用其他语言动态扩展 UDF
    • 支持语言:Scala/Python/Java 的自定义 UDF,
    • 动态扩展:无需打包重启应用,只需要在上下文中使用 Byzer 语法注册 UDF,即可使用

当然,我们也支持在启动时注册自定义 UDF 到 Byzer 中。

内置常见 UDF

Byzer 内置了很多功能强大、开箱即用的 UDF,如 http 请求、数据类型转换 UDF 等。

http 请求

http 请求可以让 Byzer 脚本变得更加强大,因为这可以集合所有内部或者外部 API 来完成某项工作。

Byzer 提供了一系列功能较为全面的 http 请求函数。

crawler_request

crawler_request(url) - 通过 get 方法请求 url,返回请求到的 html 网页

例子

> SELECT crawler_request("https://www.csdn.com") AS h AS html;
<!doctype html> <html lang="zh" data-server-rendered="true"> <head> <title>CSDN - 专业开发者社区</title> ...

crawler_http

crawler_http(url, method, map("k1","v1","k2","v2")) - 请求 url,返回请求到的 html 网页

参数

  • method - 支持 POST/GET
  • map - key/value 格式的参数。方法为 POST 时,参数使用 URLEncode

例子

> SELECT crawler_request_image("https://www.csdn.com", "get", map()) AS h AS html;
<!doctype html> <html lang="zh" data-server-rendered="true"> <head> <title>CSDN - 专业开发者社区</title> ...

crawler_auto_extract_body

crawler_auto_extract_body(html) - 抽取 html 网页中的内容

例子

> SELECT crawler_request("https://www.csdn.com") AS h AS html;
> SELECT crawler_auto_extract_body(h) AS b FROM html AS body;
专家推荐疯狂试探mysql单表insert极限:已实现每秒插入8.5w条数据 一个demo让你将多线程运用到实际项目 ...

crawler_auto_extract_title

crawler_auto_extract_title(html) - 抽取 html 网页中的标题

例子

> SELECT crawler_request("https://www.csdn.com") AS h AS html;
> SELECT crawler_auto_extract_title(h) AS t FROM html AS title;
CSDN - 专业开发者社区

crawler_request_image

crawler_request_image(url) - 通过 get 方法请求 url,获取 base64 编码格式的图片

例子

> SELECT crawler_request_image("https://pic4.zhimg.com/v2-1d0e51461a3eb098ac84ab0f6d3ce99c_xl.jpg") AS html AS content


crawler_extract_xpath

crawler_md5(html, xpath) - 返回 xpath 路径表达式下的 html 的节点信息

例子

> SELECT crawler_request("https://www.csdn.com") AS h AS html; 
> SELECT crawler_extract_xpath(h, "/html/head/title") AS t FROM html as x_title;
<title>CSDN - 专业开发者社区</title>

crawler_md5

crawler_md5(str) - 返回 str 的 md5 信息摘要

例子

> SELECT crawler_request("https://www.csdn.com") AS h AS html; 
> SELECT crawler_md5(h) AS m FROM HTML AS md5;
6dd43840dc3389a9639e7e6449a80f4f

 

常用函数

array_concat

array_concat(array(a1, a2, ..., an)) - 多个字符串数组拼接成一个数组,并且展开

例子

> SELECT array_number_concat(array(array("a","b"), array("c","d")) AS arr;
[ "a", "b", "c", "d" ]

array_intersect

array_intersect(a1, a2) - 返回数组的交集

例子

> SELECT array_intersect(array("a","b","c"),array("a","d","e")) AS ai;

array_index

array_index(array, element) - 返回数组中元素的下标

例子

> SELECT array_index(array("a","b","c","d","e"),"b") AS index;
1

array_number_concat

array_number_concat(array(a1, a2, ..., an)) - 多个数字数组拼接成一个数组,并且展开

例子

> SELECT array_number_concat(array(array(1,2), array(3,4)) AS arr;
[ 1, 2, 3, 4 ]

array_number_to_string

array_number_to_string(array) - 将数组内的元素类型转换为 string

例子

> SELECT array_number_to_string(array(1,2,3,4)) AS arr;
[ "1", "2", "3", "4" ]

array_onehot

array_onehot(array, colNums) - 返回 matrix 结构的 one hot 编码,编码是按列存储的

参数

  • array:希望进行 one hot 编码的分类值,必须是 int 类型。另外,分类值的数量即为矩阵的行数
  • colNums:分类的最大数量,即是矩阵列的数量

例子

> SELECT array_onehot(array(1,2,3),4) AS ma1;
{ "type": 1, "numRows": 3, "numCols": 4, "values": [ 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1 ], "isTransposed": false }

> SELECT array_onehot(array(1,4),12) AS ma2;
 "type": 1, "numRows": 2, "numCols": 12, "values": [ 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 ], "isTransposed": false }

array_slice

array_slice(array, from, to) - 返回数组中,从下标 from 到下标 to 的子数组。当 to = -1 时,取到数组结尾

例子

> SELECT array_slice(array("a","b","c","d","e"),3,-1) AS sub;
[ "d", "e" ]

array_string_to_double

array_number_to_string(array) - 将数组内的元素类型转换为 double

例子

> SELECT array_string_to_double(array("1.1","2.2","3.3","4.4")) AS arr;
[ 1.1, 2.2, 3.3, 4.4 ]

array_string_to_float

array_string_to_float(array) - 将数组内的元素类型转换为 float

例子

> SELECT array_string_to_float(array("1.1","2.2","3.3","4.4")) AS arr;
[ 1.1, 2.2, 3.3, 4.4 ]

array_string_to_int

array_string_to_int(array) - 将数组内的元素类型转换为 int

例子

> SELECT array_string_to_int(array("1","2","3","4")) AS arr;
[ 1, 2, 3, 4 ]

matrix_array

matrix_array(matrix) - 将矩阵转为二维数组

例子

> SELECT matrix_array(array_onehot(array(1,2),4)) AS ma;
[ [ 0, 1, 0, 0 ], [ 0, 0, 1, 0 ] ]

matrix_dense

matrix_dense(array(a1, a2, ..., an)) - 生成一个紧凑矩阵

例子

> SELECT matrix_dense(array(array(1.0, 2.0, 3.0), array(2.0, 3.0, 4.0))) AS md;
{ "type": 1, "numRows": 2, "numCols": 3, "values": [ 1, 2, 2, 3, 3, 4 ], "isTransposed": false }

matrix_sum

matrix_sum(matrix) - 对数组的列值进行求和

例子

> SELECT matrix_sum(matrix_dense(array(array(1.0, 2.0, 3.0), array(2.0, 3.0, 4.0))), 0) AS ms;
{ "type": 1, "values": [ 3, 5, 7 ] }

vec_argmax

vec_argmax(vector) - 找到向量里面最大值所在的位置(下标从 0 开始)

例子

> SELECT vec_argmax(vec_dense(array(1.0,2.0,7.0))) AS index;
2

vec_dense

vec(array) - 生成一个紧凑向量

例子

> SELECT vec_dense(array(1.0,2.0,7.0)) as vec;
{ "type": 1, "values": [ 1, 2, 7 ] }

vec_sparse

vec_sparse(size , map(k1,v1,k2,v2)) - 生成一个稀疏向量

参数

  • size:向量的长度
  • map:稀疏向量的下标以及数值,注意下标从 0 开始

例子

> SELECT vec_sparse(3, map(1,2,2,4)) AS vs;
{ "type": 0, "size": 3, "indices": [ 1, 2 ], "values": [ 2, 4 ] }

vec_concat

vec_concat(array(v1,v2, ..., vn)) - 拼接多个向量成为一个向量

例子

> SELECT vec_concat(array(vec_dense(array(1.0,2.0)),vec_dense(array(3.0,4.0)))) AS vc;
{ "type": 1, "values": [ 1, 2, 3, 4 ] }

vec_cosine

vec_cosine(v1, v2) - 计算 consine 向量夹角

例子

> SELECT vec_cosine(vec_dense(array(1.0,2.0)),vec_dense(array(1.0,1.0))) AS vc;
0.9486832980505138

vec_slice

vec_slice(vector, indices) - 根据下标获取子 vector

例子

> SELECT vec_slice(vec_dense(array(1.0,2.0,3.0,4.0)),array(0,1,2)) AS vs;
{ "type": 1, "values": [ 1, 2, 3 ] }

vec_array

vec_array(vector) - 将向量转化为数组

例子

> SELECT vec_array(vec_dense(array(1.0,2.0))) as va;
[ 1, 2 ]

vec_mk_string

vec_mk_string(splitter, vector) - 使用 splitter 拼接向量,并返回字符串

例子

> SELECT vec_mk_string("*",vec_dense(array(1.0,2.0))) AS vms;
1.0*2.0

vec_wise_mul

vec_wise_mul(v1, v2) - 计算向量 v1, v2 对应矢量值的乘积,返回结果向量

例子

> SELECT vec_dense(array(2.5,2.0,1.0)) AS v1, vec_dense(array(3.0,2.0,1.0)) AS v2 AS data1;
> SELECT vec_wise_mul(v1, v2) AS vwm FROM data1 AS data2;
{ "type": 1, "values": [ 7.5, 4, 1 ] }

vec_wise_add

vec_wise_add(v1, v2) - 计算向量 v1, v2 对应矢量值的和,返回结果向量

例子

> SELECT vec_dense(array(2.5,2.0,1.0)) AS v1, vec_dense(array(3.0,2.0,1.0)) AS v2 AS data1;
> SELECT vec_wise_add(v1, v2) AS vwm FROM data1 AS data2;
{ "type": 1, "values": [ 5.5, 4, 2 ] }

vec_wise_dif

vec_wise_dif(v1, v2) - 计算向量 v1, v2 对应矢量值的差,返回结果向量

例子

> SELECT vec_dense(array(2.5,3.0,1.0)) AS v1, vec_dense(array(3.0,2.0,1.0)) AS v2 AS data1;
> SELECT vec_wise_dif(v1, v2) AS vwm FROM data1 AS data2;
{ "type": 1, "values": [ -0.5, 1, 0 ] }

vec_wise_mod

vec_wise_mod(v1, v2) - 向量 v1 的矢量值对 v2 的矢量值取模

例子

> SELECT vec_dense(array(11,7,3)) AS v1, vec_dense(array(2,3,4)) AS v2 AS data1;
> SELECT vec_wise_mod(v1, v2) AS vwm FROM data1 AS data2;
{ "type": 1, "values": [ 1, 1, 3 ] }

vec_inplace_add

vec_inplace_add(vector, addend) - vector 每个矢量值加上 addend,返回结果向量

例子

> SELECT vec_dense(array(2.5, 2.0, 1.0)) AS vd AS data1;
> SELECT vec_inplace_add(vd, 4.4) AS via FROM data1 AS data2;
{ "type": 1, "values": [ 6.9, 6.4, 5.4 ] }

vec_inplace_ew_mul

vec_inplace_ew_mul(vector, multiplier) - vector 每个矢量值乘 multiplier,返回结果向量

例子

> SELECT vec_dense(array(2.5, 2.0, 1.0)) AS vd AS data1;
> SELECT vec_inplace_ew_mul(vd, 4.4) AS niem FROM data1 AS data2;
{ "type": 1, "values": [ 11, 8.8, 4.4 ] }

vec_ceil

vec_ceil(vector) - 将 vector 矢量值向上取整

例子

> SELECT vec_dense(array(2.5, 2.4, 1.6)) AS vd AS data1;
> SELECT vec_ceil(vd) AS vc FROM data1 AS data2;
{ "type": 1, "values": [ 3, 3, 2 ] }

vec_floor

vec_floor(vector) - 将 vector 矢量值向上取整

例子

> SELECT vec_dense(array(2.5, 2.4, 1.6)) AS vd AS data1;
> SELECT vec_floor(vd) AS vc FROM data1 AS data2;
{ "type": 1, "values": [ 2, 2, 1 ] }

vec_mean

vec_mean(vector) - 获取向量矢量值的平均值

例子

> SELECT vec_mean(vec_dense(array(1.0,2.0,7.0,2.0))) AS vm;
3

vec_stddev

vec_stddev(vector) - 获取向量标准差

例子

> SELECT vec_stddev(vec_dense(array(3.0, 4.0, 5.0))) AS vs;
1

ngram

ngram(array, size) - 以 size 为窗口大小,返回滑动窗口的序列

例子

> SELECT ngram(array("a","b","c","d","e"),3) AS ngr;
[ "a b c", "b c d", "c d e" ]

keepChinese

keepChinese(str, keepPunctuation, include) - 对文本字段做处理,只保留中文字符

参数

  • str:待处理字符串
  • keepPunctuation:是否保留标点符号 true/false
  • include:指定保留字符,保留字符会出现在结果集中

例子

> SET query = "你◣◢︼【】┅┇☽☾✚〓▂▃▄▅▆▇█▉▊▋▌▍▎▏↔↕☽☾の·▸◂▴▾┈┊好◣◢︼【】┅┇☽☾✚〓▂▃▄▅▆▇█▉▊▋▌▍▎▏↔↕☽☾の·▸◂▴▾┈┊啊,..。,!?katty";
> SELECT keepChinese("${query}",false,array()) AS ch;
结果: 你好啊

sleep

sleep() - 休眠函数,单位为ms,无返回

例子

> SELETC sleep(1000) AS s1;

uuid

uuid() - 返回一个唯一的字符串,去掉了"-"

例子

> SELECT uuid() AS u1;
b4fd697ce5dc4ff48694a5e2e1804d81

 

动态创建 UDF/UDAF

Byzer 支持使用 Python、Java、Scala 编写UDF/UDAF。 无需打包或重启,只需运行注册 UDF 的 Byzer 代码,就可以即时生效。 极大的方便用户扩展 Byzer 的功能。

UDF注册

Byzer 提供 register 语法注册 UDF。你可以用以下两种方式使用它。

方法一

先将脚本注册为虚拟表,再将表注册为UDF。

下面是一个使用 scala 语言编写 UDF 并注册的例子:

-- script
> SET plusFun='''
def apply(a:Double,b:Double)={
   a + b
}
''';

-- register as a table
> LOAD script.`plusFun` AS scriptTable;

-- register as UDF
> REGISTER ScriptUDF.`scriptTable` AS plusFun OPTIONS lang = "scala";

方法二

Byzer 支持在一个语句中完成 UDF 的注册的所有步骤。

在这种方式中,我们必须手动指定脚本的编写语言,以及 UDF 的种类。文末有我们支持的语言以及 UDF 列表。

下面是一个使用 scala 语言编写并注册的例子。

> REGISTER ScriptUDF.`` AS plusFun WHERE
and lang="scala"
and udfType="udf"
and code='''
def apply(a:Double,b:Double)={
   a + b
}
''';

总结

适用范围

方法一方便做代码分割,UDF 申明可以放在单独文件,注册动作可以放在另外的文件,通过 include 来完成整合。

方法二相较于方法一更为简洁明了,适合数量较少的 UDF 注册。

参数设置

方法一使用 OPTIONS 关键字连接参数,方法二使用 WHERE 关键字连接参数。

目前支持的参数有:

  • lang: Scala/Java/Python
  • udfType: UDF/UDAF
  • code: UDF 代码
  • className: code中自定义类名(仅Java)
  • methodName: code中自定义函数名

UDF使用

无论使用哪种方式注册,你都可以开箱即用的使用注册过的 UDF。下面是一个使用上面注册过的 UDF 的例子。

> SELECT plusFun(1,2) AS sum;
3

支持的语言/UDF种类

  • Scala:UDF/UDAF
  • Java:UDF
  • Python:UDF

 

Python UDF

使用 Python 语言开发 UDF 时,需要在 register 语句中指定如下信息:

  • 指定 lang 为 Python
  • 指定 udfType 为 UDF

对于 Python UDF,特别说明以下几点:

  1. Byzer 支持 Python 版本为 2.7.1
  2. Python 不支持任何 native 库,比如 numpy.
  3. Python 必要使用 dataType 参数指定返回值的类型(例子1) 目前我们支持的 Python UDF 返回类型只能是如下类型或者他们的组合
    • string
    • float
    • double
    • integer
    • short
    • date
    • binary
    • map
    • array
  4. 为了弥补 Python UDF 的不足,Byzer 提供了专门的交互式 Python 语法以及大规模数据处理的 Python 语法。在 Python 专门章节 我们会提供更详细的介绍。

因此,我们建议对于 Python 尽可能只做简单的文本解析处理,以及使用原生自带的库。

例子

> REGISTER ScriptUDF.`` AS echoFun WHERE
and lang="python"
and dataType="map(string,string)"
and code='''
def apply(self,m):
    return m
 ''';

使用

> SELECT echoFun(map("a","b")) AS res;
{ "a": "b" }

 

Scala UDF

使用 Scala 语言开发 UDF 时,需要在 register 语句中指定如下信息:

  • 指定 lang 为 Scala
  • 指定 udfType 为 UDF

例子

> REGISTER ScriptUDF.`` AS plusFun WHERE
and lang="scala"
and udfType="udf"
and code='''
def apply(a:Double,b:Double)={
a + b
}
''';

使用

> SELECT plusFun(1,2) AS sum;
3

 

Scala UDAF

使用 Scala 语言开发 UDAF 时,需要在 register 语句中指定如下信息:

  • 指定 lang 为 Scala
  • 指定 udfType 为 UDAF

例子

> SET plusFun='''
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
class SumAggregation extends UserDefinedAggregateFunction with Serializable{
    def inputSchema: StructType = new StructType().add("a", LongType)
    def bufferSchema: StructType =  new StructType().add("total", LongType)
    def dataType: DataType = LongType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0l)
    }
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val sum   = buffer.getLong(0)
      val newitem = input.getLong(0)
      buffer.update(0, sum + newitem)
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
    }
    def evaluate(buffer: Row): Any = {
      buffer.getLong(0)
    }
}
''';

> LOAD script.`plusFun` AS scriptTable;

> REGISTER ScriptUDF.`scriptTable` AS plusFun options
className="SumAggregation"
and udfType="udaf";

使用

> SET data='''
{"a":1}
{"a":1}
{"a":1}
{"a":1}
''';
> LOAD jsonStr.`data` AS dataTable;

> SELECT a,plusFun(a) AS res FROM dataTable GROUP BY a AS output;
| a | res |
|===|=====|
| 1 |  4  |

 

Java UDF

使用 Java 语言开发 UDF 时,需要在 register 语句中指定如下信息:

  • 指定 lang 为 java
  • 指定 udfType 为 udf
  • 指定 className 为 UDF 类名

另外,还需要额外注意几点:

  • 传递的代码必须是一个 Java 类,系统默认会寻找 apply() 方法做为运行的 UDF
  • 需要指定 className/methodName 进行声明(如例子), 不指定类名将导致 Java UDF 无法编译。
  • 暂时不支持包名

例子

REGISTER ScriptUDF.`` AS echoFun WHERE 
and lang="java"
and udfType="udf"
and className="Test"
and methodName="test"
and code='''
import java.util.HashMap;
import java.util.Map;
public class Test {
    public Map<String, String> test(String s) {
      Map m = new HashMap<>();
      m.put(s, s);
      return m;
  }
}
''';

使用:

SET data='''{"a":"a"}''';
LOAD jsonStr.`data` AS dataTable;

SELECT echoFun(a) AS res FROM dataTable AS output;
Logo

更多推荐