一、概述
1.是Hadoop中的一套分布式的计算框架
2.将整个计算过程拆分为2个阶段:Map阶段、Reduce阶段
3.Map阶段一般负责数据的整理,Reduce阶段负责数据的汇总
4.如果输入路径是一个文件,则MapReduce只处理这个文件;如果输入的是一个目录,则处理这个目录下的所有文件
--注意:如果文件以 _ 开头,则该文件会被跳过,在MapReduce中,_ 开头的文件被认为是隐藏文件不需要处理
5.Rduce中的迭代器采用的是地址复用机制
6.Reduce中的迭代器只能遍历一次
7.在MapReduce中,针对Reduce出去的结果文件内容,如果不指定,内容(键值)中的默认键值之间用\t进行分割的
9.在MapReduce中,如果需要实现的功能不需要Reduce,即Reduce没有业务逻辑,可以省略Reduce
二、序列化
1.在MapReduce中,要求被传输的数据必须能够被序列化
2.Hadoop中,序列化机制默认使用AVRO,但是Hadoop对AVRO的序列化机制进行了进一步的封装,提供了更简单的序列化机制
3.在Hadoop想要实现序列化,需要实现Writable,重新其中的方法
4.在Hadoop中序列化的时候,要求属性不能为null
序列化示例:
package com.apple.flow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Flow implements Writable{
private String phone;
private String name;
private String addr;
//--在使用包装类,注意初始值的问题(null)
private int flow;
/*
* 序列化方法
* 如果是String,则调用writeUTF
* 其他的:WriteInt,WriteLong,WriteByte,WriteDouble等
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phone);
out.writeUTF(name);
out.writeUTF(addr);
out.writeInt(flow);
}
/*
* 反序列化一定要注意顺序,和序列化的顺序一致
*/
@Override
public void readFields(DataInput in) throws IOException {
this.phOne=in.readUTF();
this.name=in.readUTF();
this.addr=in.readUTF();
this.flow=in.readInt();
}
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phOne= phone;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public int getFlow() {
return flow;
}
public void setFlow(int flow) {
this.flow = flow;
}
@Override
public String toString() {
return "Flow [phOne=" + phone + ", name=" + name + ", addr=" + addr + ", flow=" + flow + "]";
}
}
三、分区
1.分区在MapReduce中用于进行数据的分类
2.在MapReduce中,如果不指定,则默认只有1个分区
3.每一个分区都必须对应一个ReduceTask,每一ReduceTask都会产生一个结果文件
4.在MapReduce中对分区进行了编号,编码默认从0开始递增
5.分区的顶级父类是Partitioner
6.在MapReduce中,默认使用HashPartitioner
代码示例:
public class FlowPartitioner extends Partitioner{
@Override
public int getPartition(Text key, Flow value, int numPartitions) {
if(value.getAddr().equals("bj")){
return 0;
}
else if(value.getAddr().equals("sh")){
return 1;
}else{
return 2;
}
}
}
主类Driver入口
public class FlowDriver {
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Flow.class);
job.setOutputValueClass(NullWritable.class);
//--有3个分区,故有3个ReduceTask
job.setNumReduceTasks(3);
//--设置自定义的分区组件。如果不设定,默认用的是HashPartitioner
//--默认的分区组件,会按Mapper输出key的hashcode分区,
//--确保相同的key落到同一个分区里
job.setPartitionerClass(FlowPartitioner.class);
FileInputFormat.setInputPaths(job,
new Path("hdfs://192.168.150.137:9000/flow"));
FileOutputFormat.setOutputPath(job,
new Path("hdfs://192.168.150.137:9000/flow/result"));
job.waitForCompletion(true);
}
}
Map类:
public class FlowMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws IOException, InterruptedException {
String line=value.toString();
Flow f=new Flow();
String[] info=line.split(" ");
f.setPhone(info[0]);
f.setName(info[1]);
f.setAddr(info[2]);
f.setFlow(Integer.parseInt(info[3]));
context.write(new Text(f.getName()),f);
}
}
Reduce类:
public class FlowReducer extends Reducer{
@Override
protected void reduce(Text key, Iterable values,
Reducer.Context context)
throws IOException, InterruptedException {
Flow result=new Flow();
for(Flow value:values){
result.setPhone(value.getPhone());
result.setName(value.getName());
result.setAddr(value.getAddr());
result.setFlow(result.getFlow()+value.getFlow());
}
context.write(result,NullWritable.get());
}
}
四、排序
1.在MapReduce中,会对键做自动的排序 - 自然排序
2.如果自定义一个类产生的对象想要作为键,那么这个对象必须要允许被排序 - 实现WritableComparable接口
3.多属性排序的场景称之为二次排序
Hadoop MapReduce