Byzer 内置了加载流式数据的能力,本章节我们会介绍如何通过 Byzer 加载流式数据源如 Kafka 中的数据。
加载 Kafka 流式数据源
Byzer 显示的支持 Kafka 作为流式数据源,也支持将其作为普通数据源进行 AdHoc 加载,性能同样可观。
本章只介绍数据加载,想了解更多流式编程细节,请查看 使用 Byzer 处理流数据。
1. 流式加载
注意: Byzer 支持
0.10
以上 Kafka 版本
流式加载数据源,需要使用 kafka
关键字,并通过 kafka.bootstrap.servers
参数指定服务器地址,如下示例:
> LOAD kafka.`$topic_name` OPTIONS
`kafka.bootstrap.servers`="${your_servers}"
AS kafka_post_parquet;
2. AdHoc加载
AdHoc 加载数据源,需要使用 AdHoc
关键字,并通过 kafka.bootstrap.servers
参数指定服务器地址,如下示例:
> LOAD adHocKafka.`$topic_name` WHERE
kafka.bootstrap.servers="${your_servers}"
and multiplyFactor="2"
AS table1;
> SELECT count(*) FROM table1 WHERE value LIKE "%yes%" AS output;
multiplyFactor
代表设置两个线程扫描同一个分区,同时提升两倍的并行度,加快速度。
Byzer 还提供一些高级参数来指定范围,它们是成组出现的:
通过时间
timeFormat:
指定日期格式 startingTime:
指定开始时间 endingTime:
指定结束时间
通过offset
staringOffset:
指定起始位置 staringOffset:
指定结束位置
通过offset需要指定每个分区的起始结束,比较麻烦,使用较少。
加载 MockStream 流式数据源
Byzer 显示的支持 MockStream。它可以用来模拟数据源,广泛应用于测试场景中。
本章只介绍数据加载,想了解更多流式编程细节,请查看 使用 Byzer 处理流数据。
模拟输入数据源
下面是个简单的例子:
-- 模拟数据
> SET data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';
-- 将数据加载成表
> LOAD jsonStr.`data` as datasource;
-- 将表转化成流式数据源
> LOAD mockStream.`datasource` options
stepSizeRange="0-3"
AS newkafkatable1;
stepSizeRange
控制每个周期发送的数据条数,例子中 0-3
代表 0 到 3 条数据。
所有评论(0)