热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

sparksql和sparkcore实现地域报表分析

文章目录计算逻辑sparksql实现代码sparkcore实现代码读取的普通日志文件读取parquet文件计算逻辑sparksql实现代码package com.dmp.repor


文章目录

    • 计算逻辑
    • spark sql实现代码
    • spark core实现代码
    • 读取的普通日志文件
    • 读取parquet文件


计算逻辑

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

spark sql实现代码

package com.dmp.report
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object AreaAnalysRpt {
  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 1) {
      println(
        """
          |cn.dmp.report.AreaAnalyseRpt
          |参数:
          | logInputPath
        """.stripMargin)
      sys.exit()
    }
    // 1 接受程序参数
    val Array(logInputPath) = args
    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)
    // 读取parquet文件
    val parquetData: DataFrame = sQLContext.read.parquet(logInputPath)
    // dataframe -> table
    parquetData.registerTempTable("log")
    //业务逻辑
    sQLContext.sql(
      """
        |select
        |provincename, cityname,
        |sum(case when requestmode=1 and processnode >=2 then 1 else 0 end) 有效请求,
        |sum(case when requestmode=1 and processnode =3 then 1 else 0 end) 广告请求,
        |sum(case when iseffective=1 and isbilling=1 and isbid=1 and adorderid !=0 then 1 else 0 end) 参与竞价数,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1 else 0 end) 竞价成功数,
        |sum(case when requestmode=2 and iseffective=1 then 1 else 0 end) 展示数,
        |sum(case when requestmode=3 and iseffective=1 then 1 else 0 end) 点击数,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1.0*adpayment/1000 else 0 end) 广告成本,
        |sum(case when iseffective=1 and isbilling=1 and iswin=1 then 1.0*winprice/1000 else 0 end) 广告消费
        |from log
        |group by provincename, cityname
      """.stripMargin
    ).show()
  }
}

spark core实现代码

处理计算指标的逻辑类

package cn.dmp.utils
object RptUtils {
    /**
      * List(原始请求,有效请求,广告请求)
      */
    def caculateReq(reqMode: Int, prcNode: Int): List[Double] = {
        if (reqMode == 1 && prcNode == 1) {
            List[Double](1, 0, 0)
        } else if (reqMode == 1 && prcNode == 2) {
            List[Double](1, 1, 0)
        } else if (reqMode == 1 && prcNode == 3) {
            List[Double](1, 1, 1)
        } else List[Double](0, 0, 0)
    }
    /**
      * List(参与竞价,竞价成功,消费,成本)
      */
    def caculateRtb(effTive: Int, bill: Int, bid: Int, orderId: Int, win: Int, winPrice: Double, adPayMent: Double): List[Double] = {
        if (effTive == 1 && bill == 1 && bid == 1 && orderId != 0) {
            List[Double](1, 0, 0, 0)
        } else if (effTive == 1 && bill == 1 && win == 1) {
            List[Double](0, 1, winPrice / 1000.0, adPayMent / 1000.0)
        } else List[Double](0, 0, 0, 0)
    }
    /**
      * List(广告展示,点击)
      */
    def caculateShowClick(reqMode: Int, effTive: Int): List[Double] = {
        if (reqMode == 2 && effTive == 1) {
            List[Double](1, 0)
        } else if (reqMode == 3 && effTive == 1) {
            List[Double](0, 1)
        } else List[Double](0, 0)
    }
}

读取的普通日志文件

package com.dmp.report
import com.dmp.beans.Log
import com.dmp.utils.RptUtils
import org.apache.spark.{SparkConf, SparkContext}
/*
  F:\牛牛学堂大数据24期\09-实训实战-9天\dmp&&移动项目\dmp\2016-10-01_06_p1_invalid.1475274123982.log.FINISH.bz2
  C:\Users\admin\Desktop\result3
  *
  */
object AreaAnalysRpt2 {
  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 2) {
      println(
        """
          |cn.dmp.report.AreaAnalyseRpt
          |参数:
          | logInputPath
          | resultOutputPath
        """.stripMargin)
      sys.exit()
    }
    // 1 接受程序参数
    val Array(logInputPath, resultOutputPath) = args
    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    sc.textFile(logInputPath)
      .map(_.split(",", -1))
      .filter(_.length >= 85)
      .map(arr => {
        val log = Log(arr)
        val req = RptUtils.caculateReq(log.requestmode, log.processnode)
        val rtb = RptUtils.caculateRtb(log.iseffective, log.isbilling, log.isbid, log.adorderid, log.iswin, log.winprice, log.adpayment)
        val showClick = RptUtils.caculateShowClick(log.requestmode, log.iseffective)
        // (省,地市,媒体,渠道,操作系统,网络类型,...,List(9个指标数据))
        ((log.provincename, log.cityname), req ++ rtb ++ showClick)
      }).reduceByKey((list1, list2) => {
      //将两个list拉链组合在一个,相同位置的元素组成一个tuple,map操作进行累加
      list1.zip(list2).map(t => t._1 + t._2)
    })// 会打印一个(),使用map将数据进行整理
      .map(t => t._1._1 + ","+t._1._2+","+t._2.mkString)
      .saveAsTextFile(resultOutputPath)
  }
}

读取parquet文件

package com.dmp.report
import com.dmp.utils.RptUtils
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/*
 C:\Users\admin\Desktop\result1\
  C:\Users\admin\Desktop\result3
  *
  */
object AreaAnalysRpt3 {
  def main(args: Array[String]): Unit = {
    // 0 校验参数个数
    if (args.length != 2) {
      println(
        """
          |cn.dmp.report.AreaAnalyseRpt
          |参数:
          | logInputPath
          | resultOutputPath
        """.stripMargin)
      sys.exit()
    }
    // 1 接受程序参数
    val Array(logInputPath, resultOutputPath) = args
    // 2 创建sparkconf->sparkContext
    val sparkConf = new SparkConf()
    sparkConf.setAppName(s"${this.getClass.getSimpleName}")
    sparkConf.setMaster("local[*]")
    // RDD 序列化到磁盘 worker与worker之间的数据传输
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val sQLContext = new SQLContext(sc)
    val parquetData = sQLContext.read.parquet(logInputPath)
    parquetData.map(row => {
      //从row中获取具体字段
      // 是不是原始请求,有效请求,广告请求 List(原始请求,有效请求,广告请求)
      val reqMode = row.getAs[Int]("requestmode")
      val prcNode = row.getAs[Int]("processnode")
      // 参与竞价, 竞价成功  List(参与竞价,竞价成功, 消费, 成本)
      val effTive = row.getAs[Int]("iseffective")
      val bill = row.getAs[Int]("isbilling")
      val bid = row.getAs[Int]("isbid")
      val orderId = row.getAs[Int]("adorderid")
      val win = row.getAs[Int]("iswin")
      val winPrice = row.getAs[Double]("winprice")
      val adPayMent = row.getAs[Double]("adpayment")
      //      转换为list
      val reqList = RptUtils.caculateReq(reqMode, prcNode)
      val rtbList = RptUtils.caculateRtb(effTive, bill, bid, orderId, win, winPrice, adPayMent)
      val showClickList = RptUtils.caculateShowClick(reqMode, effTive)
      //返回元组
      // 返回元组
      ((row.getAs[String]("provincename"), row.getAs[String]("cityname")), reqList ++ rtbList ++ showClickList)
    }).reduceByKey((list1, list2) => {
      list1.zip(list2).map(t => t._1 + t._2)
    }).map(t => t._1._1 + "," + t._1._2 + "," +t._2.mkString(","))
      .saveAsTextFile(resultOutputPath)
    sc.stop()
  }
}

               


推荐阅读
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文讨论了在Spring 3.1中,数据源未能自动连接到@Configuration类的错误原因,并提供了解决方法。作者发现了错误的原因,并在代码中手动定义了PersistenceAnnotationBeanPostProcessor。作者删除了该定义后,问题得到解决。此外,作者还指出了默认的PersistenceAnnotationBeanPostProcessor的注册方式,并提供了自定义该bean定义的方法。 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • Android开发实现的计时器功能示例
    本文分享了Android开发实现的计时器功能示例,包括效果图、布局和按钮的使用。通过使用Chronometer控件,可以实现计时器功能。该示例适用于Android平台,供开发者参考。 ... [详细]
  • 本文介绍了在Linux下安装和配置Kafka的方法,包括安装JDK、下载和解压Kafka、配置Kafka的参数,以及配置Kafka的日志目录、服务器IP和日志存放路径等。同时还提供了单机配置部署的方法和zookeeper地址和端口的配置。通过实操成功的案例,帮助读者快速完成Kafka的安装和配置。 ... [详细]
  • 解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法
    本文介绍了解决nginx启动报错epoll_wait() reported that client prematurely closed connection的方法,包括检查location配置是否正确、pass_proxy是否需要加“/”等。同时,还介绍了修改nginx的error.log日志级别为debug,以便查看详细日志信息。 ... [详细]
  • iOS Swift中如何实现自动登录?
    本文介绍了在iOS Swift中如何实现自动登录的方法,包括使用故事板、SWRevealViewController等技术,以及解决用户注销后重新登录自动跳转到主页的问题。 ... [详细]
  • 本文由编程笔记小编整理,主要介绍了使用Junit和黄瓜进行自动化测试中步骤缺失的问题。文章首先介绍了使用cucumber和Junit创建Runner类的代码,然后详细说明了黄瓜功能中的步骤和Steps类的实现。本文对于需要使用Junit和黄瓜进行自动化测试的开发者具有一定的参考价值。摘要长度:187字。 ... [详细]
  • 从Oracle安全移植到国产达梦数据库的DBA实践与攻略
    随着我国对信息安全和自主可控技术的重视,国产数据库在党政机关、军队和大型央企等行业中得到了快速应用。本文介绍了如何降低从Oracle到国产达梦数据库的技术门槛,保障用户现有业务系统投资。具体包括分析待移植系统、确定移植对象、数据迁移、PL/SQL移植、校验移植结果以及应用系统的测试和优化等步骤。同时提供了移植攻略,包括待移植系统分析和准备移植环境的方法。通过本文的实践与攻略,DBA可以更好地完成Oracle安全移植到国产达梦数据库的工作。 ... [详细]
  • 本文介绍了一个React Native新手在尝试将数据发布到服务器时遇到的问题,以及他的React Native代码和服务器端代码。他使用fetch方法将数据发送到服务器,但无法在服务器端读取/获取发布的数据。 ... [详细]
  • 使用eclipse创建一个Java项目的步骤
    本文介绍了使用eclipse创建一个Java项目的步骤,包括启动eclipse、选择New Project命令、在对话框中输入项目名称等。同时还介绍了Java Settings对话框中的一些选项,以及如何修改Java程序的输出目录。 ... [详细]
author-avatar
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有