热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis

先一个一个java程序,读取日志文件中的数据,然后将数据写入到Kafka中,然后写一个SparkSteaming程序,使用直连的方式读取Kafka中的数据,计算如下指标该文件是一个

先一个一个java程序,读取日志文件中的数据,然后将数据写入到Kafka中,然后写一个SparkSteaming程序,使用直连的方式读取Kafka中的数据,计算如下指标

该文件是一个电商网站某一天用户购买商品的订单成交数据,每一行有多个字段,用空格分割,字段的含义如下
用户ID   ip地址          商品分类   购买明细     商品金额
A        202.106.196.115 手机       iPhone8      8000

0          1                   2        3          4

A 202.106.196.115 手机 iPhone8 8000
B 202.106.0.20 服装 布莱奥尼西服 199
C 202.102.152.3 家具 婴儿床 2000
D 202.96.96.68 家电 电饭锅 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 奶粉 600
J 202.97.229.133 图书 Hadoop编程指南 90
A 202.106.196.115 手机 手机壳 200
B 202.106.0.20 手机 iPhone8 8000
C 202.102.152.3 家具 婴儿车 2000
D 202.96.96.68 家具 婴儿车 1000
F 202.98.0.68 化妆品 迪奥香水 200
H 202.96.75.68 食品 婴儿床 600
J 202.97.229.133 图书 spark实战 80

问题1.计算出各个省的成交量总额(结果保存到MySQL中)
问题2.计算每个省城市成交量的top3(结果保存到MySQL中)
问题3.计算每个商品分类的成交总额,并按照从高到低排序(结果保存到MySQL中)
问题4.构建每一个用户的用户画像,就是根据用户购买的具体商品,给用户打上一个标签,为将来的商品推荐系统作数据支撑

说明:如果一个用户购买了一个iPhone8,对应有多个标签:果粉、高端人士、数码一族
请将下面的规则数据保存到MySQL数据库中,并作为标签规则(三个字段分别代表id、商品、对于的标签):

1 iPhone8 果粉
2 iPhone8 高端人士
3 iPhone8 数码一族
4 布莱奥尼西服 高端人士
5 布莱奥尼西服 商务男士
6 婴儿床 育儿中
7 迪奥香水 高端人士
8 迪奥香水 白富美
9 婴儿床 育儿中
10 iPhone8手机壳 果粉
11 iPhone8手机壳 高端人士
12 iPhone8手机壳 数码一族
13 spark实战 IT人士
14 spark实战 屌丝
15 Hadoop编程指南 IT人士
16 Hadoop编程指南 屌丝

用户的行为数据,根据规则打上对应的标签,然后将数据存储到Hbase中,并说明Hbase的注解和列族的设计思想!
由于用户的行为过多,计算过程要对数据进行序列化的压缩,要求使用kryo这种序列化机制,压缩方式自己选择

 

《Spark系列十七:经典案列使用直连的方式,Kafka,SparkSteaming,Redis》

 

object OrderCount {
def main(args: Array[String]): Unit = {
val group = "g1"
val cOnf= new SparkConf().setAppName("OrderCount").setMaster("local[4]")
val ssc = new StreamingContext(conf, Duration(5000))
val broadcastRef = IPUtils.broadcastIpRules(ssc, "/ip/ip.txt")
val topic = "orders"
val brokerList = "lj01:9092,lj02:9092,lj03:9092"
val zkQuorum = "lj01:2181,lj02:2181,lj03:2181"
val topics: Set[String] = Set(topic)
val topicDirs = new ZKGroupTopicDirs(group, topic)
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
val kafkaParams = Map(
"metadata.broker.list" -> brokerList,
"group.id" -> group,
"auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
)
val zkClient = new ZkClient(zkQuorum)
val children = zkClient.countChildren(zkTopicPath)
var kafkaStream: InputDStream[(String, String)] = null
var fromOffsets: Map[TopicAndPartition, Long] = Map()
//如果保存过 offset
//注意:偏移量的查询是在Driver完成的
if (children > 0) {
for (i <- 0 until children) {
val partitiOnOffset= zkClient.readData[String](s"$zkTopicPath/${i}")
val tp = TopicAndPartition(topic, i)
fromOffsets += (tp -> partitionOffset.toLong)
}
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
} else {
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}
var offsetRanges = Array[OffsetRange]()
//kafkaStream.foreachRDD里面的业务逻辑是在Driver端执行,RDD在Driver端生成,RDD调算子,算子里得函数的执行是在Executor
kafkaStream.foreachRDD { kafkaRDD =>
if(!kafkaRDD.isEmpty()) {
offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
val lines: RDD[String] = kafkaRDD.map(_._2)
//整理数据
val fields: RDD[Array[String]] = lines.map(_.split(" "))
//计算成交总金额
CalculateUtil.calculateIncome(fields)
//计算商品分类金额
CalculateUtil.calculateItem(fields)
//计算区域成交金额
CalculateUtil.calculateZone(fields, broadcastRef) for (o <- offsetRanges) {
val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
}
}
}
ssc.start()
ssc.awaitTermination()
}
}

实现要在redis设置一个key,变量

object Constant {
val TOTAL_INCOME = "TOTAL_INCOME"
}

 工具类CalculateUtil 

object CalculateUtil {
//计算成交总金额
def calculateIncome(fields: RDD[Array[String]]) = {
//将数据计算后写入到Reids
val priceRDD: RDD[Double] = fields.map(arr => {
val price = arr(4).toDouble
price
})
//reduce是一个Action,会把结果返回到Driver端
//将当前批次的总金额返回了
val sum: Double = priceRDD.reduce(_+_)
//获取一个jedis连接
val cOnn= JedisConnectionPool.getConnection()
//将历史值和当前的值进行累加
conn.incrByFloat(Constant.TOTAL_INCOME, sum)
//释放连接
conn.close()
}
// 计算分类的成交金额
def calculateItem(fields: RDD[Array[String]]) = {
val itemAndPrice: RDD[(String, Double)] = fields.map(arr => {
//分类
val item = arr(2)
//金额
val parice = arr(4).toDouble
(item, parice)
})
//按商品分类进行聚合
val reduced: RDD[(String, Double)] = itemAndPrice.reduceByKey(_+_)
//将当前批次的数据累加到Redis中
//foreachPartition是一个Action
//现在这种方式,jeids的连接是在哪一端创建的(Driver)
//在Driver端拿Jedis连接不好
reduced.foreachPartition(part => {
//获取一个Jedis连接
//这个连接其实是在Executor中的获取的
//JedisConnectionPool在一个Executor进程中有几个实例(单例)
val cOnn= JedisConnectionPool.getConnection()
part.foreach(t => {
//一个连接更新多条数据
conn.incrByFloat(t._1, t._2)
})
//将当前分区中的数据跟新完在关闭连接
conn.close()
})
}
//根据Ip计算归属地
def calculateZone(fields: RDD[Array[String]], broadcastRef: Broadcast[Array[(Long, Long, String)]]) = {
val provinceAndPrice: RDD[(String, Double)] = fields.map(arr => {
val ip = arr(1)
val price = arr(4).toDouble
val ipNum = MyUtils.ip2Long(ip)
//在Executor中获取到广播的全部规则
val allRules: Array[(Long, Long, String)] = broadcastRef.value
//二分法查找
val index = MyUtils.binarySearch(allRules, ipNum)
var province = "未知"
if (index != -1) {
province = allRules(index)._3
}
//省份,订单金额
(province, price)
})
//按省份进行聚合
val reduced: RDD[(String, Double)] = provinceAndPrice.reduceByKey(_+_)
//将数据跟新到Redis
reduced.foreachPartition(part => {
val cOnn= JedisConnectionPool.getConnection()
part.foreach(t => {
conn.incrByFloat(t._1, t._2)
})
conn.close()
})
}
}

即Myutilt

object MyUtils {
def ip2Long(ip: String): LOng= {
val fragments = ip.split("[.]")
var ipNum = 0L
for (i <- 0 until fragments.length){
ipNum = fragments(i).toLong | ipNum <<8L
}
ipNum
}
def readRules(path: String): Array[(Long, Long, String)] = {
//读取ip规则
val bf: BufferedSource = Source.fromFile(path)
val lines: Iterator[String] = bf.getLines()
//对ip规则进行整理,并放入到内存
val rules: Array[(Long, Long, String)] = lines.map(line => {
val fileds = line.split("[|]")
val startNum = fileds(2).toLong
val endNum = fileds(3).toLong
val province = fileds(6)
(startNum, endNum, province)
}).toArray
rules
}
def binarySearch(lines: Array[(Long, Long, String)], ip: Long) : Int = {
var low = 0
var high = lines.length - 1
while (low <= high) {
val middle = (low + high) / 2
if ((ip >= lines(middle)._1) && (ip <= lines(middle)._2))
return middle
if (ip high = middle - 1
else {
low = middle + 1
}
}
-1
}
def data2MySQL(it: Iterator[(String, Int)]): Unit = {
//一个迭代器代表一个分区,分区中有多条数据
//先获得一个JDBC连接
val conn: COnnection= DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123568")
//将数据通过Connection写入到数据库
val pstm: PreparedStatement = conn.prepareStatement("INSERT INTO access_log VALUES (?, ?)")
//将分区中的数据一条一条写入到MySQL中
it.foreach(tp => {
pstm.setString(1, tp._1)
pstm.setInt(2, tp._2)
pstm.executeUpdate()
})
//将分区中的数据全部写完之后,在关闭连接
if(pstm != null) {
pstm.close()
}
if (conn != null) {
conn.close()
}
}
def main(args: Array[String]): Unit = {
//数据是在内存中
val rules: Array[(Long, Long, String)] = readRules("/Users/zx/Desktop/ip/ip.txt")
//将ip地址转换成十进制
val ipNum = ip2Long("114.215.43.42")
//查找
val index = binarySearch(rules, ipNum)
//根据脚本到rules中查找对应的数据
val tp = rules(index)
val province = tp._3
println(province)
}
}

 


推荐阅读
  • 在《Cocos2d-x学习笔记:基础概念解析与内存管理机制深入探讨》中,详细介绍了Cocos2d-x的基础概念,并深入分析了其内存管理机制。特别是针对Boost库引入的智能指针管理方法进行了详细的讲解,例如在处理鱼的运动过程中,可以通过编写自定义函数来动态计算角度变化,利用CallFunc回调机制实现高效的游戏逻辑控制。此外,文章还探讨了如何通过智能指针优化资源管理和避免内存泄漏,为开发者提供了实用的编程技巧和最佳实践。 ... [详细]
  • 优化Vite 1.0至2.0升级过程中遇到的某些代码块过大问题解决方案
    本文详细探讨了在将项目从 Vite 1.0 升级到 2.0 的过程中,如何解决某些代码块过大的问题。通过具体的编码示例,文章提供了全面的解决方案,帮助开发者有效优化打包性能。 ... [详细]
  • 服务器部署中的安全策略实践与优化
    服务器部署中的安全策略实践与优化 ... [详细]
  • Hadoop 2.6 主要由 HDFS 和 YARN 两大部分组成,其中 YARN 包含了运行在 ResourceManager 的 JVM 中的组件以及在 NodeManager 中运行的部分。本文深入探讨了 Hadoop 2.6 日志文件的解析方法,并详细介绍了 MapReduce 日志管理的最佳实践,旨在帮助用户更好地理解和优化日志处理流程,提高系统运维效率。 ... [详细]
  • 在第二课中,我们将深入探讨Scala的面向对象编程核心概念及其在Spark源码中的应用。首先,通过详细的实战案例,全面解析Scala中的类和对象。作为一门纯面向对象的语言,Scala的类设计和对象使用是理解其面向对象特性的关键。此外,我们还将介绍如何通过阅读Spark源码来进一步巩固对这些概念的理解。这不仅有助于提升编程技能,还能为后续的高级应用开发打下坚实的基础。 ... [详细]
  • 全面解析JavaScript代码注释技巧与标准规范
    在Web前端开发中,JavaScript代码的可读性和维护性至关重要。本文将详细介绍如何有效地使用注释来提高代码的可读性,并探讨JavaScript代码注释的最佳实践和标准规范。通过合理的注释,开发者可以更好地理解和维护复杂的代码逻辑,提升团队协作效率。 ... [详细]
  • Hadoop平台警告解决:无法加载本机Hadoop库的全面应对方案
    本文探讨了在Hadoop平台上遇到“无法加载本机Hadoop库”警告的多种解决方案。首先,通过修改日志配置文件来忽略该警告,这一方法被证明是有效的。其次,尝试指定本地库的路径,但未能解决问题。接着,尝试不使用Hadoop本地库,同样没有效果。然后,通过替换现有的Hadoop本地库,成功解决了问题。最后,根据Hadoop的源代码自行编译本地库,也达到了预期的效果。以上方法适用于macOS系统。 ... [详细]
  • 为了在Hadoop 2.7.2中实现对Snappy压缩和解压功能的原生支持,本文详细介绍了如何重新编译Hadoop源代码,并优化其Native编译过程。通过这一优化,可以显著提升数据处理的效率和性能。此外,还探讨了编译过程中可能遇到的问题及其解决方案,为用户提供了一套完整的操作指南。 ... [详细]
  • V8不仅是一款著名的八缸发动机,广泛应用于道奇Charger、宾利Continental GT和BossHoss摩托车中。自2008年以来,作为Chromium项目的一部分,V8 JavaScript引擎在性能优化和技术创新方面取得了显著进展。该引擎通过先进的编译技术和高效的垃圾回收机制,显著提升了JavaScript的执行效率,为现代Web应用提供了强大的支持。持续的优化和创新使得V8在处理复杂计算和大规模数据时表现更加出色,成为众多开发者和企业的首选。 ... [详细]
  • 开发日志:201521044091 《Java编程基础》第11周学习心得与总结
    开发日志:201521044091 《Java编程基础》第11周学习心得与总结 ... [详细]
  • 数字图书馆近期展出了一批精选的Linux经典著作,这些书籍虽然部分较为陈旧,但依然具有重要的参考价值。如需转载相关内容,请务必注明来源:小文论坛(http://www.xiaowenbbs.com)。 ... [详细]
  • 本文详细介绍了如何在Java Web服务器上部署音视频服务,并提供了完整的验证流程。以AnyChat为例,这是一款跨平台的音视频解决方案,广泛应用于需要实时音视频交互的项目中。通过具体的部署步骤和测试方法,确保了音视频服务的稳定性和可靠性。 ... [详细]
  • HBase Java API 进阶:过滤器详解与应用实例
    本文详细探讨了HBase 1.2.6版本中Java API的高级应用,重点介绍了过滤器的使用方法和实际案例。首先,文章对几种常见的HBase过滤器进行了概述,包括列前缀过滤器(ColumnPrefixFilter)和时间戳过滤器(TimestampsFilter)。此外,还详细讲解了分页过滤器(PageFilter)的实现原理及其在大数据查询中的应用场景。通过具体的代码示例,读者可以更好地理解和掌握这些过滤器的使用技巧,从而提高数据处理的效率和灵活性。 ... [详细]
  • 如何高效启动大数据应用之旅?
    在前一篇文章中,我探讨了大数据的定义及其与数据挖掘的区别。本文将重点介绍如何高效启动大数据应用项目,涵盖关键步骤和最佳实践,帮助读者快速踏上大数据之旅。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
author-avatar
余夫子曰
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有