Byzer 内置了加载流式数据的能力,本章节我们会介绍如何通过 Byzer 加载流式数据源如 Kafka 中的数据。

加载 Kafka 流式数据源

Byzer 显示的支持 Kafka 作为流式数据源,也支持将其作为普通数据源进行 AdHoc 加载,性能同样可观。

本章只介绍数据加载,想了解更多流式编程细节,请查看 使用 Byzer 处理流数据

1. 流式加载

注意: Byzer 支持 0.10 以上 Kafka 版本

流式加载数据源,需要使用 kafka 关键字,并通过 kafka.bootstrap.servers 参数指定服务器地址,如下示例:

> LOAD kafka.`$topic_name` OPTIONS
  `kafka.bootstrap.servers`="${your_servers}"
  AS kafka_post_parquet;

2. AdHoc加载

AdHoc 加载数据源,需要使用 AdHoc 关键字,并通过 kafka.bootstrap.servers 参数指定服务器地址,如下示例:

> LOAD adHocKafka.`$topic_name` WHERE 
  kafka.bootstrap.servers="${your_servers}"
  and multiplyFactor="2" 
  AS table1;
  
> SELECT count(*) FROM table1 WHERE value LIKE "%yes%" AS output;

multiplyFactor 代表设置两个线程扫描同一个分区,同时提升两倍的并行度,加快速度。

Byzer 还提供一些高级参数来指定范围,它们是成组出现的:

通过时间

timeFormat:指定日期格式 startingTime:指定开始时间 endingTime:指定结束时间

通过offset

staringOffset:指定起始位置 staringOffset:指定结束位置

通过offset需要指定每个分区的起始结束,比较麻烦,使用较少。

 

加载 MockStream 流式数据源

Byzer 显示的支持 MockStream。它可以用来模拟数据源,广泛应用于测试场景中。

本章只介绍数据加载,想了解更多流式编程细节,请查看 使用 Byzer 处理流数据

模拟输入数据源

下面是个简单的例子:

-- 模拟数据
> SET data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';

-- 将数据加载成表
> LOAD jsonStr.`data` as datasource;

-- 将表转化成流式数据源
> LOAD mockStream.`datasource` options
  stepSizeRange="0-3"
  AS newkafkatable1;

stepSizeRange 控制每个周期发送的数据条数,例子中 0-3 代表 0 到 3 条数据。

Logo

更多推荐