Data Source
Based on the earlier Hadoop serialization example [computing each phone number's upstream traffic, downstream traffic, and total traffic].
MapReduce Sorting
In the Shuffle phase, after partitioning, records are sorted by key. By default, keys are compared in dictionary (lexicographic) order, and the in-memory sort algorithm is quicksort.
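A quick sanity check on why dictionary order is wrong for numeric totals: Hadoop's Text compares its bytes lexicographically, so a numerically larger value can still sort first. A minimal sketch (the demo class and values are mine, not from the original example):

import org.apache.hadoop.io.Text;

public class DefaultSortDemo {
    public static void main(String[] args) {
        // Lexicographic comparison: "13600000000" < "9" because '1' < '9',
        // even though 13600000000 is numerically far larger.
        System.out.println(new Text("13600000000").compareTo(new Text("9")) < 0); // true
    }
}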
How Custom Sorting Works in This Example
When the FlowBean object is transmitted as the key, it must implement the WritableComparable interface and override the compareTo() method; the framework then sorts the keys using that method.
Code
FlowBean.java
package MapReduceFlow;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Custom key type: serializable (Writable) and sortable (Comparable).
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A no-arg constructor is required so the framework can instantiate
    // the bean reflectively before calling readFields().
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    // Serialization: the field order here must match readFields() exactly.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    // Sort ascending by total flow; the shuffle orders keys with this method.
    @Override
    public int compareTo(FlowBean o) {
        return Long.compare(this.sumFlow, o.sumFlow);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
        this.sumFlow = this.upFlow + this.downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
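A minimal local check of the bean, assuming the class above (the test class and values are mine): write() and readFields() round-trip the fields through the Writable protocol, and compareTo() orders by total flow.

package MapReduceFlow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanTest {
    public static void main(String[] args) throws IOException {
        FlowBean a = new FlowBean(100, 200); // sumFlow = 300
        FlowBean b = new FlowBean(50, 50);   // sumFlow = 100

        // Round-trip a through the Writable protocol.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bos));
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

        System.out.println(copy.getSumFlow());  // 300
        System.out.println(a.compareTo(b) > 0); // true: a has the larger total
    }
}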
MyMapper.java
package MapReduceFlow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Emits (FlowBean, phone) so the shuffle sorts records by total flow.
public class MyMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // Reuse the output objects across map() calls; context.write()
    // serializes them immediately, so reuse is safe here.
    private final Text t = new Text();
    private final FlowBean fb = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String str = value.toString();
        String[] words = str.split("\t");
        String telNum = words[0];
        fb.setUpFlow(Long.parseLong(words[2]));
        fb.setDownFlow(Long.parseLong(words[3]));
        t.set(telNum);
        // The bean is the key (what gets sorted); the phone number is the value.
        context.write(fb, t);
    }
}
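For reference, here is what one map() call does on a hypothetical input line (the exact column layout comes from the earlier serialization example's output; only columns 0, 2, and 3 are read):

package MapReduceFlow;

public class MapInputDemo {
    public static void main(String[] args) {
        // Hypothetical tab-separated line: phone, (ignored), upFlow, downFlow.
        String line = "13736230513\tignored\t2481\t24681";
        String[] words = line.split("\t");
        System.out.println("key   = FlowBean(" + words[2] + ", " + words[3] + ")");
        System.out.println("value = " + words[0]); // the phone number
    }
}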
MyReducer.java
package MapReduceFlow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Flips (FlowBean, phone) back to (phone, FlowBean) for the final output.
public class MyReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Phones with the same total flow share one reduce call,
        // so write one output line per phone number.
        for (Text t : values) {
            context.write(t, key);
        }
    }
}
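Note that the default grouping comparator is the sort comparator, so two phones whose totals tie compare equal and arrive in the same reduce call with both phone numbers in values; the loop writes a separate output line for each of them.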
MyDriver.java
package MapReduceFlow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Map output: FlowBean key (sorted in the shuffle), phone number as value.
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);
        // Final output: phone number back in front, FlowBean rendered by toString().
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job,
                new Path("F:\\Codes\\JavaCodes\\MapReduceLearning\\testdata\\flowbean.txt"));
        FileOutputFormat.setOutputPath(job,
                new Path("F:\\Codes\\JavaCodes\\MapReduceLearning\\testdata\\output"));

        boolean ret = job.waitForCompletion(true);
        System.exit(ret ? 0 : 1);
    }
}
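One practical note: the output directory must not exist before the job runs, or FileOutputFormat fails the job with a FileAlreadyExistsException. With hard-coded local Windows paths like these, the job is presumably run straight from the IDE in Hadoop's local mode; on a real cluster the paths would point into HDFS.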
Result
Each output line has the form phone \t upFlow \t downFlow \t sumFlow, with lines sorted in ascending order of total flow.