文章地址:http://www.haha174.top/article/details/253715
源码:https://github.com/haha174/spark-session.git
上一遍中通过代码构造出了测试的数据下面开始对该数据进行聚合
首先分析一下
/**** 接收用户创建的分析任务,用户可能指定的条件如下:** 1、时间范围:起始日期~结束日期* 2、性别:男或女* 3、年龄范围* 4、职业:多选* 5、城市:多选* 6、搜索词:多个搜索词,只要某个session中的任何一个action搜索过指定的关键词,那么session就符合条件* 7、点击品类:多个品类,只要某个session中的任何一个action点击过某个品类,那么session就符合条件** 我们的spark作业如何接受用户创建的任务?** J2EE平台在接收用户创建任务的请求之后,会将任务信息插入MySQL的task表中,任务参数以JSON格式封装在task_param* 字段中** 接着J2EE平台会执行我们的spark-submit shell脚本,并将taskid作为参数传递给spark-submit shell脚本* spark-submit shell脚本,在执行时,是可以接收参数的,并且会将接收的参数,传递给Spark作业的main函数* 参数就封装在main函数的args数组中* 这是spark本身提供的特性*/
下面创建一个task (向task 表中插入一条数据 在测试的时候写的是固定的值但是实际上是在 环境信息中获取的)
使用之前开发好的jdbc 组件插入一条数据如下
private static void setTask() {Map map = new HashMap();map.put("startDate", "2018-05-01");map.put("endDate", "2018-06-31");String sql = "insert into task(task_id,task_name,task_param) values('" + taskid + "','test01','" + JSON.toJSONString(map) + "')";SessionFactory sessionFactory = SessionFactory.getSessionFactory();sessionFactory.executeUpdate(sql, null);}
下面获取当前的taskId 取得进行session 参数条件
//taskDAO 请参考 https://blog.csdn.net/u012957549/article/details/80445329Task task = taskDAO.findById(taskid);JSONObject taskParam = JSONObject.parseObject(task.getTaskParam());
获取指定参数内的session 数据
/*** 获取指定日期范围内的用户访问行为数据* &#64;param sqlContext SQLContext* &#64;param taskParam 任务参数* &#64;return 行为数据RDD*/private static JavaRDD getActionRDDByDateRange(SQLContext sqlContext, JSONObject taskParam) {String startDate &#61; ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_START_DATE);String endDate &#61; ParamUtils.getParam(taskParam, Constants.SESSION_PROJECT.PARAM_END_DATE);String sql &#61;"select * "&#43; "from user_visit_action "&#43; "where date>&#61;&#39;" &#43; startDate &#43; "&#39; "&#43; "and date<&#61;&#39;" &#43; endDate &#43; "&#39;";Dataset actionDF &#61; sqlContext.sql(sql);return actionDF.javaRDD();}
// 首先&#xff0c;可以将行为数据&#xff0c;按照session_id进行groupByKey分组// 此时的数据的粒度就是session粒度了&#xff0c;然后呢&#xff0c;可以将session粒度的数据// 与用户信息数据&#xff0c;进行join// 然后就可以获取到session粒度的数据&#xff0c;同时呢&#xff0c;数据里面还包含了session对应的user的信息JavaPairRDD sessionid2AggrInfoRDD &#61;aggregateBySession(sqlContext, actionRDD);
/*** 对行为数据按session粒度进行聚合* &#64;param actionRDD 行为数据RDD* &#64;return session粒度聚合数据*/private static JavaPairRDD aggregateBySession(SQLContext sqlContext, JavaRDD actionRDD) {// 现在actionRDD中的元素是Row&#xff0c;一个Row就是一行用户访问行为记录&#xff0c;比如一次点击或者搜索// 我们现在需要将这个Row映射成的格式JavaPairRDD sessionid2ActionRDD &#61; actionRDD.mapToPair(/*** PairFunction* 第一个参数&#xff0c;相当于是函数的输入* 第二个参数和第三个参数&#xff0c;相当于是函数的输出&#xff08;Tuple&#xff09;&#xff0c;分别是Tuple第一个和第二个值*/new PairFunction() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Row row) throws Exception {return new Tuple2(row.getString(2), row);}});// 对行为数据按session粒度进行分组JavaPairRDD> sessionid2ActionsRDD &#61;sessionid2ActionRDD.groupByKey();// 对每一个session分组进行聚合&#xff0c;将session中所有的搜索词和点击品类都聚合起来// 到此为止&#xff0c;获取的数据格式&#xff0c;如下&#xff1a;JavaPairRDD userid2PartAggrInfoRDD &#61; sessionid2ActionsRDD.mapToPair(new PairFunction>, Long, String>() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Tuple2> tuple)throws Exception {String sessionid &#61; tuple._1;Iterator iterator &#61; tuple._2.iterator();StringBuffer searchKeywordsBuffer &#61; new StringBuffer("");StringBuffer clickCategoryIdsBuffer &#61; new StringBuffer("");Long userid &#61; null;// 遍历session所有的访问行为while (iterator.hasNext()) {// 提取每个访问行为的搜索词字段和点击品类字段Row row &#61; iterator.next();if (userid &#61;&#61; null) {userid &#61; row.getLong(1);}String searchKeyword &#61; row.getString(5);Long clickCategoryId &#61; null;if (row.get(6) !&#61; null) {clickCategoryId &#61; row.getLong(6);}// 实际上这里要对数据说明一下// 并不是每一行访问行为都有searchKeyword何clickCategoryId两个字段的// 其实&#xff0c;只有搜索行为&#xff0c;是有searchKeyword字段的// 只有点击品类的行为&#xff0c;是有clickCategoryId字段的// 所以&#xff0c;任何一行行为数据&#xff0c;都不可能两个字段都有&#xff0c;所以数据是可能出现null值的// 我们决定是否将搜索词或点击品类id拼接到字符串中去// 首先要满足&#xff1a;不能是null值// 其次&#xff0c;之前的字符串中还没有搜索词或者点击品类idif (StringUtils.isNotEmpty(searchKeyword)) {if (!searchKeywordsBuffer.toString().contains(searchKeyword)) {searchKeywordsBuffer.append(searchKeyword &#43; ",");}}if (clickCategoryId !&#61; null) {if (!clickCategoryIdsBuffer.toString().contains(String.valueOf(clickCategoryId))) {clickCategoryIdsBuffer.append(clickCategoryId &#43; ",");}}}String searchKeywords &#61; StringUtils.trimComma(searchKeywordsBuffer.toString());String clickCategoryIds &#61; StringUtils.trimComma(clickCategoryIdsBuffer.toString());// 我们返回的数据格式&#xff0c;即使// 但是&#xff0c;这一步聚合完了以后&#xff0c;其实&#xff0c;我们是还需要将每一行数据&#xff0c;跟对应的用户信息进行聚合// 问题就来了&#xff0c;如果是跟用户信息进行聚合的话&#xff0c;那么key&#xff0c;就不应该是sessionid// 就应该是userid&#xff0c;才能够跟格式的用户信息进行聚合// 如果我们这里直接返回&#xff0c;还得再做一次mapToPair算子// 将RDD映射成的格式&#xff0c;那么就多此一举// 所以&#xff0c;我们这里其实可以直接&#xff0c;返回的数据格式&#xff0c;就是// 然后跟用户信息join的时候&#xff0c;将partAggrInfo关联上userInfo// 然后再直接将返回的Tuple的key设置成sessionid// 最后的数据格式&#xff0c;还是// 聚合数据&#xff0c;用什么样的格式进行拼接&#xff1f;// 我们这里统一定义&#xff0c;使用key&#61;value|key&#61;valueString partAggrInfo &#61; Constants.SESSION_PROJECT.FIELD_SESSION_ID &#43; "&#61;" &#43; sessionid &#43; "|"&#43; Constants.SESSION_PROJECT.FIELD_SEARCH_KEYWORDS &#43; "&#61;" &#43; searchKeywords &#43; "|"&#43; Constants.SESSION_PROJECT.FIELD_CLICK_CATEGORY_IDS &#43; "&#61;" &#43; clickCategoryIds;return new Tuple2(userid, partAggrInfo);}});// 查询所有用户数据&#xff0c;并映射成的格式String sql &#61; "select * from user_info";JavaRDD userInfoRDD &#61; sqlContext.sql(sql).javaRDD();JavaPairRDD userid2InfoRDD &#61; userInfoRDD.mapToPair(new PairFunction() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Row row) throws Exception {return new Tuple2(row.getLong(0), row);}});// 将session粒度聚合数据&#xff0c;与用户信息进行joinJavaPairRDD> userid2FullInfoRDD &#61;userid2PartAggrInfoRDD.join(userid2InfoRDD);// 对join起来的数据进行拼接&#xff0c;并且返回格式的数据JavaPairRDD sessionid2FullAggrInfoRDD &#61; userid2FullInfoRDD.mapToPair(new PairFunction>, String, String>() {private static final long serialVersionUID &#61; 1L;&#64;Overridepublic Tuple2 call(Tuple2> tuple)throws Exception {String partAggrInfo &#61; tuple._2._1;Row userInfoRow &#61; tuple._2._2;String sessionid &#61; StringUtils.getFieldFromConcatString(partAggrInfo, "\\|", Constants.SESSION_PROJECT.FIELD_SESSION_ID);int age &#61; userInfoRow.getInt(3);String professional &#61; userInfoRow.getString(4);String city &#61; userInfoRow.getString(5);String sex &#61; userInfoRow.getString(6);String fullAggrInfo &#61; partAggrInfo &#43; "|"&#43; Constants.FIELD.FIELD_AGE &#43; "&#61;" &#43; age &#43; "|"&#43; Constants.FIELD.FIELD_PROFESSIONAL &#43; "&#61;" &#43; professional &#43; "|"&#43; Constants.FIELD.FIELD_CITY &#43; "&#61;" &#43; city &#43; "|"&#43; Constants.FIELD.FIELD_SEX &#43; "&#61;" &#43; sex;return new Tuple2(sessionid, fullAggrInfo);}});return sessionid2FullAggrInfoRDD;}
欢迎关注&#xff0c;更多惊喜等着你