热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark直接将日志写入指定文件实例_7个实例全面掌握SparkMapReduce(上篇)

本文是在学习“7个实例全面掌握HadoopMapReduce”这篇文章后,用Spark重新实现了里面的例子,希望对初学Spark的童学有帮助࿰

本文是在学习 “7个实例全面掌握Hadoop MapReduce” 这篇文章后,用Spark重新实现了里面的例子,希望对初学Spark的童学有帮助,欢迎大牛们留言提意见。

上篇包含4个实例:

  • 1:自定义对象序列化
  • 2:自定义分区
  • 3:计算出每组订单中金额最大的记录
  • 4:合并多个小文件

下篇包含3个实例:

  • 5:分组输出到多个文件
  • 6:join操作
  • 7:计算出用户间的共同好友

实例1:自定义对象序列化

1)需求

需要统计手机用户流量日志,日志内容实例:

150406196aea91c11c6b28553696abff.png

要把同一个用户的上行流量、下行流量进行累加,并计算出综合。

例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:

13897230503,500,1600,2100

2)代码实践

package mapReduceimport org.apache.spark.sql.{Dataset, SparkSession}object FlowCount {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "flowcount.txt")/* 生成DataSet */val flowData = lines.map(preparation).toDF("phone", "upFlow", "dFlow", "sumFlow")flowData.show(5)/* 以手机号为key,对数值求和 */val sumData = flowData.groupBy("phone").agg(sum("upFlow"),sum("dFlow"), sum("sumFlow"))sumData.show(5)spark.stop()}def preparation(str: String): FlowBean = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var phone = fields(0).toString.trimvar upFlow = fields(1).toLongvar dFlow = fields(2).toLongFlowBean(phone, upFlow, dFlow, upFlow + dFlow)}}/** phone:手机号,* upFlow:上行流量, dFlow:下行流量, sumFlow:流量合计*/
case class FlowBean(phone:String, upFlow:Long, dFlow:Long, sumFlow:Long)

3)运行结果

1840ead557a0849d72353028f4564aff.png

实例2:自定义分区

1)需求

还是以上个例子的手机用户流量日志为例,

在上个例子的统计需要基础上添加一个新需求:按省份统计,不同省份的手机号放到不同的文件里。

例如137表示属于河北,138属于河南,那么在结果输出时,他们分别在不同的文件中。

2)代码实践

package mapReduceimport org.apache.spark.sql.{Dataset, SparkSession}object FlowPartitioner {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "flowcount.txt")/* 指定数据分区器 */val lineData = lines.map(line => (line.split(" ")(0), line)).rdd.partitionBy(new ProvincePartitioner(3))/* 生成DataSet, 持久化避免重复混洗 */val flowData = lineData.values.map(preparation).toDF("phone", "upFlow", "dFlow", "sumFlow").persist()flowData.show(5)spark.stop()}def preparation(str: String): FlowBean02 = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var phone = fields(0).toString.trimvar upFlow = fields(1).toLongvar dFlow = fields(2).toLongFlowBean02(phone, upFlow, dFlow, upFlow + dFlow)}}/** phone:手机号,* upFlow:上行流量, dFlow:下行流量, sumFlow:流量合计*/
case class FlowBean02(phone:String, upFlow:Long, dFlow:Long, sumFlow:Long)import org.apache.spark.Partitioner/** 自定义partitioner* 根据手机号前缀分区*/
class ProvincePartitioner(val num: Int) extends Partitioner {override def numPartitions: Int = numoverride def getPartition(key: Any): Int = {//手机号前3位val prefix = key.toString().substring(0, 3)//用分区个数取模prefix.toInt % num}
}

3)运行结果

62d0d3a60ee0ee64c1a485903f132904.png

实例3:计算出每组订单中金额最大的记录

1)需求

有如下订单数据:

70417d5e0202bd1e70cb91c2dd1779bb.png

求出每一个订单中成交金额最大的一笔交易。

2)代码实践

package mapReduceimport mapReduce.FlowPartitioner.preparation
import org.apache.spark.sql.SparkSessionobject GroupSort {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "groupsort.txt")/* 默认分区器是hashcode */val lineData = lines.map(preparation).repartition(3).toDF("itemid", "goodsid", "amount")val sortData = lineData.sortWithinPartitions("itemid", "amount")sortData.show(10)/* 取出金额最大的订单 */val maxData = sortData.groupBy("itemid").agg(max("amount")).sort($"itemid".asc)maxData.show(10)spark.stop()}def preparation(str: String): ItemBean = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var itemid = fields(0).toString.trimvar goodsid = fields(1).toString.trimvar amount = fields(2).toDoubleItemBean(itemid, goodsid, amount)}}case class ItemBean(itemid:String, goodsid:String, amount:Double)

3)运行结果

5c30e8c2a66c3cac82d58dc2780ed47f.png

实例4:合并多个小文件

1)需求

目标文件中有大量的小文件,会造成分配任务和资源的开销比实际的计算开销还打,这就产生了效率损耗。

需要先把一些小文件合并成一个大文件。

2)代码实践

package mapReduceimport org.apache.spark.sql.SparkSessionobject ManyToOne {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")/* 1.可以将path里的所有文件内容读出 *///spark.read.textFile(path)/* 2.指定文件列表 */val base = "hdfs://127.0.0.1:9090/user/ds/"val fileList = Array(base + "groupsort.txt", base + "flowcount.txt")/* union 连接 */val fileRDD = fileList.map(spark.read.textFile(_))val lineData = fileRDD.reduce((x,y)=> x.union(y))lineData.show(20)spark.stop()}}

3)运行结果

aadc73bc87a0ea13b70c903733858b8e.png

如果数据很大,难以在单机内存上装下,以上操作可能会造成单机内存不足。

转自 "如何使saveAsTextFile只生成一个文件"

在RDD上调用 coalesce(1,true).saveAsTextFile(),

或者,可以调用 repartition(1),它其实是coalesce的一个包装,默认第二个参数为true。

但代价是巨大的,因为Spark面对的是大量的数据,并且是并行执行的,如果强行要求最后只有一个分区,必然导致大量的磁盘IO和网络IO产生。

安全的做法,可以采用HDFS磁盘合并操作:

hadoop fs -cat /hdfs/result/part-* > /data/test.log

hadoop fs -getmerge /hdfs/result /data/test.log




推荐阅读
author-avatar
mobiledu2502906891
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有