老张最近在整理面试题的时候,发现一道题目非常具有代表性,几乎数据开发岗位的同学都耳熟能详的一道题,我们下面将演示2种写法,分别看下他们之间的异同。

需求

现有如下数据文件需要处理,格式:CSV,位置: hdfs://byzerwh/input.csv ,大小:100GB

字段名:user_id, location_id, time, duration

字段中文名:用户ID,位置ID,开始时间,停留时长(分钟)

样例(4行):

UserA,LocationA,2022-01-01 08:00:00,60

UserA,LocationA,2022-01-01 09:00:00,60

UserA,LocationB,2022-01-01 10:00:00,60

UserA,LocationA,2022-01-01 11:00:00,60

要求

  • 对同一个用户,在同一个位置,连续的多条记录进行合并

  • 合并原则:开始时间取最早的,停留时长加和

  • 请使用Spark、MapReduce或其他分布式计算引擎处理

解读

样例数据中的数据含义是:

用户UserA,在LocationA位置,从8点开始,停留了60分钟

用户UserA,在LocationA位置,从9点开始,停留了60分钟

用户UserA,在LocationB位置,从10点开始,停留了60分钟

用户UserA,在LocationA位置,从11点开始,停留了60分钟

该样例期待输出:

UserA,LocationA,2022-01-01 08:00:00,120

UserA,LocationB,2022-01-01 10:00:00,60

UserA,LocationA,2022-01-01 11:00:00,60

我们先来看一下使用Spark DataSet 和 RDD 来解决这个问题。

Spark RDD求解

我们观察数据可以发现,合并原则中提到的开始时间取最早这一需求,SQL范式写起来会非常复杂,如果通过编程范式,应该一个for循环就可以解决,那么我们先理清一下思路,先解决下面几个问题:

  • 问题1,同一个用户,在同一个位置,连续的多条记录进行合并应该怎么做?

  • 问题2,开始时间取最早怎么做?怎么记录开始时间?

如果我们是对MapReduce比较熟悉的话,这个问题其实比较好解决,无非就是把Mapper、Reducer、自定义分组排序模板代码搬过来,map发一个组合key,经过二次排序和分组后,一个迭代器就有了同一个用户,在同一个位置的数据,然后再写单线程的逻辑就很方便了。

现在我们使用spark引擎,可以依法炮制。

思路:按照按照用户ID和位置ID分组,分组之后按照时间列排序,由于数据之间的存在依赖关系,并且依赖关系比较连续,满足某种关系的数据要进行合并操作,因此使用sql部分的代码很难实现。在这使用的是将Dataset转化为RDD之后使用基于分区进行操作的方法处理数据。拿到相关的数据,按照时间顺序读取,判断,累加等进行处理。

import java.text.SimpleDateFormat

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SparkSession}

import scala.collection.mutable.ArrayBuffer

case class ResultData(userID:String,locationID:String,startTime:String,endTime:String,stayTime:Long)
object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val info = spark.read
      .format("csv")
      .option("path", "hdfs://byzerwh/input.csv")
      .load()
      .toDF("userID", "locationID", "startTimes", "stayMinutes")
      .as[(String, String, String, String)]

    val ds: Dataset[((String, String, String), ResultData)] = info.map {
      case (userID, locationID, startTimes, stayMinutes) =>
        //让起始时间+停留时间=结束时间
        val sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        val date = sd.parse(startTimes)
        val endTime = sd.format(date.getTime + (stayMinutes.trim.toInt * 60 * 1000))
        ((userID, locationID, startTimes), ResultData(userID, locationID, startTimes, endTime, stayMinutes.trim.toLong))
    }.as[((String, String, String), ResultData)]

    //按照用户ID和位置ID分组,分组之后按照时间列排序
    val newDS: RDD[((String, String, String), ResultData)] = ds.rdd.repartitionAndSortWithinPartitions(new Partitioner {
      override def numPartitions: Int = 4

      override def getPartition(key: Any): Int = key match {
        case (userID, locationID, _) => (userID.hashCode + locationID.hashCode) % numPartitions
        case _ => 0
      }
    })
    val result = newDS.mapPartitions(iter => {
      val listBuffer = iter.toBuffer
      val buffer = ArrayBuffer.empty[ResultData]
      var resultData: ResultData = null;
      var currentUserAndLoc = ""
      //分区内只有一个元素的情况
      if (listBuffer.size == 1) {
        resultData = listBuffer(0)._2;
        buffer += resultData
      } else {
        //分区内有多个元素
        listBuffer.foreach {
          case ((userID, locationID, startTimes), currentData) =>
            //初始化赋值
            if (resultData == null) {
              resultData = ResultData(userID, locationID, startTimes, currentData.endTime, currentData.stayTime)
              currentUserAndLoc = userID + "#" + locationID
            } else {
              if (currentUserAndLoc == userID + "#" + locationID ){
                  //如果当前行的起始时间与上一行的结束时间相同
                 if (currentData.startTime == resultData.endTime) {
                   //合并 修改初始值
                   resultData = ResultData(currentData.userID, currentData.locationID, resultData.startTime, currentData.endTime, resultData.stayTime + currentData.stayTime)
                 } else {
                   //不相同的情况下,将上一行结果添加到结果集,并修改初始值
                   buffer += resultData
                   resultData = currentData
                 }
              } else {
                //不是同一组的情况下,将上一行结果添加到结果集,并修改初始值
                buffer += resultData
                resultData = currentData
                currentUserAndLoc = userID + "#" + locationID
              }
              
            }
        }
      }
      buffer.toIterator
    })
    result.collect()
      .sortBy(_.startTime)
      .foreach(println)
  }
}

上面的83行完成了这个需求,上面提到的问题1我们通过repartitionAndSortWithinPartitions解决了,问题2我们通过一个listBuffer做记录。我们可以发现spark RDD的方式也不是很复杂,唯一要注意的是分区不等于分组,没有MR中GroupingComparator分组函数的情况下,我们需要人为的对partition做分组处理。

如果你是一个编程人员,比较倾向于自己控制算子或者抽象比较通用的能力,那么RDD方式更灵活更适合你,毕竟有些逻辑在SQL中是不太好实现的,在byzer中,你可以将上面用spark写好的逻辑,按照ET的规范添加为一个ET插件,然后通过插件安装的方式安装的byzer引擎中使用,具体参考:插件系统 - 插件安装

大家可能会问,为什么Spark可以做,还要用byzer ET呢?因为在byzer中我们提供了丰富的数据源和ET等插件,可以直接复用,开发者只需要关心用RDD编写的一些处理逻辑,其他的点就可以不用去关注了。给用户使用的时候调用会变得非常简单,例如 run command as SendMessage设置好where参数就可以调用了。用户使用方式会比较统一,并且不需要再去关心如何构建和部署,直接在notebook中通过一行命令就可以调用到开发者写的逻辑。

我们思考一个问题,编程范式可以很好的解决这种需要循环处理的问题,但是不是所有人都能上手写这么复杂的代码,SQL 范式相对简单,上手难度小,我们是不是可以使用sql来解决上述问题呢?

Byzer 求解(或spark sql语法)

如果你只想用SQL的方式处理,我们可以采用下面的方式。

  1. 我们先本地创建一个CSV文件,内容如下:
$ cat input.csv

userID,locationID,time,duration
UserA,LocationA,2022-01-01 08:00:00,60
UserA,LocationA,2022-01-01 09:00:00,60
UserA,LocationB,2022-01-01 10:00:00,60
UserA,LocationA,2022-01-01 11:00:00,60
  1. 在notebook进行上传,然后我们可以在文件系统 tmp/upload 下找到刚刚上传的文件。
  2. 加载csv文件
load csv.`/tmp/upload/input.csv` as dura;
  1. 通过lag可以获取窗口的上一条,通过求差值判断是否连续。然后对连续的行做分组累加:
select user_id,
       location_id,
       time,
       duration, 
       int((unix_timestamp(time)-unix_timestamp(lag(time, 1, time) over (partition by user_id, location_id order by time)))/3600) as diff
from dura as dura_difference;

select user_id,
                location_id,
                time,
                duration,
                sum(if(diff > 1, 1, 0)) over (partition by user_id,location_id order by time) as grp_id
         from dura_difference as dura_regroup;

select user_id, location_id, min(time) as start_time, sum(duration) as sum_duration
from dura_regroup
group by user_id, location_id, grp_id as output;

partition by user_id, location_id order by time 表示对user_id, location_id两字段分组,然后基于time排序。
lag(time,1,time) over (partition by … order by …)
我们在lag窗口中找到当前行的前一行,找不到则使用当前行time

sum(if(diff > 1, 1, 0)) over (partition by … order by…) 是计算当前行及以前的sum累加和
这样就可以sum if over累加前面大于1的值
比如 0,0,1,1,2,2
这样0是1组,1是一组,2是一组。如果lag的结果是0的行(即time没有前一行,user_id, location_id分区的第一行)此时sum也是0,不影响我们累加和计算

  1. 执行验证结果

在这里插入图片描述

Byzer vs Spark RDD vs Spark SQL

我们来看下Byzer、Spark RDD和Spark SQL之间的优劣对比:

在这里插入图片描述

Logo

更多推荐