MapReduce分区和reduceTask的数量
1.MapReduce分区:相同key的数据发送到同一个reduce里面去。
mapTask处理的是文件切片filesplit。
注意:block的概念是在hdfs当中的,mapreduce当中,每一个mapTask处理的数据都是叫做一个文件切片。
暂时可以简单地认为,一个文件切片就是对应一个block块。还可以简单地认为,有多少个block块,就要启动多少个mapTask。
2.分区规则:
HashPartitioner的getPartition方法返回值是int类型的,每条数据都要进来计算一下数据的分区,然后给每条数据打上一个逻辑标识,计算每一条数据要去哪一个reduceTask里去。
逻辑编号:(key.hashCode() & Integer.MAX_VALUE) % numberReduceTasks
因为key.hashCode()有可能是负数,所以要&Integer.MAX_VALUE,这样就永远是一个正整数。&按位与。
numberReduceTasks指多少个reduceTask。
3.自定义分区的一个例子
需求&#xff1a;将开奖结果分为>15的一个文件&#xff0c;<15的一个文件。
开奖结果在partition.csv文件每一行数据的第六个字段
一个reduceTask对应产生一个文件。
k1: LongWritable, v1: Text
k2: Text, v2: NullWritable&#xff0c;即null
核心代码&#xff1a;自定义分区
String[] arrays &#61; k2.toString().split("\t");//因为数据是用\t进行切割的&#xff0c;所以用tab键做好格式
if(arrays[5] >&#61; 15){return 0;}//5为第六个字段&#xff0c;也就是开奖结果
else{return 1;}
k3: Text, v3: NullWritable&#xff0c;即null
代码&#xff1a;
第一步&#xff1a;定义我们的mapper
我们这里的mapper程序不做任何逻辑&#xff0c;也不对key&#xff0c;与value做任何改变&#xff0c;只是接收我们的数据&#xff0c;然后往下发送
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartitionMapper extends Mapper {
&#64;Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//定义我们k2 v2类型是 Text 和 NullWritable
context.write(value,NullWritable.get());
}
}
第二步&#xff1a;定义我们的reducer逻辑
我们的reducer也不做任何处理&#xff0c;将我们的数据原封不动的输出即可
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartitionReducer extends Reducer {
&#64;Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//reduce对数据不做任何处理&#xff0c;直接将我们的数据输出
context.write(key,NullWritable.get());
}
}
第三步&#xff1a;自定义partitioner
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class PartitionOwn extends Partitioner {
&#64;Override
public int getPartition(Text text, NullWritable nullWritable, int i) {
//自定义分区规则&#xff0c;将我们大于15的&#xff0c;分到一个reduceTask里面去&#xff0c;小于15的分到一个reduceTask里面去
String[] split &#61; text.toString().split("\t");
if(Integer.parseInt(split[5]) >&#61; 15){
return 0;
}else{
return 1;
}
}
}
第四步&#xff1a;程序main函数入口
package cn.itcast.mr.demo1.partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.net.URI;
public class PartitionMain extends Configured implements Tool {
&#64;Override
public int run(String[] args) throws Exception {
//获取job对象
Job job &#61; Job.getInstance(super.getConf(), "partition");
//如果程序需要打包运行&#xff0c;这一句必不可少
job.setJarByClass(PartitionMain.class);
//第一步读取文件&#xff0c;解析成key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/partition_in"));
//第二步&#xff1a;自定义map逻辑&#xff0c;接收k1 v1 转换成新的K2 v2 输出
job.setMapperClass(PartitionMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步&#xff0c;分区&#xff0c;自定义分区规则&#xff0c;大于15的到一个分区号&#xff0c;小于15的到一个分区号
job.setPartitionerClass(PartitionOwn.class);
//第四步&#xff1a;排序 第五步&#xff1a;规约 第六步&#xff1a;分组&#xff0c;全部省掉
//第七步 自定义reduce逻辑&#xff0c;接收k2 v2 转换成新的k3 v3 输出
job.setReducerClass(PartitionReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
//手动设置reduceTask的个数
job.setNumReduceTasks(2);
/* FileSystem fileSystem &#61; FileSystem.get(new URI("hdfs://node01:8020"), super.getConf());
if(fileSystem.exists(new Path("hdfs://node01:8020/partition_out"))){
fileSystem.delete(new Path("hdfs://node01:8020/partition_out"),true);
}*/
TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/partition_out"));
boolean b &#61; job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run &#61; ToolRunner.run(new Configuration(), new PartitionMain(), args);
System.exit(run);
}
}