王五 李四 小丽 小玲
小丽 王五 赵六
李四 王五 张三 赵六 小红
小玲 王五 张三 赵六
张三 小玲 李四 赵六
赵六 小丽 小玲 张三 李四 小红
小红 李四 赵六
1 import org.apache.hadoop.conf.Configuration;
2 import org.apache.hadoop.fs.FileSystem;
3 import org.apache.hadoop.fs.Path;
4 import org.apache.hadoop.io.IntWritable;
5 import org.apache.hadoop.io.LongWritable;
6 import org.apache.hadoop.io.Text;
7 import org.apache.hadoop.mapreduce.Job;
8 import org.apache.hadoop.mapreduce.Mapper;
9 import org.apache.hadoop.mapreduce.Reducer;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12
13 import java.io.IOException;
14
15 /**
16 * 好友推荐:
17 * 计算两个非好友的推荐值,就是两个非好友的共同好友数
18 *
19 * Created by Edward on 2016/7/12.
20 */
21 public class RunJob {
22
23
24 public static void main(String[] args){
25
26 System.setProperty("HADOOP_USER_NAME", "root");
27
28 Configuration cOnf= new Configuration();
29
30 conf.set("fs.defaultFS", "hdfs://node1:8020");
31
32 try {
33 FileSystem fs = FileSystem.get(conf);
34
35 Job job = Job.getInstance(conf);
36 job.setJarByClass(RunJob.class);
37 job.setMapperClass(MyMapper.class);
38 job.setReducerClass(MyReducer.class);
39
40 //需要指定 map out 的 key 和 value
41 job.setOutputKeyClass(Text.class);
42 job.setOutputValueClass(IntWritable.class);
43
44 FileInputFormat.addInputPath(job, new Path("/test/friend/input"));
45
46 Path path = new Path("/test/friend/output");
47 if(fs.exists(path))//如果目录存在,则删除目录
48 {
49 fs.delete(path,true);
50 }
51 FileOutputFormat.setOutputPath(job, path);
52
53 boolean b = job.waitForCompletion(true);
54 if(b)
55 {
56 System.out.println("OK");
57 }
58
59 } catch (Exception e) {
60 e.printStackTrace();
61 }
62
63 }
64
65 public static class MyMapper extends Mapper {
66
67 @Override
68 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
69
70 String[] str = value.toString().split("\t");
71
72 for(int i=1; i) {
73 //a 的好友是 b
74 context.write(new Text(str[0] + ":" + str[i]), new IntWritable(0));
75 //b 的好友是 a
76 context.write(new Text(str[i] + ":" + str[0]), new IntWritable(0));
77 for (int j = i + 1; j ) {
78 // A 的推荐好友是 B
79 context.write(new Text(str[i] + ":" + str[j]), new IntWritable(1));
80 // B 的推荐好友是 A
81 context.write(new Text(str[j] + ":" + str[i]), new IntWritable(1));
82 }
83 }
84 }
85 }
86
87 public static class MyReducer extends Reducer {
88
89 @Override
90 protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
91
92 int sum = 0;
93 for(IntWritable i: values)
94 {
95 if(i.get() == 0) {//两个人已经是好朋友的,排除在外
96 sum = 0;
97 //break;
98 return;
99 }
100 sum += i.get();
101 }
102 context.write(key, new IntWritable(sum));
103 }
104 }
105 }
小丽:小玲 2
小丽:小红 1
小丽:张三 1
小丽:李四 2
小玲:小丽 2
小玲:小红 1
小玲:李四 3
小红:小丽 1
小红:小玲 1
小红:张三 2
小红:王五 1
张三:小丽 1
张三:小红 2
张三:王五 2
李四:小丽 2
李四:小玲 3
王五:小红 1
王五:张三 2
王五:赵六 3
赵六:王五 3