热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReduce分区和reduceTask的数量

MapReduce分区和reduceTask的数量1.MapReduce分区:相同key的数据发送到同一个reduce里面去。mapTask处理的是文件切片file

MapReduce分区和reduceTask的数量

1.MapReduce分区:相同key的数据发送到同一个reduce里面去。
mapTask处理的是文件切片filesplit。
注意:block的概念是在hdfs当中的,mapreduce当中,每一个mapTask处理的数据都是叫做一个文件切片。
暂时可以简单地认为,一个文件切片就是对应一个block块。还可以简单地认为,有多少个block块,就要启动多少个mapTask。

2.分区规则:
HashPartitioner的getPartition方法返回值是int类型的,每条数据都要进来计算一下数据的分区,然后给每条数据打上一个逻辑标识,计算每一条数据要去哪一个reduceTask里去。
逻辑编号:(key.hashCode() & Integer.MAX_VALUE) % numberReduceTasks
因为key.hashCode()有可能是负数,所以要&Integer.MAX_VALUE,这样就永远是一个正整数。&按位与。
numberReduceTasks指多少个reduceTask。

 

3.自定义分区的一个例子

需求&#xff1a;将开奖结果分为>15的一个文件&#xff0c;<15的一个文件。
开奖结果在partition.csv文件每一行数据的第六个字段

一个reduceTask对应产生一个文件。
k1: LongWritable, v1: Text
k2: Text, v2: NullWritable&#xff0c;即null
核心代码&#xff1a;自定义分区
String[] arrays &#61; k2.toString().split("\t");//因为数据是用\t进行切割的&#xff0c;所以用tab键做好格式
if(arrays[5] >&#61; 15){return 0;}//5为第六个字段&#xff0c;也就是开奖结果
else{return 1;}

k3: Text, v3: NullWritable&#xff0c;即null


代码&#xff1a;
第一步&#xff1a;定义我们的mapper
我们这里的mapper程序不做任何逻辑&#xff0c;也不对key&#xff0c;与value做任何改变&#xff0c;只是接收我们的数据&#xff0c;然后往下发送
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartitionMapper extends Mapper {

    &#64;Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //定义我们k2   v2类型是  Text  和 NullWritable
        context.write(value,NullWritable.get());
    }
}


第二步&#xff1a;定义我们的reducer逻辑
我们的reducer也不做任何处理&#xff0c;将我们的数据原封不动的输出即可
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartitionReducer extends Reducer {
    &#64;Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        //reduce对数据不做任何处理&#xff0c;直接将我们的数据输出
        context.write(key,NullWritable.get());
    }
}

第三步&#xff1a;自定义partitioner
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PartitionOwn extends Partitioner {
    &#64;Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
       //自定义分区规则&#xff0c;将我们大于15的&#xff0c;分到一个reduceTask里面去&#xff0c;小于15的分到一个reduceTask里面去
        String[] split &#61; text.toString().split("\t");
        if(Integer.parseInt(split[5]) >&#61; 15){
            return 0;
        }else{

            return 1;
        }

    }
}

第四步&#xff1a;程序main函数入口
package cn.itcast.mr.demo1.partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

public class PartitionMain extends Configured implements Tool {
    &#64;Override
    public int run(String[] args) throws Exception {
        //获取job对象
        Job job &#61; Job.getInstance(super.getConf(), "partition");
        //如果程序需要打包运行&#xff0c;这一句必不可少
        job.setJarByClass(PartitionMain.class);

        //第一步读取文件&#xff0c;解析成key,value对
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/partition_in"));

        //第二步&#xff1a;自定义map逻辑&#xff0c;接收k1   v1  转换成新的K2   v2 输出
        job.setMapperClass(PartitionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //第三步&#xff0c;分区&#xff0c;自定义分区规则&#xff0c;大于15的到一个分区号&#xff0c;小于15的到一个分区号
        job.setPartitionerClass(PartitionOwn.class);
        //第四步&#xff1a;排序  第五步&#xff1a;规约 第六步&#xff1a;分组&#xff0c;全部省掉

        //第七步 自定义reduce逻辑&#xff0c;接收k2   v2  转换成新的k3   v3 输出

        job.setReducerClass(PartitionReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);

        //手动设置reduceTask的个数
        job.setNumReduceTasks(2);

       /* FileSystem fileSystem &#61; FileSystem.get(new URI("hdfs://node01:8020"), super.getConf());
        if(fileSystem.exists(new Path("hdfs://node01:8020/partition_out"))){
            fileSystem.delete(new Path("hdfs://node01:8020/partition_out"),true);
        }*/

        TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/partition_out"));


        boolean b &#61; job.waitForCompletion(true);


        return b?0:1;
    }


    public static void main(String[] args) throws Exception {
        int run &#61; ToolRunner.run(new Configuration(), new PartitionMain(), args);
        System.exit(run);
    }


}


推荐阅读
author-avatar
假装坚持-我很不爽_547
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有