Requirements: ① filter out empty/invalid rows ② compute each user's cumulative recharge amount up to the current month. Approach: a windowed running sum, partitioned by user id, ordered by date, with a frame from unbounded preceding to the current row. ③ Implementation:
spark.sql(
  """
    |select
    |  lower(uid) as uid,
    |  regexp_replace(dt, '(\\/+)', '-') as dayTime,
    |  num
    |from userinfo
    |where regexp_replace(dt, '(\\/+)', '-') is not null
  """.stripMargin).createOrReplaceTempView("t1")

spark.sql(
  """
    |select
    |  uid,
    |  dayTime,
    |  num,
    |  sum(num) over (partition by uid
    |                 order by dayTime asc
    |                 rows between unbounded preceding and current row)
    |from t1
  """.stripMargin).show()
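As a side note, the `regexp_replace(dt, '(\\/+)', '-')` call normalizes slash-separated dates by collapsing each run of slashes into a single dash. A plain-Scala sketch of the same pattern, outside Spark:

```scala
// Same pattern as the SQL regexp_replace above: one or more '/'
// characters are replaced with a single '-'.
def normalizeDate(dt: String): String = dt.replaceAll("/+", "-")
```

For example, `normalizeDate("2017/1/21")` yields `"2017-1-21"`.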
Result of the analysis:
+---+---------+---+------------------------------------------------------------------------------------------------------------------+
|uid|  dayTime|num|sum(num) OVER (PARTITION BY uid ORDER BY dayTime ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+---+---------+---+------------------------------------------------------------------------------------------------------------------+
|u01|2017-1-21|  5|                                                                                                                 5|
|u01|2017-1-23|  6|                                                                                                                11|
|u01|2017-2-21|  8|                                                                                                                19|
|u01|2017-2-22|  4|                                                                                                                23|
|u02|2017-1-23|  6|                                                                                                                 6|
|u02|2017-1-23|  6|                                                                                                                12|
|u04|2017-1-20|  3|                                                                                                                 3|
|u03|2017-1-22|  8|                                                                                                                 8|
+---+---------+---+------------------------------------------------------------------------------------------------------------------+
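The window frame above is simply a per-user running total. A minimal plain-Scala sketch of the same semantics, using hypothetical sample rows (not the Spark API):

```scala
case class Recharge(uid: String, dayTime: String, num: Int)

// Mirrors sum(num) OVER (PARTITION BY uid ORDER BY dayTime ASC
// ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):
// group by uid, sort each group by date, then emit a running sum per row.
def runningTotals(rows: Seq[Recharge]): Seq[(String, String, Int, Int)] =
  rows.groupBy(_.uid).toSeq.sortBy(_._1).flatMap { case (uid, part) =>
    val sorted = part.sortBy(_.dayTime)
    // scanLeft(0)(_ + _.num) yields 0, n1, n1+n2, ...; .tail drops the seed
    // so each cumulative value lines up with its row.
    sorted.scanLeft(0)(_ + _.num).tail.zip(sorted).map {
      case (cum, r) => (uid, r.dayTime, r.num, cum)
    }
  }
```

On the u01 rows from the table above this reproduces the same cumulative values (5, 11, 19, 23). Note the sketch compares dates lexicographically, which only works because the sample dates are not zero-padded consistently across months here; Spark orders the string column the same way.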
val spark: SparkSession = SparkSession.builder()
  .appName("SparkSql2Hive")
  .master("local[2]")
  .enableHiveSupport() // enable Hive support
  .config("spark.sql.warehouse.dir", "hdfs://hadoop102:9000/user/hive/warehouse")
  .getOrCreate()