下面我们通过文字梳理一下具体的数据计算步骤。
第一步:历史粉丝关注数据初始化
第二步:实时维护粉丝关注数据
第三步:每天定时更新主播等级
第四步:每天定时更新用户活跃时间
第五步:每周一计算最近一月主播视频评级
第六步:每周一计算最近一周内主播主播的三度关系列表。
第七步:三度关系列表数据导出到MySQL
视频数据来源于服务端,当主播开播结束后会产生一条视频数据
数据格式:
{"id":"1769913943534","uid":"1000","nickname":"jack94","gold":284,"watchnumpv":284,"watchnumuv":284,"hosts":284,"nofollower":284,"looktime":284,"smlook":284,"follower":284,"gifter":284,"length":384, "area":"A_US","rating":"A","exp":284,"timestamp":1769913940000,"type":"video_info"}
之前我们通过埋点模拟上报数据,通过flume落盘到hdfs上面,这样在hdfs上面产生的目录会使用当天日期,为了保证我这里使用的目录和大家都保持一致,所以在这我就生成一个固定的日期目录
使用代码GenerateVideoInfoDataV2,在代码中指定日期 2026-02-01,这样会把模拟生成的用户活跃数据直接上传到hdfs上面,因为之前的数据采集流程我们已经详细分析过了,所以在这就直接把数据上传到hdfs上面了。
执行代码:GenerateVideoInfoDataV2,将会把数据上传到hdfs的这个目录下
hdfs://bigdata01:9000/data/video_info/20260201/
[root@bigdata01 hadoop-3.2.0]# hdfs dfs -ls /data/video_info/20260201
Found 1 items
-rw-r--r-- 3 yehua supergroup 2699 2026-02-14 21:32 /data/video_info/20260201/video_info-2026-02-01.log
这个任务需要做的就是统计最近一个月内主播的视频评级信息
在这我们先初始化一天的数据即可,计算一天和计算一个月的数据,计算逻辑是一样的,只有spark任务的输入路径不一样
如果是一个月的数据,假设这一个月有30天,则需要把这30天对应的30个目录使用逗号分隔,拼接成一个字符串,作为Spark任务的输入即可。
为什么这个任务要每周计算一次,而不是每天计算一次呢?
因为很多主播不会每天都开播,所以我们每天都计算意义不大,均衡考虑之后按照每周计算一次这个频率。
创建子module项目:update_video_info
创建scala目录,引入scala2.11版本的sdk
在scala目录中创建包:com.imooc.spark
创建类:UpdateVideoInfoScala
代码如下:
package com.imooc.sparkimport com.alibaba.fastjson.JSON
import org.apache.spark.{SparkConf, SparkContext}
import org.neo4j.driver.{AuthTokens, GraphDatabase}
import org.slf4j.LoggerFactory/*** 任务5:* 每周一计算最近一个月主播视频评级* 把最近几次视频评级在3B+或2A+的主播,在neo4j中设置flag=1** 注意:在执行程序之前需要先把flag=1的重置为0* */
object UpdateVideoInfoScala {val logger = LoggerFactory.getLogger("UpdateVideoInfo")def main(args: Array[String]): Unit = {var masterUrl = "local"var appName = "UpdateVideoInfo"var filePath = "hdfs://bigdata01:9000/data/video_info/20260201"var boltUrl = "bolt://bigdata04:7687"var username = "neo4j"var password = "admin"if(args.length > 0){masterUrl = args(0)appName = args(1)filePath = args(2)boltUrl = args(3)username = args(4)password = args(5)}//在Driver端执行此代码,将flag=1的重置为0//获取neo4j连接val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))//开启一个会话val session = driver.session()session.run("match(a:User) where a.flag=1 set a.flag=0")//关闭会话session.close()//关闭连接driver.close()//获取SparkContextval conf = new SparkConf().setAppName(appName).setMaster(masterUrl)val sc = new SparkContext(conf)//读取视频评级数据val linesRDD = sc.textFile(filePath)//解析数据中的uid,rating,timestampval tup3RDD = linesRDD.map(line => {try {val jsonObj = JSON.parseObject(line)val uid = jsonObj.getString("uid")val rating = jsonObj.getString("rating")val timestamp: Long = jsonObj.getLong("timestamp")(uid, rating, timestamp)} catch {case ex: Exception => logger.error("json数据解析失败:" + line)("0", "0", 0L)}})//过滤掉异常数据val filterRDD = tup3RDD.filter(_._2 != "0")//获取用户最近3场直播的评级信息val top3RDD = filterRDD.groupBy(_._1).map(group=>{val top3 = group._2.toList.sortBy(_._3).reverse.take(3).mkString("\t")(group._1,top3)})//过滤出来满足3场B+的数据val top3BRDD = top3RDD.filter(tup => {var flag = falseval fields = tup._2.split("\t")if (fields.length == 3) {//3场B+,表示里面没有出现C和Dval tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1) + "," + fields(2).split(",")(1)if (!tmp_str.contains("C") && !tmp_str.contains("D")) {flag = true}}flag})//把满足3场B+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户//注意:针对3场B+的数据还需要额外再限制一下主播等级,主播等级需要>=15,这样可以保证筛选出来的主播尽可能是一些优质主播top3BRDD.foreachPartition(it=>{//获取neo4j连接val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))//开启一个会话val session = driver.session()it.foreach(tup=>{session.run("match(a:User {uid: '"+tup._1+"'}) where a.level >=15 set a.flag = 1")})//关闭会话session.close()//关闭连接driver.close()})//过滤出来满足2场A+的数据val top2ARDD = top3RDD.filter(tup=>{var flag = falseval fields = tup._2.split("\t")if (fields.length >= 2) {//2场A+,获取最近两场直播评级,里面不能出现B、C、Dval tmp_str = fields(0).split(",")(1) + "," + fields(1).split(",")(1)if (!tmp_str.contains("B") && !tmp_str.contains("C") && !tmp_str.contains("D")) {flag = true}}flag})//把满足2场A+的数据更新到neo4j中,增加一个字段flag,flag=1表示是视频评级满足条件的主播,允许推荐给用户//注意:针对2场A+的数据还需要额外再限制一下主播等级,主播等级需要>=4,这样可以保证筛选出来的主播尽可能是一些优质主播top2ARDD.foreachPartition(it=>{//获取neo4j连接val driver = GraphDatabase.driver(boltUrl, AuthTokens.basic(username, password))//开启一个会话val session = driver.session()it.foreach(tup=>{session.run("match(a:User {uid: '"+tup._1+"'}) where a.level >=4 set a.flag = 1")})//关闭会话session.close()//关闭连接driver.close()})}}
在本地执行代码
然后到neo4j的web界面查看结果,发现只有uid为1005的数据对应的flag不等于1(没有flag属性)
这样是正确的。
下面开发任务执行脚本
注意:这个脚本中需要实现获取最近一个月的数据目录
startUpdateVideoInfo.sh
#!/bin/bash# 获取最近一个月的文件目录
#filePath=""
#for((i&#61;1;i<&#61;30;i&#43;&#43;))
#do
# filePath&#43;&#61;"hdfs://bigdata01:9000/data/video_info/"&#96;date -d "$i days ago" &#43;"%Y%m%d"&#96;,
#done#默认获取昨天时间
dt&#61;&#96;date -d "1 days ago" &#43;"%Y%m%d"&#96;
if [ "x$1" !&#61; "x" ]
thendt&#61;$1
fi
#HDFS输入数据路径
filePath&#61;"hdfs://bigdata01:9000/data/video_info/${dt}"masterUrl&#61;"yarn-cluster"
master&#61;&#96;echo ${masterUrl} | awk -F&#39;-&#39; &#39;{print $1}&#39;&#96;
deployMode&#61;&#96;echo ${masterUrl} | awk -F&#39;-&#39; &#39;{print $2}&#39;&#96;# 组装一个唯一的名称
appName&#61;"UpdateVideoInfoScala"&#96;date &#43;%s&#96;
boltUrl&#61;"bolt://bigdata04:7687"
username&#61;"neo4j"
password&#61;"admin"yarnCommonLib&#61;"hdfs://bigdata01:9000/yarnCommonLib"spark-submit --master ${master} \
--name ${appName} \
--deploy-mode ${deployMode} \
--queue default \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--num-executors 2 \
--class com.imooc.spark.UpdateVideoInfoScala \
--jars ${yarnCommonLib}/fastjson-1.2.68.jar,${yarnCommonLib}/neo4j-java-driver-4.1.1.jar,${yarnCommonLib}/reactive-streams-1.0.3.jar \
/data/soft/video_recommend/jobs/update_video_info-1.0-SNAPSHOT.jar ${masterUrl} ${appName} ${filePath} ${boltUrl} ${username} ${password}#验证任务执行状态
appStatus&#61;&#96;yarn application -appStates FINISHED -list | grep ${appName} | awk &#39;{print $7}&#39;&#96;
if [ "${appStatus}" !&#61; "SUCCEEDED" ]
thenecho "任务执行失败"# 发送短信或者邮件
elseecho "任务执行成功"
fi
对项目代码编译打包&#xff0c;在pom.xml中添加打包配置
打jar包
D:\IdeaProjects\db_video_recommend\update_video_info>mvn clean package -DskipTests
[INFO] Scanning for projects...
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) &#64; update_video_info ---
[INFO] Building jar: D:\IdeaProjects\db_video_recommend\update_video_info\target\update_video_info-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.793s
[INFO] Final Memory: 23M/619M
[INFO] ------------------------------------------------------------------------
将jar包和任务执行脚本上传到bigdata04机器上面
[root&#64;bigdata04 jobs]# ll
-rw-r--r--. 1 root root 1461 Aug 31 2020 startUpdateVideoInfo.sh
-rw-r--r--. 1 root root 17242 Aug 31 2020 update_video_info-1.0-SNAPSHOT.jar
向集群中提交任务
[root&#64;bigdata04 jobs]# sh -x startUpdateVideoInfo.sh 20260201
到集群中验证任务执行状态&#xff0c;发现任务执行成功&#xff0c;此时neo4j中的数据还是老样子&#xff0c;因为刚才我们已经在本地执行过一次了&#xff0c;重复再执行对结果没影响。