package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
public class StartRun {

    /**
     * Action-to-score weights used by Step2 to score user behaviour:
     * click=1, collect=2, cart=3, alipay=4.
     */
    public static Map<String, Integer> R = new HashMap<String, Integer>();
    static {
        R.put("click", 1);
        R.put("collect", 2);
        R.put("cart", 3);
        R.put("alipay", 4);
    }

    /**
     * Entry point: configures the local-mode MapReduce pipeline and wires each
     * step's input directory to the previous step's output directory.
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Fixed typo: the real property name is "cross-platform"
        // (was "corss-paltform", which Hadoop silently ignores).
        conf.set("mapreduce.app-submission.cross-platform", "true");
        conf.set("mapreduce.framework.name", "local");

        Map<String, String> paths = new HashMap<String, String>();
        paths.put("Step1Input", "/data/itemcf/input/");
        paths.put("Step1Output", "/data/itemcf/output/step1");
        paths.put("Step2Input", paths.get("Step1Output"));
        paths.put("Step2Output", "/data/itemcf/output/step2");
        paths.put("Step3Input", paths.get("Step2Output"));
        paths.put("Step3Output", "/data/itemcf/output/step3");
        paths.put("Step4Input1", paths.get("Step2Output"));
        paths.put("Step4Input2", paths.get("Step3Output"));
        paths.put("Step4Output", "/data/itemcf/output/step4");
        paths.put("Step5Input", paths.get("Step4Output"));
        paths.put("Step5Output", "/data/itemcf/output/step5");
        paths.put("Step6Input", paths.get("Step5Output"));
        paths.put("Step6Output", "/data/itemcf/output/step6");

        // Earlier steps are commented out; only Step6 runs here, presumably
        // because steps 1-5 were already executed against this data.
//        Step1.run(conf, paths); // de-duplicate input lines
//        Step2.run(conf, paths);
//        Step3.run(conf, paths);
//        Step4.run(conf, paths);
//        Step5.run(conf, paths);
        Step6.run(conf, paths);
    }
}package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
public class Step1 {

    /**
     * Step 1: de-duplicate the raw input records.
     * Input lines look like "item_id,user_id,action,vtime"; using the whole
     * line as the map output key makes identical lines collapse in the reducer.
     *
     * @param conf  Hadoop configuration shared by all steps
     * @param paths step-name -> HDFS path map built in StartRun
     * @return true when the job completes successfully, false on any error
     */
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step1");
            job.setJarByClass(Step1.class);
            job.setMapperClass(Step1_Mapper.class);
            job.setReducerClass(Step1_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NullWritable.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step1Input")));
            Path output = new Path(paths.get("Step1Output"));
            // Remove any stale output directory so the job can be re-run.
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // key is the byte offset of the line; offset 0 is the CSV header
        // "item_id,user_id,action,vtime", which must be skipped.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (key.get() != 0) {
                // Emit the whole line as the key so duplicate lines merge.
                context.write(value, NullWritable.get());
            }
        }
    }

    private static class Step1_Reduce extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // One output line per distinct input line.
            context.write(key, NullWritable.get());
        }
    }
}package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class Step2 {

    /**
     * Step 2: build the user scoring matrix.
     * Input (step1 output): "item,user,action,vtime".
     * Output: one line per user, "user \t item:score,item:score,..." where each
     * score is the sum of the action weights (StartRun.R) for that item.
     *
     * @param conf  Hadoop configuration shared by all steps
     * @param paths step-name -> HDFS path map built in StartRun
     * @return true when the job completes successfully, false on any error
     */
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step2");
            job.setJarByClass(Step2.class);
            job.setMapperClass(Step2_Mapper.class);
            job.setReducerClass(Step2_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step2Input")));
            Path output = new Path(paths.get("Step2Output"));
            // Remove any stale output directory so the job can be re-run.
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Incoming record format: i160,u2781,click,2014/9/23 22:25
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            String item = tokens[0];
            String user = tokens[1];
            String action = tokens[2];
            // Map the action name to its numeric weight (click=1 ... alipay=4).
            Integer rv = (Integer) StartRun.R.get(action);
            // Emitted as: u2781 -> i160:1
            context.write(new Text(user), new Text(item + ":" + rv.intValue()));
        }
    }

    private static class Step2_Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum the action weights per item for this user.
            Map<String, Integer> r = new HashMap<String, Integer>();
            for (Text value : values) {
                String[] vs = value.toString().split(":");
                String item = vs[0];
                int action = Integer.parseInt(vs[1]);
                Integer prev = r.get(item);
                // Missing entry counts as 0; otherwise accumulate.
                r.put(item, (prev == null ? 0 : prev.intValue()) + action);
            }
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, Integer> entry : r.entrySet()) {
                sb.append(entry.getKey()).append(":").append(entry.getValue()).append(",");
            }
            // One line per user holding all of that user's item scores.
            context.write(key, new Text(sb.toString()));
        }
    }
}package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Map;
public class Step3 {
    // Reused output key/value instances (safe: the mapper/reducer run
    // single-threaded per task and Hadoop copies written records).
    private final static Text K = new Text();
    private final static IntWritable V = new IntWritable(1);

    /**
     * Step 3: build the item co-occurrence matrix.
     * Input (step2 output): "user \t item:score,item:score,...".
     * Output: "itemA:itemB \t count" — how many users touched both items.
     *
     * @param conf  Hadoop configuration shared by all steps
     * @param paths step-name -> HDFS path map built in StartRun
     * @return true when the job completes successfully, false on any error
     */
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step3");
            job.setJarByClass(Step3.class);
            job.setMapperClass(Step3_Mapper.class);
            job.setReducerClass(Step3_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step3Input")));
            Path output = new Path(paths.get("Step3Output"));
            // Remove any stale output directory so the job can be re-run.
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step3_Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Input line: u2778 \t i160:8,i270:1,i319:2,...
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split("\t");
            // items: "i160:8", "i270:1", ...
            String[] items = tokens[1].split(",");
            // Emit every ordered item pair this user touched, with count 1.
            // NOTE(review): the loop body was truncated in the source I was
            // given; reconstructed from the format Step4 consumes
            // ("itemA:itemB \t count") — confirm against the original file.
            for (int i = 0; i < items.length; i++) {
                String itemA = items[i].split(":")[0];
                for (int j = 0; j < items.length; j++) {
                    String itemB = items[j].split(":")[0];
                    K.set(itemA + ":" + itemB);
                    context.write(K, V);
                }
            }
        }
    }

    private static class Step3_Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
        // The summed count is the number of users who interacted with both
        // items of the pair — one cell of the co-occurrence matrix.
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            V.set(sum);
            context.write(key, V);
        }
    }
}package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
public class Step4 {public static boolean run(Configuration conf, Map paths) {try {FileSystem fs = FileSystem.get(conf);Job job = Job.getInstance(conf);job.setJobName("step4");job.setJarByClass(Step4.class);job.setMapperClass(Step4_Mapper.class);job.setReducerClass(Step4_Reduce.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.setInputPaths(job,new Path[] {new Path(paths.get("Step4Input1")),new Path(paths.get("Step4Input2"))});Path output = new Path(paths.get("Step4Output"));if(fs.exists(output)){fs.delete(output,true);}FileOutputFormat.setOutputPath(job,output);boolean b = job.waitForCompletion(true);return b;} catch (Exception e) {e.printStackTrace();}return false;}private static class Step4_Mapper extends Mapper {private String flag;@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//进来的有两个目录,一个step2,一个step3,flag的值就是其中之一FileSplit split = (FileSplit) context.getInputSplit();flag = split.getPath().getParent().getName();System.out.println(flag + "**************************");}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//数据要么是 u2778 i160:8,i270:1,i319:2,i352:5,i487:1,i325:1,i249:2, step2//要么是 i100:i184 2 step3String[] tokens = Pattern.compile("[t,]").split(value.toString());if(flag.equals("step3")){String[] v1 = tokens[0].split(":");String itemID1 = v1[0];String itemID2 = v1[1];String num = tokens[1];Text k = new Text(itemID1);Text v = new Text("A:"+itemID2+","+num);//i100 A:i184,2context.write(k,v);}else if(flag.equals("step2")){//上面已经按制表符和逗号做了分隔,到这的数据实际就是//u2778 i160:8 i270:1 i319:2 i352:5 i487:1 i325:1 i249:2// 0 1 2 3 4 5 6 7String userID = tokens[0];for (int i = 1; i {//i100 A:i184,2//i100 B:u2778,1@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {Map mapA = new HashMap();Map mapB = new HashMap();for (Text line : values) {String val = 
line.toString();if(val.startsWith("A:")){String[] kv = Pattern.compile("[t,]").split(val.substring(2));try {mapA.put(kv[0],Integer.parseInt(kv[1]));}catch (Exception e){e.printStackTrace();}}else if(val.startsWith("B:")){String[] kv = Pattern.compile("[t,]").split(val.substring(2));try {//不同的用户对该商品的评价,key是用户mapB.put(kv[0],Integer.parseInt(kv[1]));}catch (Exception e){e.printStackTrace();}}}double result = 0;Iterator iter = mapA.keySet().iterator();while (iter.hasNext()){//同现矩阵中的某个关联商品String mapk = iter.next();//关联的商品名称//i184,2int num = mapA.get(mapk).intValue();Iterator iterb = mapB.keySet().iterator();while (iterb.hasNext()){//对于该商品不同用户的评分String mapkb = iterb.next();//用户IDint pref = mapB.get(mapkb).intValue();result = num*pref;Text k = new Text(mapkb);Text v = new Text(mapk + ","+result);context.write(k,v);}}}}
}package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Pattern;
public class Step5 {

    // Pre-compiled field separator (tab or comma) for mapper input lines.
    private static final Pattern SEP = Pattern.compile("[\t,]");

    /**
     * Step 5: aggregate the partial products from step4 — for each
     * (user, item) pair, sum all partial scores into the final
     * recommendation score.  Output: "user \t item,totalScore".
     *
     * @param conf  Hadoop configuration shared by all steps
     * @param paths step-name -> HDFS path map built in StartRun
     * @return true when the job completes successfully, false on any error
     */
    public static boolean run(Configuration conf, Map<String, String> paths) {
        try {
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf);
            job.setJobName("step5");
            job.setJarByClass(Step5.class);
            job.setMapperClass(Step5_Mapper.class);
            job.setReducerClass(Step5_Reduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(paths.get("Step5Input")));
            Path output = new Path(paths.get("Step5Output"));
            // Remove any stale output directory so the job can be re-run.
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);
            return job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private static class Step5_Mapper extends Mapper<LongWritable, Text, Text, Text> {
        // Input line (step4 output): user \t item,partialScore
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = SEP.split(value.toString());
            // user -> item,partialScore
            context.write(new Text(tokens[0]), new Text(tokens[1] + "," + tokens[2]));
        }
    }

    private static class Step5_Reduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Sum the partial scores per item for this user.
            Map<String, Double> map = new HashMap<String, Double>();
            for (Text line : values) {
                String[] tokens = line.toString().split(",");
                String itemID = tokens[0];
                double source = Double.parseDouble(tokens[1]);
                if (map.containsKey(itemID)) {
                    map.put(itemID, map.get(itemID) + source);
                } else {
                    map.put(itemID, source);
                }
            }
            // One output line per (user, item): user \t item,totalScore
            for (Map.Entry<String, Double> entry : map.entrySet()) {
                context.write(key, new Text(entry.getKey() + "," + entry.getValue()));
            }
        }
    }
}package cn.sxt.itemcf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;
public class Step6 {private final static Text K = new Text();private final static Text V = new Text();public static boolean run(Configuration conf, Map paths) {try {FileSystem fs = FileSystem.get(conf);Job job = Job.getInstance(conf);job.setJobName("step6");job.setJarByClass(Step6.class);job.setMapperClass(Step6_Mapper.class);job.setReducerClass(Step6_Reduce.class);job.setSortComparatorClass(NumSort.class);job.setGroupingComparatorClass(UserGroup.class);job.setMapOutputKeyClass(PairWritable.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job,new Path(paths.get("Step6Input")));Path output = new Path(paths.get("Step6Output"));if(fs.exists(output)){fs.delete(output,true);}FileOutputFormat.setOutputPath(job,output);boolean b = job.waitForCompletion(true);return b;} catch (Exception e) {e.printStackTrace();}return false;}private static class Step6_Mapper extends Mapper {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] tokens = Pattern.compile("[t,]").split(value.toString());String u = tokens[0];String item = tokens[1];String num = tokens[2];PairWritable k = new PairWritable();k.setUid(u);k.setNum(Double.parseDouble(num));V.set(item+":"+num);context.write(k,V);}}private static class Step6_Reduce extends Reducer {@Overrideprotected void reduce(PairWritable key, Iterable values, Context context) throws IOException, InterruptedException {int i = 0;StringBuffer sb = new StringBuffer();for (Text value : values) {if(i == 10){break;}sb.append(value.toString()+",");i++;}K.set(key.getUid());V.set(sb.toString());context.write(K,V);}}private static class NumSort extends WritableComparator {public NumSort() {super(PairWritable.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {PairWritable o1 = (PairWritable) a;PairWritable o2 = (PairWritable) b;int r = o1.getUid().compareTo(o2.getUid());if(r == 0){return 
-Double.compare(o1.getNum(),o2.getNum());}return r;}}private static class UserGroup extends WritableComparator {public UserGroup() {super(PairWritable.class,true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {PairWritable o1 = (PairWritable) a;PairWritable o2 = (PairWritable) b;return o1.getUid().compareTo(o2.getUid());}}private static class PairWritable implements WritableComparable {private String uid;private double num;public String getUid() {return uid;}public void setUid(String uid) {this.uid = uid;}public double getNum() {return num;}public void setNum(double num) {this.num = num;}@Overridepublic int compareTo(PairWritable o) {int r = this.uid.compareTo(o.getUid());if(r == 0){return Double.compare(this.num,o.getNum());}return r;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(uid);out.writeDouble(num);}@Overridepublic void readFields(DataInput in) throws IOException {this.uid = in.readUTF();this.num = in.readDouble();}}
}