本文是在学习 “7个实例全面掌握Hadoop MapReduce” 这篇文章后,用Spark重新实现了里面的例子,希望对初学Spark的童学有帮助,欢迎大牛们留言提意见。
上篇包含4个实例:
- 1:自定义对象序列化
- 2:自定义分区
- 3:计算出每组订单中金额最大的记录
- 4:合并多个小文件
下篇包含3个实例:
- 5:分组输出到多个文件
- 6:join操作
- 7:计算出用户间的共同好友
实例1:自定义对象序列化
1)需求
需要统计手机用户流量日志,日志内容实例:
要把同一个用户的上行流量、下行流量进行累加,并计算出综合。
例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:
13897230503,500,1600,2100
2)代码实践
package mapReduceimport org.apache.spark.sql.{Dataset, SparkSession}object FlowCount {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "flowcount.txt")/* 生成DataSet */val flowData = lines.map(preparation).toDF("phone", "upFlow", "dFlow", "sumFlow")flowData.show(5)/* 以手机号为key,对数值求和 */val sumData = flowData.groupBy("phone").agg(sum("upFlow"),sum("dFlow"), sum("sumFlow"))sumData.show(5)spark.stop()}def preparation(str: String): FlowBean = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var phone = fields(0).toString.trimvar upFlow = fields(1).toLongvar dFlow = fields(2).toLongFlowBean(phone, upFlow, dFlow, upFlow + dFlow)}}/** phone:手机号,* upFlow:上行流量, dFlow:下行流量, sumFlow:流量合计*/
case class FlowBean(phone:String, upFlow:Long, dFlow:Long, sumFlow:Long)
3)运行结果
实例2:自定义分区
1)需求
还是以上个例子的手机用户流量日志为例,
在上个例子的统计需要基础上添加一个新需求:按省份统计,不同省份的手机号放到不同的文件里。
例如137表示属于河北,138属于河南,那么在结果输出时,他们分别在不同的文件中。
2)代码实践
package mapReduceimport org.apache.spark.sql.{Dataset, SparkSession}object FlowPartitioner {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "flowcount.txt")/* 指定数据分区器 */val lineData = lines.map(line => (line.split(" ")(0), line)).rdd.partitionBy(new ProvincePartitioner(3))/* 生成DataSet, 持久化避免重复混洗 */val flowData = lineData.values.map(preparation).toDF("phone", "upFlow", "dFlow", "sumFlow").persist()flowData.show(5)spark.stop()}def preparation(str: String): FlowBean02 = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var phone = fields(0).toString.trimvar upFlow = fields(1).toLongvar dFlow = fields(2).toLongFlowBean02(phone, upFlow, dFlow, upFlow + dFlow)}}/** phone:手机号,* upFlow:上行流量, dFlow:下行流量, sumFlow:流量合计*/
case class FlowBean02(phone:String, upFlow:Long, dFlow:Long, sumFlow:Long)import org.apache.spark.Partitioner/** 自定义partitioner* 根据手机号前缀分区*/
class ProvincePartitioner(val num: Int) extends Partitioner {override def numPartitions: Int = numoverride def getPartition(key: Any): Int = {//手机号前3位val prefix = key.toString().substring(0, 3)//用分区个数取模prefix.toInt % num}
}
3)运行结果
实例3:计算出每组订单中金额最大的记录
1)需求
有如下订单数据:
求出每一个订单中成交金额最大的一笔交易。
2)代码实践
package mapReduceimport mapReduce.FlowPartitioner.preparation
import org.apache.spark.sql.SparkSessionobject GroupSort {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "groupsort.txt")/* 默认分区器是hashcode */val lineData = lines.map(preparation).repartition(3).toDF("itemid", "goodsid", "amount")val sortData = lineData.sortWithinPartitions("itemid", "amount")sortData.show(10)/* 取出金额最大的订单 */val maxData = sortData.groupBy("itemid").agg(max("amount")).sort($"itemid".asc)maxData.show(10)spark.stop()}def preparation(str: String): ItemBean = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var itemid = fields(0).toString.trimvar goodsid = fields(1).toString.trimvar amount = fields(2).toDoubleItemBean(itemid, goodsid, amount)}}case class ItemBean(itemid:String, goodsid:String, amount:Double)
3)运行结果
实例4:合并多个小文件
1)需求
目标文件中有大量的小文件,会造成分配任务和资源的开销比实际的计算开销还打,这就产生了效率损耗。
需要先把一些小文件合并成一个大文件。
2)代码实践
package mapReduceimport org.apache.spark.sql.SparkSessionobject ManyToOne {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")/* 1.可以将path里的所有文件内容读出 *///spark.read.textFile(path)/* 2.指定文件列表 */val base = "hdfs://127.0.0.1:9090/user/ds/"val fileList = Array(base + "groupsort.txt", base + "flowcount.txt")/* union 连接 */val fileRDD = fileList.map(spark.read.textFile(_))val lineData = fileRDD.reduce((x,y)=> x.union(y))lineData.show(20)spark.stop()}}
3)运行结果
如果数据很大,难以在单机内存上装下,以上操作可能会造成单机内存不足。
转自 "如何使saveAsTextFile只生成一个文件"
在RDD上调用 coalesce(1,true).saveAsTextFile()
,
或者,可以调用 repartition(1),它其实是coalesce的一个包装,默认第二个参数为true。
但代价是巨大的,因为Spark面对的是大量的数据,并且是并行执行的,如果强行要求最后只有一个分区,必然导致大量的磁盘IO和网络IO产生。
安全的做法,可以采用HDFS磁盘合并操作:
hadoop fs -cat /hdfs/result/part-* > /data/test.log
或
hadoop fs -getmerge /hdfs/result /data/test.log