1:shuffle机制
shuffle是处于Map方法之后,Reduce方法之前的数据处理过程。
2:Partition分区
1:要求将统计结果按照条件输出到不同的文件中(分区)。将统计结果按照手机归属地不同省份输出到不同文件中(分区)。
2:默认的Partition分区
public class HashPartitioner extends Partitioner {public int getPartition(K key, V value, int numReduceTasks) {// 相当于key的hashcode值与numReduceTasks取余得到。return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}
3:自定义Partitioner步骤
# 1.自定义类继承Partitioner,重写getPartition方法
public class CustomPartitioner extends Partitioner {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// 控制分区代码逻辑… …return partition;}
}# 2.在job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);# 3.自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
3:自定义分区案例
1:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
2:输入的数据phone_data.txt
3:期望输出数据:136、137、138、139开头都放到一个独立的4个文件中,其他开头的放在一个文件中。
4:136对应分区0,137对应分区1,138对应分区2,139对应分区3,其他对应分区4
5:Driver类中,// 指定自定义数据分区 job.setPartitionerClass(ProvincePartitioner. class);
// 同时指定相应数量的reduceTask job.setNumReduceTasks(5);
6:代码
public class ProvincePartitioner extends Partitioner {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {//获取手机号前三位 prePhoneString phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量 partition,根据 prePhone 设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}// 上卖弄partition参数的设置,需要连续,不能有空//最后返回分区号 partitionreturn partition;}
}# 在driver类中添加如下代码
//8 指定自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
//9 同时指定相应数量的 ReduceTask// 这个数值的设置,设置2-4都会报IO异常,设置5走自定义类,设置6会多出来一个空文件,设置1只有会有个分区文件
job.setNumReduceTasks(5);
4:排序
MapTask:一次快排,在环形缓冲区溢写前 ,对key的索引按照字典的顺序(a、b、c...)进行排序,进行分区内的快排,一次归并,对溢写文件进行归并,溢写文件是区内有序,因为快排,归并排序是把所有溢写文件中同属于一个分区的所有数据进行排序,使得整个分区一内排好顺序。
ReduceTask:一次归并排序。例如Reduce Task1只负责partition0内的数据,但是MapTask1和MapTask2都有partition0的数据,所以ReduceTask1会主动把这两个MapTask内的partition0数据进行拉取,然后进行归并排序,排成只有一个文件。
一定要进行排序,因为最后到Reducer的时候,是相同key为一组进行操作的,事先排好序的话,效率会比较高。
1:部分排序。MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序
2:全排序。最终输出结果只有一个文件,且文件内部有序。
3:二次排序。在自定义排序中,如果compareTo中判断条件为两个即为二次排序。
5:全排序案例以及二次排序案例(使用之前的序列化案例,二次排序是在总流量相同的情况下,设置上行流量按照升序的方式排列,在compareTo方法里面设置就好)
# 根据手机总流量进行降序
# 输入数据,既然对流量进行排序,就把流量认为key
137 2481 24681 27162
138 264 0 264
139 132 1512 1644
135 7355 110349 117684# 输出数据
135 7355 110349 117684
137 2481 24681 27162
139 132 1512 1644
138 264 0 264 # FlowBean实现WritableCompareable接口重写compareTo方法
# public class FlowBean implements WritableComparable
@Override
public int compareTo(FlowBean o) {// 倒序排列,按照总流量从大到小if(this.sumFlow > o.getSunFlow){return -1;}else if(this.sunFlow o.upFlow){return 1}else if(this.upFlow public class FlowMapper extends Mapper {private FlowBean outK = new FlowBean();private Text outV = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {//1 获取一行数据String line = value.toString();//2 按照"\t",切割数据String[] split = line.split("\t");//3 封装 outK outVoutK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();outV.set(split[0]);//4 写出 outK outVcontext.write(outK,outV);}
}
# Reducer类
// 循环输出,避免总流量相同情况
public class FlowReducer extends Reducer {@Overrideprotected void reduce(FlowBean key, Iterable values, Context
context) throws IOException, InterruptedException {//遍历 values 集合,循环写出,避免总流量相同的情况for (Text value : values) {//调换 KV 位置,反向写出context.write(value,key);}}
}#driver类
//4 修改设置 Map 端输出数据的 KV 类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);
6:区排序(分区加排序的共同使用,分区类也要注意key和value是什么)
# 根据上面案例的输出情况,把136、137、138等等开头的手机号放在不同的文件夹中,同时各个文件中也 # 按照总流量进行排序,这样其实就是分区加排序的综合使用# 分区类
public class ProvincePartitioner2 extends Partitioner {@Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions) {//获取手机号前三位String phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量 partition,根据 prePhone 设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号 partitionreturn partition;}
}# 在driver类添加,进行绑定
job.setPartitionerClass(ProvincePartitioner2.class);
// 设置对应的 ReduceTask 的个数
job.setNumReduceTasks(5);
7:Combiner合并
1:Combiner可有可无,但是为了考虑效率问题,一般需要使用
2:Combiner父类是Reducer
3:Combiner和Reducer区别在于运行的位置:Combiner只负责一个MapTask,Reducer负责全局所有的MapTask
4:Combiner意思是对每一个MapTask的输出进行局部汇总,减少传输量。
5:Combiner可以用的前提是不能影响最终的业务逻辑(算平均值不可,求和可以),且Combiner的输出kv要和Reducer的输入kv对应起来。
Mapper Reducer
3 5 7 -> (3+5+7)/3=5 (3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
2 6 -> (2+6)/2=4-> 3+5+7=15 (15+8)=23 等于 (3+5+7+2+6)=23-> 2+6=8
8:Combiner案例 (wordcount案例基础之上)
# 输入数据 希望在mapper阶段即可得到这个
banzhang ni hao
xihuan hadoop
banzhang
banzhang ni hao
xihuan hadoop
banzhang# 期望结果,在日志文件中可以看到combine input records = 12 combine output records = 5
# 方案一:新建WordCountCombiner类
public class WordCountCombiner extends Reducer {private IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable values, Context
context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}//封装 outKVoutV.set(sum);//写出 outKVcontext.write(key,outV);}
}// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordCountCombiner.class);
# 如果设置 job.setNumReduceTasks(0),即取消Reduce阶段,那么shuffle整个都不会执行,因为
# shuffle是连接map和reduce的,所以combiner也不会执行。 # 方案二:因为上面类的代码和Reducer代码一样
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordCountReducer.class);