热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

FileBeat+Flume+Kafka+HDFS+Neo4j+SparkStreaming+MySQL:【案例】三度关系推荐V1.0版本11:每周一计算最近一月主播视频评级

一、数据计算步骤汇总下面我们通过文字梳理一下具体的数据计算步骤。第一步:历史粉丝关注数据初始化第二步:实时维护粉丝关注数据第三步:每天定

一、数据计算步骤汇总

下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL


二、每周一计算最近一月主播视频评级


1、数据分析

视频数据来源于服务端,当主播开播结束后会产生一条视频数据
数据格式:

{"id":"1769913943534","uid":"1000","nickname":"jack94","gold":284,"watchnumpv":284,"watchnumuv":284,"hosts":284,"nofollower":284,"looktime":284,"smlook":284,"follower":284,"gifter":284,"length":384, "area":"A_US","rating":"A","exp":284,"timestamp":1769913940000,"type":"video_info"}

2、生成数据

之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录
使用代码GenerateVideoInfoDataV2,在代码中指定日期 2026-02-01,这样会把模拟生成的用户活跃数据直接上传到hdfs上面,因为之前的数据采集流程我们已经详细分析过了,所以在这就直接把数据上传到hdfs上面了。

执行代码:GenerateVideoInfoDataV2,将会把数据上传到hdfs的这个目录下
hdfs://bigdata01:9000/data/video_info/20260201/

[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/video_info/20260201
Found 1 items
-rw-r--r-- 3 yehua supergroup 2699 2026-02-14 21:32 /data/video_info/20260201/video_info-2026-02-01.log

这个任务需要做的就是统计最近一个月内主播的视频评级信息
在这我们先初始化一天的数据即可,计算一天和计算一个月的数据,计算逻辑是一样的,只有spark任务的输入路径不一样
如果是一个月的数据,假设这一个月有30天,则需要把这30天对应的30个目录使用逗号分隔,拼接成一个字符串,作为Spark任务的输入即可。

为什么这个任务要每周计算一次,而不是每天计算一次呢?
因为很多主播不会每天都开播,所以我们每天都计算意义不大,均衡考虑之后按照每周计算一次这个频率。


3、创建项目

创建子module项目:update_video_info
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark


(1)引入依赖

org.apache.sparkspark-core_2.11

org.neo4j.driverneo4j-java-driver

com.alibabafastjson


(2) 创建代码

创建类:UpdateVideoInfoScala
代码如下:

package com.imooc.sparkimport com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory/*** 任务5:* 每周一计算最近一个月主播视频评级* 把最近几次视频评级在3B+或2A+的主播,在neo4j中设置flag=1** 注意:在执行程序之前需要先把flag=1的重置为0* */
object UpdateVideoInfoScala {val logger = LoggerFactory.getLogger("UpdateVideoInfo")def main(args: Array[String]): Unit = {var masterUrl = "local"var appName = "UpdateVideoInfo"var filePath = "hdfs://bigdata01:9000/data/video_info/20260201"var boltUrl = "bolt://bigdata04:7687"var username = "neo4j"var password = "admin"if(args.length > 0){masterUrl = args(0)appName = args(1)filePath = args(2)boltUrl = args(3)username = args(4)password = args(5)}//在Driver端执行此代码,将flag=1的重置为0//获取neo4j连接val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))//开启一个会话val session = driver.session()session.run("match(a:User) where a.flag=1 set a.flag=0")//关闭会话session.close()//关闭连接driver.close()//获取SparkContextval conf = new SparkConf().setAppName(appName).setMaster(masterUrl)val sc = new SparkContext(conf)//读取视频评级数据val linesRDD = sc.textFile(filePath)//解析数据中的uid,rating,timestampval tup3RDD = linesRDD.map(line => {try {val jsonObj = JSON.parseObject(line)val uid = jsonObj.getString("uid")val rating = jsonObj.getString("rating")val timestamp: Long = jsonObj.getLong("timestamp")(uid, rating, timestamp)} catch {case ex: Exception => logger.error("json数据解析失败:" + line)("0", "0", 0L)}})//过滤掉异常数据val filterRDD = tup3RDD.filter(_._2 != "0")//获取用户最近3场直播的评级信息val top3RDD = filterRDD.groupBy(_._1).map(group=>{val top3 = group._2.toList.sortBy(_._3).reverse.take(3).mkString("\t")(group._1,top3)})//过滤出来满足3场B+的数据val top3BRDD = top3RDD.filter(tup => {var flag = falseval fields = tup._2.split("\t")if (fields.length == 3) {//3场B+,表示里面没有出现C和Dval tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)if (!tmp_str.contains("C") && !tmp_str.contains("D")) {flag = true}}flag})//把满足3场B+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户//注意:针对3场B+的数据还需要额外再限制一下主播等级,主播等级需要>=15,这样可以保证筛选出来的主播尽可能是一些优质主播top3BRDD.foreachPartition(it=>{//获取neo4j连接val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))//开启一个会话val session = driver.session()it.foreach(tup=>{session.run("match(a:User {uid: '"+tup._1+"'}) where a.level >=15 set a.flag = 1")})//关闭会话session.close()//关闭连接driver.close()})//过滤出来满足2场A+的数据val top2ARDD = top3RDD.filter(tup=>{var flag = falseval fields = tup._2.split("\t")if (fields.length >= 2) {//2场A+,获取最近两场直播评级,里面不能出现B、C、Dval tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1)if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {flag = true}}flag})//把满足2场A+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户//注意:针对2场A+的数据还需要额外再限制一下主播等级,主播等级需要>=4,这样可以保证筛选出来的主播尽可能是一些优质主播top2ARDD.foreachPartition(it=>{//获取neo4j连接val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))//开启一个会话val session = driver.session()it.foreach(tup=>{session.run("match(a:User {uid: '"+tup._1+"'}) where a.level >=4 set a.flag = 1")})//关闭会话session.close()//关闭连接driver.close()})}}

4、本地执行

在本地执行代码
然后到neo4j的web界面查看结果,发现只有uid为1005的数据对应的flag不等于1(没有flag属性)
这样是正确的。


5、开发提交任务脚本

下面开发任务执行脚本
注意:这个脚本中需要实现获取最近一个月的数据目录
startUpdateVideoInfo.sh

#!/bin/bash# 获取最近一个月的文件目录
#filePath=""
#for((i&#61;1;i<&#61;30;i&#43;&#43;))
#do
# filePath&#43;&#61;"hdfs://bigdata01:9000/data/video_info/"&#96;date -d "$i days ago" &#43;"%Y%m%d"&#96;,
#done#默认获取昨天时间
dt&#61;&#96;date -d "1 days ago" &#43;"%Y%m%d"&#96;
if [ "x$1" !&#61; "x" ]
thendt&#61;$1
fi
#HDFS输入数据路径
filePath&#61;"hdfs://bigdata01:9000/data/video_info/${dt}"masterUrl&#61;"yarn-cluster"
master&#61;&#96;echo ${masterUrl} | awk -F&#39;-&#39; &#39;{print $1}&#39;&#96;
deployMode&#61;&#96;echo ${masterUrl} | awk -F&#39;-&#39; &#39;{print $2}&#39;&#96;# 组装一个唯一的名称
appName&#61;"UpdateVideoInfoScala"&#96;date &#43;%s&#96;
boltUrl&#61;"bolt://bigdata04:7687"
username&#61;"neo4j"
password&#61;"admin"yarnCommonLib&#61;"hdfs://bigdata01:9000/yarnCommonLib"spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateVideoInfoScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_video_info-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}#验证任务执行状态
appStatus&#61;&#96;yarn application -appStates FINISHED -list | grep ${appName} | awk &#39;{print $7}&#39;&#96;
if [ "${appStatus}" !&#61; "SUCCEEDED" ]
thenecho "任务执行失败"# 发送短信或者邮件
elseecho "任务执行成功"
fi

6、配置打包

对项目代码编译打包&#xff0c;在pom.xml中添加打包配置

org.apache.maven.pluginsmaven-compiler-plugin3.6.01.81.8UTF-8net.alchim31.mavenscala-maven-plugin3.1.62.112.11.12compile-scalacompileadd-sourcecompiletest-compile-scalatest-compileadd-sourcetestCompile


7、打包

打jar包

D:\IdeaProjects\db_video_recommend\update_video_info>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) &#64; update_video_info ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\update_video_info\target\update_video_info-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.793s
[INFO] Final Memory: 23M/619M
[INFO] ------------------------------------------------------------------------

8、上传jar包和脚本

将jar包和任务执行脚本上传到bigdata04机器上面

[root&#64;bigdata04 jobs]# ll
-rw-r--r--. 1 root root 1461 Aug 31 2020 startUpdateVideoInfo.sh
-rw-r--r--. 1 root root 17242 Aug 31 2020 update_video_info-1.0-SNAPSHOT.jar

9、提交任务、验证

向集群中提交任务

[root&#64;bigdata04 jobs]# sh -x startUpdateVideoInfo.sh 20260201

到集群中验证任务执行状态&#xff0c;发现任务执行成功&#xff0c;此时neo4j中的数据还是老样子&#xff0c;因为刚才我们已经在本地执行过一次了&#xff0c;重复再执行对结果没影响。

在这里插入图片描述


推荐阅读
  • 本文介绍了如何使用Flume从Linux文件系统收集日志并存储到HDFS,然后通过MapReduce清洗数据,使用Hive进行数据分析,并最终通过Sqoop将结果导出到MySQL数据库。 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 流处理中的计数挑战与解决方案
    本文探讨了在流处理中进行计数的各种技术和挑战,并基于作者在2016年圣何塞举行的Hadoop World大会上的演讲进行了深入分析。文章不仅介绍了传统批处理和Lambda架构的局限性,还详细探讨了流处理架构的优势及其在现代大数据应用中的重要作用。 ... [详细]
  • Hadoop的文件操作位于包org.apache.hadoop.fs里面,能够进行新建、删除、修改等操作。比较重要的几个类:(1)Configurati ... [详细]
  • 电商高并发解决方案详解
    本文以京东为例,详细探讨了电商中常见的高并发解决方案,包括多级缓存和Nginx限流技术,旨在帮助读者更好地理解和应用这些技术。 ... [详细]
  • mybatis 详解(七)一对一、一对多、多对多
    mybatis详解(七)------一 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • Zookeeper作为Apache Hadoop生态系统中的一个重要组件,主要致力于解决分布式应用中的常见数据管理难题。它提供了统一的命名服务、状态同步服务以及集群管理功能,有效提升了分布式系统的可靠性和可维护性。此外,Zookeeper还支持配置管理和临时节点管理,进一步增强了其在复杂分布式环境中的应用价值。 ... [详细]
  • 深入理解Flink的水印机制
    本文详细探讨了Apache Flink框架中的水印机制,这是一种用于处理数据流中时间不一致问题的重要工具。通过介绍水印的工作原理及其在实际应用中的实现方式,帮助读者更好地理解和利用这一功能。 ... [详细]
  • 零拷贝技术是提高I/O性能的重要手段,常用于Java NIO、Netty、Kafka等框架中。本文将详细解析零拷贝技术的原理及其应用。 ... [详细]
  • 通过使用Sqoop导入工具,可以精确控制并高效地将表数据的特定子集导入到HDFS中。具体而言,可以通过在导入命令中添加WHERE子句来指定所需的数据范围,从而在数据库服务器上执行相应的SQL查询,并将查询结果高效地存储到HDFS中。这种方法不仅提高了数据导入的灵活性,还确保了数据的准确性和完整性。 ... [详细]
  • Hadoop平台警告解决:无法加载本机Hadoop库的全面应对方案
    本文探讨了在Hadoop平台上遇到“无法加载本机Hadoop库”警告的多种解决方案。首先,通过修改日志配置文件来忽略该警告,这一方法被证明是有效的。其次,尝试指定本地库的路径,但未能解决问题。接着,尝试不使用Hadoop本地库,同样没有效果。然后,通过替换现有的Hadoop本地库,成功解决了问题。最后,根据Hadoop的源代码自行编译本地库,也达到了预期的效果。以上方法适用于macOS系统。 ... [详细]
  • Apache Hadoop HDFS QJournalProtocol 中 getJournalCTime 方法的应用与代码实例分析 ... [详细]
  • 【漫画解析】数据已删,存储空间为何未减?揭秘背后真相
    在数据迁移过程中,即使删除了原有数据,存储空间却未必会相应减少。本文通过漫画形式解析了这一现象背后的真相。具体来说,使用 `mysqldump` 命令进行数据导出时,该工具作为 MySQL 的逻辑备份工具,通过连接数据库并查询所需数据,将其转换为 SQL 语句。然而,这种操作并不会立即释放存储空间,因为数据库系统可能保留了已删除数据的碎片信息。文章进一步探讨了如何优化存储管理,以确保数据删除后能够有效回收存储空间。 ... [详细]
  • 在搭建Hadoop集群以处理大规模数据存储和频繁读取需求的过程中,经常会遇到各种配置难题。本文总结了作者在实际部署中遇到的典型问题,并提供了详细的解决方案,帮助读者避免常见的配置陷阱。通过这些经验分享,希望读者能够更加顺利地完成Hadoop集群的搭建和配置。 ... [详细]
author-avatar
豆豆bo69_550
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有