
Hadoop Product Recommendation: Learning Notes from a Baizhan Zhuoyue Class Student


Product Recommendation

This post walks through an item-based collaborative-filtering (ItemCF) recommender built as a chain of six MapReduce jobs: Step1 deduplicates the raw action log, Step2 totals each user's action scores per item, Step3 builds an item co-occurrence matrix, Step4 joins that matrix with the user score vectors and multiplies the two, Step5 sums the partial scores per user and item, and Step6 sorts each user's candidates by score and keeps the top 10.

Entry point

StartRun wires the six jobs together: each step's output directory becomes the next step's input.

package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
public class StartRun {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.framework.name", "local");

        // Each step reads the previous step's output directory
        Map<String, String> paths = new HashMap<>();
        paths.put("Step1Input", "/data/itemcf/input/");
        paths.put("Step1Output", "/data/itemcf/output/step1");
        paths.put("Step2Input", paths.get("Step1Output"));
        paths.put("Step2Output", "/data/itemcf/output/step2");
        paths.put("Step3Input", paths.get("Step2Output"));
        paths.put("Step3Output", "/data/itemcf/output/step3");
        paths.put("Step4Input1", paths.get("Step2Output"));
        paths.put("Step4Input2", paths.get("Step3Output"));
        paths.put("Step4Output", "/data/itemcf/output/step4");
        paths.put("Step5Input", paths.get("Step4Output"));
        paths.put("Step5Output", "/data/itemcf/output/step5");
        paths.put("Step6Input", paths.get("Step5Output"));
        paths.put("Step6Output", "/data/itemcf/output/step6");

        // The six jobs must run in order; each consumes the previous one's output
        Step1.run(conf, paths); // remove duplicate lines
        Step2.run(conf, paths);
        Step3.run(conf, paths);
        Step4.run(conf, paths);
        Step5.run(conf, paths);
        Step6.run(conf, paths);
    }

    // Weight of each user action: the stronger the action, the higher the score
    public static Map<String, Integer> R = new HashMap<>();

    static {
        R.put("click", 1);
        R.put("collect", 2);
        R.put("cart", 3);
        R.put("alipay", 4);
    }
}
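The jobs consume a comma-separated action log with a header row, item_id,user_id,action,vtime, as quoted in the Step1 mapper below. A minimal sample input (the first data line comes from the Step2 comment; the remaining lines are invented for illustration):

item_id,user_id,action,vtime
i160,u2781,click,2014/9/23 22:25
i160,u2781,click,2014/9/23 22:25
i160,u2781,collect,2014/9/23 22:30
i270,u2781,click,2014/9/24 10:01

Each action name maps to a weight through StartRun.R, so an alipay purchase (4) counts four times as much as a click (1).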
Step1: remove duplicate lines

package cn.sxt.itemcf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
public class Step1 {

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step1");
            job.setJarByClass(Step1.class);
            job.setMapperClass(Step1_Mapper.class);
            job.setReducerClass(Step1_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);

            FileInputFormat.addInputPath(job, new Path(paths.get("Step1Input")));
            Path output = new Path(paths.get("Step1Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // The key is the line's byte offset. Only lines after the first are
        // processed, which drops the header row: item_id,user_id,action,vtime
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            if (key.get() != 0) {
                // Using the whole record as the map key deduplicates identical rows
                context.write(value, NullWritable.get());
            }
        }
    }

    private static class Step1_Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
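On the sample input above, Step1 drops the header (the only line whose byte offset is 0; this test assumes the header sits at the start of a single input file) and collapses the duplicated click line, since identical records shuffle to the same reduce key. The step1 output is the three distinct data lines:

i160,u2781,click,2014/9/23 22:25
i160,u2781,collect,2014/9/23 22:30
i270,u2781,click,2014/9/24 10:01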
Step2: sum each user's action scores per item

package cn.sxt.itemcf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class Step2 {

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step2");
            job.setJarByClass(Step2.class);
            job.setMapperClass(Step2_Mapper.class);
            job.setReducerClass(Step2_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(paths.get("Step2Input")));
            Path output = new Path(paths.get("Step2Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Incoming record format: i160,u2781,click,2014/9/23 22:25
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String item = tokens[0];
            String user = tokens[1];
            String action = tokens[2];
            Text k = new Text(user);
            Integer rv = StartRun.R.get(action);
            Text v = new Text(item + ":" + rv.intValue());
            // Outgoing pair: u2781  i160:2
            context.write(k, v);
        }
    }

    private static class Step2_Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // u2781  i160:2
            Map<String, Integer> r = new HashMap<>();
            for (Text value : values) {
                String[] vs = value.toString().split(":");
                String item = vs[0];
                Integer action = Integer.parseInt(vs[1]);
                // Start from 0 if the item has no score yet, otherwise add to the
                // existing value: the sum is the user's total score for that item
                action = (r.get(item) == null ? 0 : r.get(item)) + action;
                r.put(item, action);
            }
            StringBuffer sb = new StringBuffer();
            for (Map.Entry<String, Integer> entry : r.entrySet()) {
                sb.append(entry.getKey() + ":" + entry.getValue() + ",");
            }
            // The emitted line holds the user's scores for every item they touched
            context.write(key, new Text(sb.toString()));
        }
    }
}
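Continuing the sample, u2781's three records group under one key; i160 accumulates click (1) + collect (2) = 3 and i270 gets 1, so the reducer emits (tab-separated):

u2781  i160:3,i270:1,

This is the same shape as the line u2778 i160:8,i270:1,... quoted in the Step3 mapper comment.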
Step3: build the item co-occurrence matrix

package cn.sxt.itemcf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
public class Step3 {

    private final static Text K = new Text();
    private final static IntWritable V = new IntWritable(1);

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step3");
            job.setJarByClass(Step3.class);
            job.setMapperClass(Step3_Mapper.class);
            job.setReducerClass(Step3_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            FileInputFormat.addInputPath(job, new Path(paths.get("Step3Input")));
            Path output = new Path(paths.get("Step3Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step3_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Incoming value: u2778  i160:8,i270:1,i319:2,i352:5,i487:1,i325:1,i249:2,
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            // items, e.g. i160:8
            String[] items = tokens[1].split(",");
            // Emit every ordered pair of items this user touched, with count 1
            for (int i = 0; i < items.length; i++) {
                String itemA = items[i].split(":")[0];
                for (int j = 0; j < items.length; j++) {
                    String itemB = items[j].split(":")[0];
                    K.set(itemA + ":" + itemB);
                    context.write(K, V);
                }
            }
        }
    }

    private static class Step3_Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // The sum counts how many users bought item A together with item B,
        // which gives a matrix-like structure: the co-occurrence matrix
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            V.set(sum);
            context.write(key, V);
        }
    }
}
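Taking u2781 i160:3,i270:1, as input, the mapper ignores the scores and emits each ordered pair of the user's items once: i160:i160, i160:i270, i270:i160, i270:i270, each with count 1. Summed over all users, the reducer yields co-occurrence lines such as i100:i184 2 (the format the Step4 comments rely on), meaning two users interacted with both i100 and i184.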
Step4: join the co-occurrence matrix with the user score vectors

package cn.sxt.itemcf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
public class Step4 {

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step4");
            job.setJarByClass(Step4.class);
            job.setMapperClass(Step4_Mapper.class);
            job.setReducerClass(Step4_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.setInputPaths(job, new Path[]{
                    new Path(paths.get("Step4Input1")),
                    new Path(paths.get("Step4Input2"))});
            Path output = new Path(paths.get("Step4Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step4_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Two input directories feed this job, one from step2 and one from
            // step3; flag records which one the current split came from
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();
            System.out.println(flag + "**************************");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // A record is either  u2778  i160:8,i270:1,i319:2,...   (step2)
            // or                  i100:i184  2                      (step3)
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());
            if (flag.equals("step3")) {
                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];
                Text k = new Text(itemID1);
                Text v = new Text("A:" + itemID2 + "," + num);
                // i100  A:i184,2
                context.write(k, v);
            } else if (flag.equals("step2")) {
                // The line was already split on tab and comma above, so the tokens are
                // u2778 i160:8 i270:1 i319:2 i352:5 i487:1 i325:1 i249:2
                //   0     1      2      3      4      5      6      7
                String userID = tokens[0];
                for (int i = 1; i < tokens.length; i++) {
                    String[] vs = tokens[i].split(":");
                    String itemID = vs[0];
                    String pref = vs[1];
                    Text k = new Text(itemID);
                    Text v = new Text("B:" + userID + "," + pref);
                    // i160  B:u2778,8
                    context.write(k, v);
                }
            }
        }
    }

    private static class Step4_Reduce extends Reducer<Text, Text, Text, Text> {
        // i100  A:i184,2
        // i100  B:u2778,1
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, Integer> mapA = new HashMap<>();
            Map<String, Integer> mapB = new HashMap<>();
            for (Text line : values) {
                String val = line.toString();
                if (val.startsWith("A:")) {
                    String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                    try {
                        mapA.put(kv[0], Integer.parseInt(kv[1]));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else if (val.startsWith("B:")) {
                    String[] kv = Pattern.compile("[\t,]").split(val.substring(2));
                    try {
                        // Each user's score for this item; the key is the user
                        mapB.put(kv[0], Integer.parseInt(kv[1]));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
            double result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                // A related item from the co-occurrence matrix, e.g. i184,2
                String mapk = iter.next(); // the related item's ID
                int num = mapA.get(mapk).intValue();
                Iterator<String> iterb = mapB.keySet().iterator();
                while (iterb.hasNext()) {
                    // Each user's score for the current item
                    String mapkb = iterb.next(); // user ID
                    int pref = mapB.get(mapkb).intValue();
                    // partial score = co-occurrence count * user's preference
                    result = num * pref;
                    Text k = new Text(mapkb);
                    Text v = new Text(mapk + "," + result);
                    context.write(k, v);
                }
            }
        }
    }
}
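With the values from the reducer comments, key i100 arrives with mapA = {i184=2} (co-occurrence count) and mapB = {u2778=1} (user score), so the reducer writes u2778 i184,2.0, one partial recommendation score: co-occurrence count times the user's preference. When an item has several A: neighbours and several B: users, every (neighbour, user) combination produces its own partial product.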
Step5: accumulate the partial recommendation scores

package cn.sxt.itemcf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
public class Step5 {

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step5");
            job.setJarByClass(Step5.class);
            job.setMapperClass(Step5_Mapper.class);
            job.setReducerClass(Step5_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(paths.get("Step5Input")));
            Path output = new Path(paths.get("Step5Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Re-key the Step4 partials by user: u2778  i184,2.0
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1] + "," + tokens[2]);
            context.write(k, v);
        }
    }

    private static class Step5_Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Sum the partial scores per item for this user
            Map<String, Double> map = new HashMap<>();
            for (Text line : values) {
                String[] tokens = line.toString().split(",");
                String itemID = tokens[0];
                Double score = Double.parseDouble(tokens[1]);
                if (map.containsKey(itemID)) {
                    map.put(itemID, map.get(itemID) + score);
                } else {
                    map.put(itemID, score);
                }
            }
            Iterator<String> iter = map.keySet().iterator();
            while (iter.hasNext()) {
                String itemID = iter.next();
                double score = map.get(itemID);
                Text v = new Text(itemID + "," + score);
                context.write(key, v);
            }
        }
    }
}
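Step5 re-keys the Step4 partials by user and sums them per item. If u2778 had received i184,2.0 through i100 and, say, i184,3.0 through another co-occurring item (an invented figure), the reducer would output u2778 i184,5.0.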
Step6: sort by score and keep each user's top 10

package cn.sxt.itemcf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;
public class Step6 {

    private final static Text K = new Text();
    private final static Text V = new Text();

    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step6");
            job.setJarByClass(Step6.class);
            job.setMapperClass(Step6_Mapper.class);
            job.setReducerClass(Step6_Reduce.class);
            // Secondary sort: order by user, then by score descending
            job.setSortComparatorClass(NumSort.class);
            // Group all of a user's keys into a single reduce() call
            job.setGroupingComparatorClass(UserGroup.class);
            job.setMapOutputKeyClass(PairWritable.class);
            job.setMapOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(paths.get("Step6Input")));
            Path output = new Path(paths.get("Step6Output"));
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step6_Mapper extends Mapper<LongWritable, Text, PairWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = Pattern.compile("[\t,]").split(value.toString());
            String u = tokens[0];
            String item = tokens[1];
            String num = tokens[2];
            PairWritable k = new PairWritable();
            k.setUid(u);
            k.setNum(Double.parseDouble(num));
            V.set(item + ":" + num);
            context.write(k, V);
        }
    }

    private static class Step6_Reduce extends Reducer<PairWritable, Text, Text, Text> {
        @Override
        protected void reduce(PairWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Values arrive sorted by score descending; keep the top 10
            int i = 0;
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                if (i == 10) {
                    break;
                }
                sb.append(value.toString() + ",");
                i++;
            }
            K.set(key.getUid());
            V.set(sb.toString());
            context.write(K, V);
        }
    }

    private static class NumSort extends WritableComparator {
        public NumSort() {
            super(PairWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable o1 = (PairWritable) a;
            PairWritable o2 = (PairWritable) b;
            int r = o1.getUid().compareTo(o2.getUid());
            if (r == 0) {
                // Same user: sort by score, descending
                return -Double.compare(o1.getNum(), o2.getNum());
            }
            return r;
        }
    }

    private static class UserGroup extends WritableComparator {
        public UserGroup() {
            super(PairWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            PairWritable o1 = (PairWritable) a;
            PairWritable o2 = (PairWritable) b;
            // Compare by user only, so one user's keys form one group
            return o1.getUid().compareTo(o2.getUid());
        }
    }

    private static class PairWritable implements WritableComparable<PairWritable> {
        private String uid;
        private double num;

        public String getUid() {
            return uid;
        }

        public void setUid(String uid) {
            this.uid = uid;
        }

        public double getNum() {
            return num;
        }

        public void setNum(double num) {
            this.num = num;
        }

        @Override
        public int compareTo(PairWritable o) {
            int r = this.uid.compareTo(o.getUid());
            if (r == 0) {
                return Double.compare(this.num, o.getNum());
            }
            return r;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(uid);
            out.writeDouble(num);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.uid = in.readUTF();
            this.num = in.readDouble();
        }
    }
}
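Because NumSort orders each user's keys by score descending and UserGroup merges them into one reduce call, the files under /data/itemcf/output/step6 end up with one line per user holding at most ten item:score pairs, for example (scores invented): u2778 i184:5.0,i319:4.0,i160:2.0,. To try the whole pipeline, run StartRun.main() directly: with mapreduce.framework.name set to local, the jobs execute in-process and the /data/itemcf/... paths resolve against the default filesystem (the local disk unless fs.defaultFS points at HDFS).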
