热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReducer自定义bean-排序-分组和shuffle的过程

1.需求根据手机号码,查询该号码的上行,下行,总流量,并从高到低排序,并对手机号码根据省份分组13631579850661372623050300-FD-07-A4-72-B8:

1. 需求

根据手机号码,查询该号码的上行,下行,总流量,并从高到低排序,并对手机号码根据省份分组

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200

自定义数据结构

这次我们使用MapReducer进行同一号码的上下行流量统计
首先是需要定义自己的数据结果,需要我们定义的数据实现Writable 接口,实现 序列化和反序列化的函数,这样MapReduer在数据传递过程中,才不会报错,也不会丢失数据

FlowBean.java 

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * FlowBean 是我们自定义的数据类型,要在hadoop的各个节点之间传输,应遵循hadoop的序列化机制
 * 就必须实现hadoop相应的序列化接口
 * */

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // 在反序列化时,反射机制需要调用空参构造函数
    public FlowBean(){}

    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        super();
        this.phOneNB= phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phOneNB= phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    /**
     * @return the s_flow
     */
    public long getS_flow() {
        return s_flow;
    }

    /**
     * @param s_flow the s_flow to set
     */
    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }


    @Override
    public String toString() {

        return ""+ phoneNB + "\t" + up_flow + "\t" + d_flow;
    }


    // 将对象数据序列化到流中
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);

    }

    // 从数据流中泛序列出对象的数据
    // 从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
    @Override
    public void readFields(DataInput in) throws IOException {

        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }
}

为了节省空间,就把map和reducer的类定义成了静态类

package cn.itcast.hadoop.mr.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;


public class FlowSumRunner extends Configured implements Tool {

    public static class FlowSumMapper extends Mapper {
        //拿到日志中的数据,切分各个字段
        @Override
        protected void map(LongWritable key, Text value, Context context)
         throws IOException, InterruptedException{
        //拿一行数据
            String line = value.toString();

            String[] fileds = StringUtils.split(line, "\t");

            String phoneNB = fileds[1];
            long up_flow = Long.parseLong(fileds[7]);
            long d_flow = Long.parseLong(fileds[8]);
        //封装数据为kv并输出
            context.write(new Text(phoneNB), new FlowBean(phoneNB,up_flow, d_flow));
        }
    }


    public static class FlowSumReducer extends Reducer {

      //框架每传递一组数据<1387788654,{flowbean,flowbean,flowbean,flowbean.....}>调用一次我们的reduce方法
      //reduce中的业务逻辑就是遍历values,然后进行累加求和再输出

        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

            long up_flow_counter = 0;
            long d_flow_counter= 0;

            for (FlowBean value : values) {
                up_flow_counter += value.getUp_flow();
                d_flow_counter += value.getD_flow();
            }
            context.write(key, new FlowBean(key.toString(),up_flow_counter, d_flow_counter));
        }
    }

    @Override
    public int run(String[] args) throws Exception {

        Configuration conf = new Configuration(); 

        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumRunner.class);

        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));  //读取命令行参数
        FileOutputFormat.setOutputPath(job, new Path(args[1]));//读取命令行参数

        return job.waitForCompletion(true)?0:1;
    }

    public static void main(String[] args) throws Exception{

    //hadoop jar flow.jar cn.itcast.hadoop.mr.flowsum.FlowSumRunner /flow/data /flow/output

        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        System.exit(res);
    }
}

 

 

上行流量排序

如果不实用MapReducer默认的排序方式,使用自定义的方式,FlowBean需要实现Comparable接口,compareTo这个函数 Writable和Comparable合在一起 就是 WritableComparable接口

FlowBean.class

package cn.itcast.hadoop.mr.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable {

    ...
    ...
    ...

    @Override
    public int compareTo(FlowBean o) {
        return this.getUp_flow() > o.getUp_flow() ? -1:1;
    }
}//如果对象本身的id大于传入的对象id,

//返回值是正数,就是升序排序

//返回值是负数,就是降序排序
因为MapReduce默认是索引 也就是 Key排序,所以我们也要相应的改一下Map和Reduer
package cn.itcast.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.itcast.hadoop.mr.flowsum.FlowBean;

public class SortMR {

    // NullWritable 如果什么也不想输出的话  使用NumWritable
    public static class SortMapper extends Mapper {

        @Override
        protected void map(LongWritable key, Text value, Context context )
        throws IOException, InterruptedException{
            String line = value.toString();

            String[] fields = StringUtils.split(line,"\t");

            String phoneNB = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long d_flow = Long.parseLong(fields[2]);


            context.write(new FlowBean(phoneNB, up_flow, d_flow), NullWritable.get());
        }
    }

    public static class SortReducer extends Reducer {

        @Override
        protected void reduce(FlowBean key, Iterable values, Context context)
                 throws IOException, InterruptedException

        {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) 
            throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(); 

        Job job = Job.getInstance(conf);

        job.setJarByClass(SortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));


        int result = job.waitForCompletion(true)?0:1;

        System.exit(result);
    }

}

输出结果:

13600217502 186852  200
13560439658 4938    200
84138413    4116    1432
13922314466 3008    3720
13726230503 2481    24681
13926435656 1512    200
13560439658 954 200
13480253104 180 200
13823070001 180 200
13760778710 120 200
13502468823 102 7335
13925057413 63  11058
15013685858 27  3659
15920133257 20  3156
18320173382 18  9531
18211575961 12  1527
13602846565 12  1938
13660577991 9   6960
15989002119 3   1938
13719199419 0   200
13826544101 0   200
13926251106 0   200

 

 

输出结果分组

截止到现在 我们看到的输出结果一直都只有一个文件

我们现在想 把 135,136,137,138,139和其他的号码区分开

这个时候实际就是把这不同部分的数据交给不同的Reducer去处理,然后各个Reducer输出各自的结果 把map的结果分发给不同的Reducer,需要进行一个分组的动作,hadoop默认都是一个组,所以默认只有一个Reducer,输出的也就只有一个文件了

现在我们实现Partitioner 这个类,可以按照我们自己的需求把数据进行分组

package cn.itcast.hadoop.mr.areapartition;

import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;

public  class AreaPartitioner extends Partitioner {

    private static  HashMap areaMap = new HashMap<>();

    static {        //暂且先静态写死分组
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {

        // 从key中拿到手机号,查询手机归属地字典, 不同省份返回不同的编组
        Integer code = areaMap.get(key.toString().substring(0,3));
        int areaCoder = code  ==  null ? 5:code;

        return areaCoder;
    }

}

然后需要在 job运行之前指定 ParrtitionerClass 类

 
job.setPartitionerClass(AreaPartitioner.class);
job.setNumReduceTasks(6); //然后设置启动Reducer的数量

// 这里需要注意的是  这里的数量必须大于等于 你设置的数据分组的数量  不然会进行报错  
//  如果不设置 默认为1  就不会分组, 所有数据就只有一个文件
// 如果设置的多了  多的文件里面不会有数据

结果:

hadoop jar flowarea.jar cn.itcast.hadoop.mr.areapartition.FlowSumArea /flow/data  /flow/areaoutput

[hadoop@hadoop1 ~]$ hadoop fs -ls /flow/areaoutput
Found 7 items
-rw-r--r--   1 hadoop supergroup          0 2018-03-18 13:57 /flow/areaoutput/_SUCCESS
-rw-r--r--   1 hadoop supergroup         66 2018-03-18 13:56 /flow/areaoutput/part-r-00000
-rw-r--r--   1 hadoop supergroup         98 2018-03-18 13:56 /flow/areaoutput/part-r-00001
-rw-r--r--   1 hadoop supergroup         97 2018-03-18 13:57 /flow/areaoutput/part-r-00002
-rw-r--r--   1 hadoop supergroup         62 2018-03-18 13:57 /flow/areaoutput/part-r-00003
-rw-r--r--   1 hadoop supergroup        130 2018-03-18 13:57 /flow/areaoutput/part-r-00004
-rw-r--r--   1 hadoop supergroup        219 2018-03-18 13:57 /flow/areaoutput/part-r-00005

[hadoop@hadoop1 ~]$ hadoop fs -cat /flow/areaoutput/part-r-00000
13502468823 13502468823 102 7335
13560439658 13560439658 5892    400

2. MRAppMaster的Shuffer的过程

shuffle的主要工作是从Map结束到Reduce开始之间的过程。

 

一、Map端的shuffle

  Map端会处理输入数据并产生中间结果,这个中间结果会写到本地磁盘,而不是HDFS。每个Map的输出会先写到内存缓冲区中,当写入的数据达到设定的阈值时,系统将会启动一个线程将缓冲区的数据写到磁盘,这个过程叫做spill。

  在spill写入之前,会先进行二次排序,首先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。接着运行combiner(如果设置了的话),combiner的本质也是一个Reducer,其目的是对将要写入到磁盘上的文件先进行一次处理,这样,写入到磁盘的数据量就会减少。最后将数据写到本地磁盘产生spill文件(spill文件保存在{mapred.local.dir}指定的目录中,Map任务结束后就会被删除)。

  最后,每个Map任务可能产生多个spill文件,在每个Map任务完成前,会通过多路归并算法将这些spill文件归并成一个文件。至此,Map的shuffle过程就结束了。

  二、Reduce端的shuffle

  Reduce端的shuffle主要包括三个阶段,copy、sort(merge)和reduce。

  首先要将Map端产生的输出文件拷贝到Reduce端,但每个Reducer如何知道自己应该处理哪些数据呢?因为Map端进行partition的时候,实际上就相当于指定了每个Reducer要处理的数据(partition就对应了Reducer),所以Reducer在拷贝数据的时候只需拷贝与自己对应的partition中的数据即可。每个Reducer会处理一个或者多个partition,但需要先将自己对应的partition中的数据从每个Map的输出结果中拷贝过来。

  接下来就是sort阶段,也成为merge阶段,因为这个阶段的主要工作是执行了归并排序。从Map端拷贝到Reduce端的数据都是有序的,所以很适合归并排序。最终在Reduce端生成一个较大的文件作为Reduce的输入。

  最后就是Reduce过程了,在这个过程中产生了最终的输出结果,并将其写到HDFS上。

 

  现在来总结一下shuffle过程,我画了张图,希望能够帮助理解。

3. split切片机制

block:block是物理切块,在文件上传到HDFS文件系统后,对大文将以每128MB的大小切分若干,存放在不同的DataNode上;

2、split:split是逻辑切片,在mapreduce中的map task开始之前,将文件按照指定的大小切割成若干个部分,每一部分称为一个split,默认是split的大小与block的大小相等,均为128MB。

注意:在hadoop1.x版本中,block默认的大小为64MB,在hadoop2.x版本修改成了128MB。



                        
                        
                         
推荐阅读
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • 【shell】网络处理:判断IP是否在网段、两个ip是否同网段、IP地址范围、网段包含关系
    本文介绍了使用shell脚本判断IP是否在同一网段、判断IP地址是否在某个范围内、计算IP地址范围、判断网段之间的包含关系的方法和原理。通过对IP和掩码进行与计算,可以判断两个IP是否在同一网段。同时,还提供了一段用于验证IP地址的正则表达式和判断特殊IP地址的方法。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • MR程序的几种提交运行模式本地模型运行1在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行-- ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 深入理解CSS中的margin属性及其应用场景
    本文主要介绍了CSS中的margin属性及其应用场景,包括垂直外边距合并、padding的使用时机、行内替换元素与费替换元素的区别、margin的基线、盒子的物理大小、显示大小、逻辑大小等知识点。通过深入理解这些概念,读者可以更好地掌握margin的用法和原理。同时,文中提供了一些相关的文档和规范供读者参考。 ... [详细]
  • 学习Java异常处理之throws之抛出并捕获异常(9)
    任务描述本关任务:在main方法之外创建任意一个方法接收给定的两个字符串,把第二个字符串的长度减1生成一个整数值,输出第一个字符串长度是 ... [详细]
  • 本文介绍了利用ARMA模型对平稳非白噪声序列进行建模的步骤及代码实现。首先对观察值序列进行样本自相关系数和样本偏自相关系数的计算,然后根据这些系数的性质选择适当的ARMA模型进行拟合,并估计模型中的位置参数。接着进行模型的有效性检验,如果不通过则重新选择模型再拟合,如果通过则进行模型优化。最后利用拟合模型预测序列的未来走势。文章还介绍了绘制时序图、平稳性检验、白噪声检验、确定ARMA阶数和预测未来走势的代码实现。 ... [详细]
  • Maven构建Hadoop,
    Maven构建Hadoop工程阅读目录序Maven安装构建示例下载系列索引 序  上一篇,我们编写了第一个MapReduce,并且成功的运行了Job,Hadoop1.x是通过ant ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
author-avatar
手机用户2502870457
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有