热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

spark大型项目实战(十五):用户访问session分析(十五)按照session粒度进行数据聚合

文章地址:http:www.haha174.toparticledetails253715源码:https:github.comhaha174spark

文章地址:http://www.haha174.top/article/details/253715
源码:https://github.com/haha174/spark-session.git
上一遍中通过代码构造出了测试的数据下面开始对该数据进行聚合
首先分析一下

/**** 接收用户创建的分析任务,用户可能指定的条件如下:** 1、时间范围:起始日期~结束日期* 2、性别:男或女* 3、年龄范围* 4、职业:多选* 5、城市:多选* 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件* 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件** 我们的spark作业如何接受用户创建的任务?** J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param* 字段中** 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本* spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数* 参数就封装在main函数的args数组中* 这是spark本身提供的特性*/

下面创建一个task (向task 表中插入一条数据 在测试的时候写的是固定的值但是实际上是在 环境信息中获取的)
使用之前开发好的jdbc 组件插入一条数据如下

private static void setTask() {Map map = new HashMap();map.put("startDate", "2018-05-01");map.put("endDate", "2018-06-31");String sql = "insert into task(task_id,task_name,task_param) values('" + taskid + "','test01','" + JSON.toJSONString(map) + "')";SessionFactory sessionFactory = SessionFactory.getSessionFactory();sessionFactory.executeUpdate(sql, null);}

下面获取当前的taskId 取得进行session 参数条件

//taskDAO 请参考 https://blog.csdn.net/u012957549/article/details/80445329Task task = taskDAO.findById(taskid);JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());

获取指定参数内的session 数据

/*** 获取指定日期范围内的用户访问行为数据* &#64;param sqlContext SQLContext* &#64;param taskParam 任务参数* &#64;return 行为数据RDD*/private static JavaRDD getActionRDDByDateRange(SQLContext sqlContext, JSONObject taskParam) {String startDate &#61; ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_START_DATE);String endDate &#61; ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_END_DATE);String sql &#61;"select * "&#43; "from user_visit_action "&#43; "where date>&#61;&#39;" &#43; startDate &#43; "&#39; "&#43; "and date<&#61;&#39;" &#43; endDate &#43; "&#39;";Dataset actionDF &#61; sqlContext.sql(sql);return actionDF.javaRDD();}

// 首先&#xff0c;可以将行为数据&#xff0c;按照session_id进行groupByKey分组// 此时的数据的粒度就是session粒度了&#xff0c;然后呢&#xff0c;可以将session粒度的数据// 与用户信息数据&#xff0c;进行join// 然后就可以获取到session粒度的数据&#xff0c;同时呢&#xff0c;数据里面还包含了session对应的user的信息JavaPairRDD sessionid2AggrInfoRDD &#61;aggregateBySession(sqlContext, actionRDD);

/*** 对行为数据按session粒度进行聚合* &#64;param actionRDD 行为数据RDD* &#64;return session粒度聚合数据*/private static JavaPairRDD aggregateBySession(SQLContext sqlContext, JavaRDD actionRDD) {// 现在actionRDD中的元素是Row&#xff0c;一个Row就是一行用户访问行为记录&#xff0c;比如一次点击或者搜索// 我们现在需要将这个Row映射成的格式JavaPairRDD sessionid2ActionRDD &#61; actionRDD.mapToPair(/*** PairFunction* 第一个参数&#xff0c;相当于是函数的输入* 第二个参数和第三个参数&#xff0c;相当于是函数的输出&#xff08;Tuple&#xff09;&#xff0c;分别是Tuple第一个和第二个值*/new PairFunction() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Row row) throws Exception {return new Tuple2(row.getString(2), row);}});// 对行为数据按session粒度进行分组JavaPairRDD> sessionid2ActionsRDD &#61;sessionid2ActionRDD.groupByKey();// 对每一个session分组进行聚合&#xff0c;将session中所有的搜索词和点击品类都聚合起来// 到此为止&#xff0c;获取的数据格式&#xff0c;如下&#xff1a;JavaPairRDD userid2PartAggrInfoRDD &#61; sessionid2ActionsRDD.mapToPair(new PairFunction>, Long, String>() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Tuple2> tuple)throws Exception {String sessionid &#61; tuple._1;Iterator iterator &#61; tuple._2.iterator();StringBuffer searchKeywordsBuffer &#61; new StringBuffer("");StringBuffer clickCategoryIdsBuffer &#61; new StringBuffer("");Long userid &#61; null;// 遍历session所有的访问行为while (iterator.hasNext()) {// 提取每个访问行为的搜索词字段和点击品类字段Row row &#61; iterator.next();if (userid &#61;&#61; null) {userid &#61; row.getLong(1);}String searchKeyword &#61; row.getString(5);Long clickCategoryId &#61; null;if (row.get(6) !&#61; null) {clickCategoryId &#61; row.getLong(6);}// 实际上这里要对数据说明一下// 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的// 其实&#xff0c;只有搜索行为&#xff0c;是有searchKeyword字段的// 只有点击品类的行为&#xff0c;是有clickCategoryId字段的// 所以&#xff0c;任何一行行为数据&#xff0c;都不可能两个字段都有&#xff0c;所以数据是可能出现null值的// 我们决定是否将搜索词或点击品类id拼接到字符串中去// 首先要满足&#xff1a;不能是null值// 其次&#xff0c;之前的字符串中还没有搜索词或者点击品类idif (StringUtils.isNotEmpty(searchKeyword)) {if (!searchKeywordsBuffer.toString().contains(searchKeyword)) {searchKeywordsBuffer.append(searchKeyword &#43; ",");}}if (clickCategoryId !&#61; null) {if (!clickCategoryIdsBuffer.toString().contains(String.valueOf(clickCategoryId))) {clickCategoryIdsBuffer.append(clickCategoryId &#43; ",");}}}String searchKeywords &#61; StringUtils.trimComma(searchKeywordsBuffer.toString());String clickCategoryIds &#61; StringUtils.trimComma(clickCategoryIdsBuffer.toString());// 我们返回的数据格式&#xff0c;即使// 但是&#xff0c;这一步聚合完了以后&#xff0c;其实&#xff0c;我们是还需要将每一行数据&#xff0c;跟对应的用户信息进行聚合// 问题就来了&#xff0c;如果是跟用户信息进行聚合的话&#xff0c;那么key&#xff0c;就不应该是sessionid// 就应该是userid&#xff0c;才能够跟格式的用户信息进行聚合// 如果我们这里直接返回&#xff0c;还得再做一次mapToPair算子// 将RDD映射成的格式&#xff0c;那么就多此一举// 所以&#xff0c;我们这里其实可以直接&#xff0c;返回的数据格式&#xff0c;就是// 然后跟用户信息join的时候&#xff0c;将partAggrInfo关联上userInfo// 然后再直接将返回的Tuple的key设置成sessionid// 最后的数据格式&#xff0c;还是// 聚合数据&#xff0c;用什么样的格式进行拼接&#xff1f;// 我们这里统一定义&#xff0c;使用key&#61;value|key&#61;valueString partAggrInfo &#61; Constants.SESSION_PROJECT.FIELD_SESSION_ID &#43; "&#61;" &#43; sessionid &#43; "|"&#43; Constants.SESSION_PROJECT.FIELD_SEARCH_KEYWORDS &#43; "&#61;" &#43; searchKeywords &#43; "|"&#43; Constants.SESSION_PROJECT.FIELD_CLICK_CATEGORY_IDS &#43; "&#61;" &#43; clickCategoryIds;return new Tuple2(userid, partAggrInfo);}});// 查询所有用户数据&#xff0c;并映射成的格式String sql &#61; "select * from user_info";JavaRDD userInfoRDD &#61; sqlContext.sql(sql).javaRDD();JavaPairRDD userid2InfoRDD &#61; userInfoRDD.mapToPair(new PairFunction() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Row row) throws Exception {return new Tuple2(row.getLong(0), row);}});// 将session粒度聚合数据&#xff0c;与用户信息进行joinJavaPairRDD> userid2FullInfoRDD &#61;userid2PartAggrInfoRDD.join(userid2InfoRDD);// 对join起来的数据进行拼接&#xff0c;并且返回格式的数据JavaPairRDD sessionid2FullAggrInfoRDD &#61; userid2FullInfoRDD.mapToPair(new PairFunction>, String, String>() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Tuple2> tuple)throws Exception {String partAggrInfo &#61; tuple._2._1;Row userInfoRow &#61; tuple._2._2;String sessionid &#61; StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_SESSION_ID);int age &#61; userInfoRow.getInt(3);String professional &#61; userInfoRow.getString(4);String city &#61; userInfoRow.getString(5);String sex &#61; userInfoRow.getString(6);String fullAggrInfo &#61; partAggrInfo &#43; "|"&#43; Constants.FIELD.FIELD_AGE &#43; "&#61;" &#43; age &#43; "|"&#43; Constants.FIELD.FIELD_PROFESSIONAL &#43; "&#61;" &#43; professional &#43; "|"&#43; Constants.FIELD.FIELD_CITY &#43; "&#61;" &#43; city &#43; "|"&#43; Constants.FIELD.FIELD_SEX &#43; "&#61;" &#43; sex;return new Tuple2(sessionid, fullAggrInfo);}});return sessionid2FullAggrInfoRDD;}

欢迎关注&#xff0c;更多惊喜等着你

这里写图片描述


推荐阅读
  • 阿,里,云,物,联网,net,core,客户端,czgl,aliiotclient, ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • 本文介绍了一个Java猜拳小游戏的代码,通过使用Scanner类获取用户输入的拳的数字,并随机生成计算机的拳,然后判断胜负。该游戏可以选择剪刀、石头、布三种拳,通过比较两者的拳来决定胜负。 ... [详细]
  • Java容器中的compareto方法排序原理解析
    本文从源码解析Java容器中的compareto方法的排序原理,讲解了在使用数组存储数据时的限制以及存储效率的问题。同时提到了Redis的五大数据结构和list、set等知识点,回忆了作者大学时代的Java学习经历。文章以作者做的思维导图作为目录,展示了整个讲解过程。 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
author-avatar
酱油丸子-310
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有