在此案例中,我们使用学生成绩实现分区功能(第三列为成绩):
[root@hadoop01 test_data]# hdfs dfs -mkdir /test_partition_input
[root@hadoop01 test_data]# hdfs dfs -put test_partiton.csv /test_partition_input
新建project:
4.0.0wyh.testtest_partition1.0-SNAPSHOT88jarorg.apache.hadoophadoop-common2.7.5org.apache.hadoophadoop-client2.7.5org.apache.hadoophadoop-hdfs2.7.5org.apache.hadoophadoop-mapreduce-client-core2.7.5junitjunitRELEASEorg.apache.maven.pluginsmaven-compiler-plugin3.11.8UTF-8org.apache.maven.pluginsmaven-shade-plugin2.4.3packageshadetrue
package wyh.test.partition;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** 四个泛型,分别是K1,V1,K2,V2的类型(要分区的字段必须包含在K2里),这里我们可以暂时将V2置空*/
public class PartitionMapper extends Mapper {/*** map(...)用于将K1,V1转为K2,V2,在我们的案例中,K2直接使用V1的值即可。* K1为行偏移量,V1为行数据* K2为V1的值,也即行数据* V2置空*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {context.write(value, NullWritable.get());//NullWritable.get()需要使用get()方法拿到NullWritable对应的对象}
}
package wyh.test.partition;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** 两个泛型依次对应K2,V2的类型*/
public class PartitionPartitioner extends Partitioner {/*** 该方法用于定义分区的具体规则,并返回分区的编号* @param text - K2* @param nullWritable - V2* @param i - 分区个数* @return*/@Overridepublic int getPartition(Text text, NullWritable nullWritable, int i) {//获取原始行数据,并截取成绩值String[] split = text.toString().split(",");String gradeString = split[2];int grade=Integer.parseInt(gradeString);//定义分区规则if(grade > 90){return 1;//返回分区编号}else{return 0;}}
}
package wyh.test.partition;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** 四个泛型依次为K2,V2,K3,V3的类型* K2 - 行数据* V2 - 置空* K3 - 行数据(在我们的案例中此处的Reduce中不需要对数据进行处理),直接将数据进行传递即可。* V3 - 置空*/
public class PartitionReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}
}
package wyh.test.partition;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class PartitionJobMain extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {//创建job对象Job job = Job.getInstance(super.getConf(), "test_partition_job");//集群中必须配置job.setJarByClass(PartitionJobMain.class);//配置输入项job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job, new Path("hdfs://192.168.126.132:8020/test_partition_input"));//配置Mapjob.setMapperClass(PartitionMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//配置分区job.setPartitionerClass(PartitionPartitioner.class);//配置Reducejob.setReducerClass(PartitionReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job, new Path("hdfs://192.168.126.132/test_partition_output"));//设置Reduce Task个数(默认是1),Reduce Task个数也即分区个数job.setNumReduceTasks(2);//等待job执行状态返回值boolean status = job.waitForCompletion(true);//三目运算的结果会引用到main()方法里的runStatusreturn status?0:1;}public static void main(String[] args) throws Exception {Configuration configuration = new Configuration();//启动job,返回任务执行状态int runStatus = ToolRunner.run(configuration, new PartitionJobMain(), args);System.exit(runStatus);}
}
先clean,清除编译过程中产生的中间文件,然后package进行打包:
[root@hadoop01 test_jar]# hadoop jar test_partition-1.0-SNAPSHOT.jar wyh.test.partition.PartitionJobMain#最后面的值是主类的全路径
由于我们设置的是两个分区,所以这里就会生成两个分区文件:
在PartitionPartitioner类中我们指定了成绩大于90的进入1号分区,否则进入0号分区:
所以查看分区文件中0号文件得到的就是成绩<&#61;90的所有学生信息&#xff1a;
查看分区文件中1号文件得到的就是成绩>90的所有学生信息&#xff1a;
这样就简单地实现了MapReduce中分区的功能。