热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark直接将日志写入指定文件实例_7个实例全面掌握SparkMapReduce(上篇)

本文是在学习“7个实例全面掌握HadoopMapReduce”这篇文章后,用Spark重新实现了里面的例子,希望对初学Spark的童学有帮助࿰

本文是在学习 “7个实例全面掌握Hadoop MapReduce” 这篇文章后,用Spark重新实现了里面的例子,希望对初学Spark的童学有帮助,欢迎大牛们留言提意见。

上篇包含4个实例:

  • 1:自定义对象序列化
  • 2:自定义分区
  • 3:计算出每组订单中金额最大的记录
  • 4:合并多个小文件

下篇包含3个实例:

  • 5:分组输出到多个文件
  • 6:join操作
  • 7:计算出用户间的共同好友

实例1:自定义对象序列化

1)需求

需要统计手机用户流量日志,日志内容实例:

150406196aea91c11c6b28553696abff.png

要把同一个用户的上行流量、下行流量进行累加,并计算出综合。

例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:

13897230503,500,1600,2100

2)代码实践

package mapReduceimport org.apache.spark.sql.{Dataset, SparkSession}object FlowCount {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "flowcount.txt")/* 生成DataSet */val flowData = lines.map(preparation).toDF("phone", "upFlow", "dFlow", "sumFlow")flowData.show(5)/* 以手机号为key,对数值求和 */val sumData = flowData.groupBy("phone").agg(sum("upFlow"),sum("dFlow"), sum("sumFlow"))sumData.show(5)spark.stop()}def preparation(str: String): FlowBean = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var phone = fields(0).toString.trimvar upFlow = fields(1).toLongvar dFlow = fields(2).toLongFlowBean(phone, upFlow, dFlow, upFlow + dFlow)}}/** phone:手机号,* upFlow:上行流量, dFlow:下行流量, sumFlow:流量合计*/
case class FlowBean(phone:String, upFlow:Long, dFlow:Long, sumFlow:Long)

3)运行结果

1840ead557a0849d72353028f4564aff.png

实例2:自定义分区

1)需求

还是以上个例子的手机用户流量日志为例,

在上个例子的统计需要基础上添加一个新需求:按省份统计,不同省份的手机号放到不同的文件里。

例如137表示属于河北,138属于河南,那么在结果输出时,他们分别在不同的文件中。

2)代码实践

package mapReduceimport org.apache.spark.sql.{Dataset, SparkSession}object FlowPartitioner {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "flowcount.txt")/* 指定数据分区器 */val lineData = lines.map(line => (line.split(" ")(0), line)).rdd.partitionBy(new ProvincePartitioner(3))/* 生成DataSet, 持久化避免重复混洗 */val flowData = lineData.values.map(preparation).toDF("phone", "upFlow", "dFlow", "sumFlow").persist()flowData.show(5)spark.stop()}def preparation(str: String): FlowBean02 = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var phone = fields(0).toString.trimvar upFlow = fields(1).toLongvar dFlow = fields(2).toLongFlowBean02(phone, upFlow, dFlow, upFlow + dFlow)}}/** phone:手机号,* upFlow:上行流量, dFlow:下行流量, sumFlow:流量合计*/
case class FlowBean02(phone:String, upFlow:Long, dFlow:Long, sumFlow:Long)import org.apache.spark.Partitioner/** 自定义partitioner* 根据手机号前缀分区*/
class ProvincePartitioner(val num: Int) extends Partitioner {override def numPartitions: Int = numoverride def getPartition(key: Any): Int = {//手机号前3位val prefix = key.toString().substring(0, 3)//用分区个数取模prefix.toInt % num}
}

3)运行结果

62d0d3a60ee0ee64c1a485903f132904.png

实例3:计算出每组订单中金额最大的记录

1)需求

有如下订单数据:

70417d5e0202bd1e70cb91c2dd1779bb.png

求出每一个订单中成交金额最大的一笔交易。

2)代码实践

package mapReduceimport mapReduce.FlowPartitioner.preparation
import org.apache.spark.sql.SparkSessionobject GroupSort {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()import spark.implicits._import org.apache.spark.sql.functions._// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")val base = "hdfs://127.0.0.1:9090/user/ds/"val lines = spark.read.textFile(base + "groupsort.txt")/* 默认分区器是hashcode */val lineData = lines.map(preparation).repartition(3).toDF("itemid", "goodsid", "amount")val sortData = lineData.sortWithinPartitions("itemid", "amount")sortData.show(10)/* 取出金额最大的订单 */val maxData = sortData.groupBy("itemid").agg(max("amount")).sort($"itemid".asc)maxData.show(10)spark.stop()}def preparation(str: String): ItemBean = {//切分字段val fields = str.split(" ")//assert(fields.size == 4)//组装对象var itemid = fields(0).toString.trimvar goodsid = fields(1).toString.trimvar amount = fields(2).toDoubleItemBean(itemid, goodsid, amount)}}case class ItemBean(itemid:String, goodsid:String, amount:Double)

3)运行结果

5c30e8c2a66c3cac82d58dc2780ed47f.png

实例4:合并多个小文件

1)需求

目标文件中有大量的小文件,会造成分配任务和资源的开销比实际的计算开销还打,这就产生了效率损耗。

需要先把一些小文件合并成一个大文件。

2)代码实践

package mapReduceimport org.apache.spark.sql.SparkSessionobject ManyToOne {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("FlowCount").master("local[1]").getOrCreate()// Optional, help avoid errors due to long lineagespark.sparkContext.setLogLevel("WARN")spark.sparkContext.setCheckpointDir("hdfs://127.0.0.1:9090/tmp/")/* 1.可以将path里的所有文件内容读出 *///spark.read.textFile(path)/* 2.指定文件列表 */val base = "hdfs://127.0.0.1:9090/user/ds/"val fileList = Array(base + "groupsort.txt", base + "flowcount.txt")/* union 连接 */val fileRDD = fileList.map(spark.read.textFile(_))val lineData = fileRDD.reduce((x,y)=> x.union(y))lineData.show(20)spark.stop()}}

3)运行结果

aadc73bc87a0ea13b70c903733858b8e.png

如果数据很大,难以在单机内存上装下,以上操作可能会造成单机内存不足。

转自 "如何使saveAsTextFile只生成一个文件"

在RDD上调用 coalesce(1,true).saveAsTextFile(),

或者,可以调用 repartition(1),它其实是coalesce的一个包装,默认第二个参数为true。

但代价是巨大的,因为Spark面对的是大量的数据,并且是并行执行的,如果强行要求最后只有一个分区,必然导致大量的磁盘IO和网络IO产生。

安全的做法,可以采用HDFS磁盘合并操作:

hadoop fs -cat /hdfs/result/part-* > /data/test.log

hadoop fs -getmerge /hdfs/result /data/test.log




推荐阅读
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 使用Ubuntu中的Python获取浏览器历史记录原文: ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 本文详细介绍了SQL日志收缩的方法,包括截断日志和删除不需要的旧日志记录。通过备份日志和使用DBCC SHRINKFILE命令可以实现日志的收缩。同时,还介绍了截断日志的原理和注意事项,包括不能截断事务日志的活动部分和MinLSN的确定方法。通过本文的方法,可以有效减小逻辑日志的大小,提高数据库的性能。 ... [详细]
  • 本文介绍了设计师伊振华受邀参与沈阳市智慧城市运行管理中心项目的整体设计,并以数字赋能和创新驱动高质量发展的理念,建设了集成、智慧、高效的一体化城市综合管理平台,促进了城市的数字化转型。该中心被称为当代城市的智能心脏,为沈阳市的智慧城市建设做出了重要贡献。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 本文介绍了Oracle数据库中tnsnames.ora文件的作用和配置方法。tnsnames.ora文件在数据库启动过程中会被读取,用于解析LOCAL_LISTENER,并且与侦听无关。文章还提供了配置LOCAL_LISTENER和1522端口的示例,并展示了listener.ora文件的内容。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • MyBatis错题分析解析及注意事项
    本文对MyBatis的错题进行了分析和解析,同时介绍了使用MyBatis时需要注意的一些事项,如resultMap的使用、SqlSession和SqlSessionFactory的获取方式、动态SQL中的else元素和when元素的使用、resource属性和url属性的配置方式、typeAliases的使用方法等。同时还指出了在属性名与查询字段名不一致时需要使用resultMap进行结果映射,而不能使用resultType。 ... [详细]
  • 本文详细介绍了在ASP.NET中获取插入记录的ID的几种方法,包括使用SCOPE_IDENTITY()和IDENT_CURRENT()函数,以及通过ExecuteReader方法执行SQL语句获取ID的步骤。同时,还提供了使用这些方法的示例代码和注意事项。对于需要获取表中最后一个插入操作所产生的ID或马上使用刚插入的新记录ID的开发者来说,本文提供了一些有用的技巧和建议。 ... [详细]
  • 关键词:Golang, Cookie, 跟踪位置, net/http/cookiejar, package main, golang.org/x/net/publicsuffix, io/ioutil, log, net/http, net/http/cookiejar ... [详细]
  • 本文详细介绍了Spring的JdbcTemplate的使用方法,包括执行存储过程、存储函数的call()方法,执行任何SQL语句的execute()方法,单个更新和批量更新的update()和batchUpdate()方法,以及单查和列表查询的query()和queryForXXX()方法。提供了经过测试的API供使用。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 树莓派语音控制的配置方法和步骤
    本文介绍了在树莓派上实现语音控制的配置方法和步骤。首先感谢博主Eoman的帮助,文章参考了他的内容。树莓派的配置需要通过sudo raspi-config进行,然后使用Eoman的控制方法,即安装wiringPi库并编写控制引脚的脚本。具体的安装步骤和脚本编写方法在文章中详细介绍。 ... [详细]
author-avatar
mobiledu2502906891
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有