Byzer 面试必考题 —— 同一用户同一位置停留时长
老张最近在整理面试题的时候,发现一道题目非常具有代表性,几乎数据开发岗位的同学都耳熟能详的一道题,我们下面将演示2种写法,分别看下他们之间的异同。需求现有如下数据文件需要处理,格式:CSV,位置: hdfs://byzerwh/input.csv ,大小:100GB字段名:user_id, location_id, time, duration字段中文名:用户ID,位置ID,开始时间,停留时长(分
老张最近在整理面试题的时候,发现一道题目非常具有代表性,几乎数据开发岗位的同学都耳熟能详的一道题,我们下面将演示2种写法,分别看下他们之间的异同。
需求
现有如下数据文件需要处理,格式:CSV,位置: hdfs://byzerwh/input.csv ,大小:100GB
字段名:user_id, location_id, time, duration
字段中文名:用户ID,位置ID,开始时间,停留时长(分钟)
样例(4行):
UserA,LocationA,2022-01-01 08:00:00,60
UserA,LocationA,2022-01-01 09:00:00,60
UserA,LocationB,2022-01-01 10:00:00,60
UserA,LocationA,2022-01-01 11:00:00,60
要求
-
对同一个用户,在同一个位置,连续的多条记录进行合并
-
合并原则:开始时间取最早的,停留时长加和
-
请使用Spark、MapReduce或其他分布式计算引擎处理
解读
样例数据中的数据含义是:
用户UserA,在LocationA位置,从8点开始,停留了60分钟
用户UserA,在LocationA位置,从9点开始,停留了60分钟
用户UserA,在LocationB位置,从10点开始,停留了60分钟
用户UserA,在LocationA位置,从11点开始,停留了60分钟
该样例期待输出:
UserA,LocationA,2022-01-01 08:00:00,120
UserA,LocationB,2022-01-01 10:00:00,60
UserA,LocationA,2022-01-01 11:00:00,60
我们先来看一下使用Spark DataSet 和 RDD 来解决这个问题。
Spark RDD求解
我们观察数据可以发现,合并原则中提到的开始时间取最早这一需求,SQL范式写起来会非常复杂,如果通过编程范式,应该一个for循环就可以解决,那么我们先理清一下思路,先解决下面几个问题:
-
问题1,同一个用户,在同一个位置,连续的多条记录进行合并应该怎么做?
-
问题2,开始时间取最早怎么做?怎么记录开始时间?
如果我们是对MapReduce比较熟悉的话,这个问题其实比较好解决,无非就是把Mapper、Reducer、自定义分组排序模板代码搬过来,map发一个组合key,经过二次排序和分组后,一个迭代器就有了同一个用户,在同一个位置的数据,然后再写单线程的逻辑就很方便了。
现在我们使用spark引擎,可以依法炮制。
思路:按照按照用户ID和位置ID分组,分组之后按照时间列排序,由于数据之间的存在依赖关系,并且依赖关系比较连续,满足某种关系的数据要进行合并操作,因此使用sql部分的代码很难实现。在这使用的是将Dataset转化为RDD之后使用基于分区进行操作的方法处理数据。拿到相关的数据,按照时间顺序读取,判断,累加等进行处理。
import java.text.SimpleDateFormat
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.collection.mutable.ArrayBuffer
case class ResultData(userID:String,locationID:String,startTime:String,endTime:String,stayTime:Long)
object Test {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
val info = spark.read
.format("csv")
.option("path", "hdfs://byzerwh/input.csv")
.load()
.toDF("userID", "locationID", "startTimes", "stayMinutes")
.as[(String, String, String, String)]
val ds: Dataset[((String, String, String), ResultData)] = info.map {
case (userID, locationID, startTimes, stayMinutes) =>
//让起始时间+停留时间=结束时间
val sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val date = sd.parse(startTimes)
val endTime = sd.format(date.getTime + (stayMinutes.trim.toInt * 60 * 1000))
((userID, locationID, startTimes), ResultData(userID, locationID, startTimes, endTime, stayMinutes.trim.toLong))
}.as[((String, String, String), ResultData)]
//按照用户ID和位置ID分组,分组之后按照时间列排序
val newDS: RDD[((String, String, String), ResultData)] = ds.rdd.repartitionAndSortWithinPartitions(new Partitioner {
override def numPartitions: Int = 4
override def getPartition(key: Any): Int = key match {
case (userID, locationID, _) => (userID.hashCode + locationID.hashCode) % numPartitions
case _ => 0
}
})
val result = newDS.mapPartitions(iter => {
val listBuffer = iter.toBuffer
val buffer = ArrayBuffer.empty[ResultData]
var resultData: ResultData = null;
var currentUserAndLoc = ""
//分区内只有一个元素的情况
if (listBuffer.size == 1) {
resultData = listBuffer(0)._2;
buffer += resultData
} else {
//分区内有多个元素
listBuffer.foreach {
case ((userID, locationID, startTimes), currentData) =>
//初始化赋值
if (resultData == null) {
resultData = ResultData(userID, locationID, startTimes, currentData.endTime, currentData.stayTime)
currentUserAndLoc = userID + "#" + locationID
} else {
if (currentUserAndLoc == userID + "#" + locationID ){
//如果当前行的起始时间与上一行的结束时间相同
if (currentData.startTime == resultData.endTime) {
//合并 修改初始值
resultData = ResultData(currentData.userID, currentData.locationID, resultData.startTime, currentData.endTime, resultData.stayTime + currentData.stayTime)
} else {
//不相同的情况下,将上一行结果添加到结果集,并修改初始值
buffer += resultData
resultData = currentData
}
} else {
//不是同一组的情况下,将上一行结果添加到结果集,并修改初始值
buffer += resultData
resultData = currentData
currentUserAndLoc = userID + "#" + locationID
}
}
}
}
buffer.toIterator
})
result.collect()
.sortBy(_.startTime)
.foreach(println)
}
}
上面的83行完成了这个需求,上面提到的问题1我们通过repartitionAndSortWithinPartitions解决了,问题2我们通过一个listBuffer做记录。我们可以发现spark RDD的方式也不是很复杂,唯一要注意的是分区不等于分组,没有MR中GroupingComparator分组函数的情况下,我们需要人为的对partition做分组处理。
如果你是一个编程人员,比较倾向于自己控制算子或者抽象比较通用的能力,那么RDD方式更灵活更适合你,毕竟有些逻辑在SQL中是不太好实现的,在byzer中,你可以将上面用spark写好的逻辑,按照ET的规范添加为一个ET插件,然后通过插件安装的方式安装的byzer引擎中使用,具体参考:插件系统 - 插件安装。
大家可能会问,为什么Spark可以做,还要用byzer ET呢?因为在byzer中我们提供了丰富的数据源和ET等插件,可以直接复用,开发者只需要关心用RDD编写的一些处理逻辑,其他的点就可以不用去关注了。给用户使用的时候调用会变得非常简单,例如 run command as SendMessage
设置好where参数就可以调用了。用户使用方式会比较统一,并且不需要再去关心如何构建和部署,直接在notebook中通过一行命令就可以调用到开发者写的逻辑。
我们思考一个问题,编程范式可以很好的解决这种需要循环处理的问题,但是不是所有人都能上手写这么复杂的代码,SQL 范式相对简单,上手难度小,我们是不是可以使用sql来解决上述问题呢?
Byzer 求解(或spark sql语法)
如果你只想用SQL的方式处理,我们可以采用下面的方式。
- 我们先本地创建一个CSV文件,内容如下:
$ cat input.csv
userID,locationID,time,duration
UserA,LocationA,2022-01-01 08:00:00,60
UserA,LocationA,2022-01-01 09:00:00,60
UserA,LocationB,2022-01-01 10:00:00,60
UserA,LocationA,2022-01-01 11:00:00,60
- 在notebook进行上传,然后我们可以在文件系统
tmp/upload
下找到刚刚上传的文件。 - 加载csv文件
load csv.`/tmp/upload/input.csv` as dura;
- 通过lag可以获取窗口的上一条,通过求差值判断是否连续。然后对连续的行做分组累加:
select user_id,
location_id,
time,
duration,
int((unix_timestamp(time)-unix_timestamp(lag(time, 1, time) over (partition by user_id, location_id order by time)))/3600) as diff
from dura as dura_difference;
select user_id,
location_id,
time,
duration,
sum(if(diff > 1, 1, 0)) over (partition by user_id,location_id order by time) as grp_id
from dura_difference as dura_regroup;
select user_id, location_id, min(time) as start_time, sum(duration) as sum_duration
from dura_regroup
group by user_id, location_id, grp_id as output;
partition by user_id, location_id order by time 表示对user_id, location_id两字段分组,然后基于time排序。
lag(time,1,time) over (partition by … order by …)
我们在lag窗口中找到当前行的前一行,找不到则使用当前行time
sum(if(diff > 1, 1, 0)) over (partition by … order by…) 是计算当前行及以前的sum累加和
这样就可以sum if over累加前面大于1的值
比如 0,0,1,1,2,2
这样0是1组,1是一组,2是一组。如果lag的结果是0的行(即time没有前一行,user_id, location_id分区的第一行)此时sum也是0,不影响我们累加和计算
- 执行验证结果
Byzer vs Spark RDD vs Spark SQL
我们来看下Byzer、Spark RDD和Spark SQL之间的优劣对比:
更多推荐
所有评论(0)