热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Writable接口与序列化机制

序列化概念序列化(Serialization)是指把结构化对象转化为字节流。反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。Java序列化(j

序列化概念



  • 序列化(Serialization)是指把结构化对象转化为字节流。

  • 反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象。

  • Java序列化(java.io.Serializable)


Hadoop序列化的特点



  • 序列化格式特点:


    • 紧凑:高效使用存储空间。

    • 快速:读写数据的额外开销小

    • 可扩展:可透明地读取老格式的数据

    • 互操作:支持多语言的交互



  • Hadoop的序列化格式:Writable


Hadoop序列化的作用



  • 序列化在分布式环境的两大作用:进程间通信,永久存储。

  • Hadoop节点间通信。



Writable接口

Writable接口, 是根据 DataInput 和 DataOutput 实现的简单、有效的序列化对象.MR的任意Key和Value必须实现Writable接口.



MR的任意key必须实现WritableComparable接口

常用的Writable实现类

Text一般认为它等价于java.lang.String的Writable。针对UTF-8序列。


例:


Text test = new Text("test");
IntWritable One= new IntWritable(1);


自定义Writable类



  • Writable


    • write 是把每个对象序列化到输出流

    • readFields是把输入流字节反序列化




  • 实现WritableComparable.

  • Java值对象的比较:一般需要重写toString(),hashCode(),equals()方法

     




MapReduce输入的处理类



  • 1、FileInputFormat:


    • FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的方法是有不同的子类——TextInputFormat进行实现的。



  • 2、InputFormat:


  •  



    • InputFormat 负责处理MR的输入部分.有三个作用:


      • 验证作业的输入是否规范.

      • 把输入文件切分成InputSplit.

      • 提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.





  • 3、InputSplit:


    • 在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。

    • FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是这个文件或者是这个文件中的一部分.

    • 如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。

    • 当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率底下。

    • 例如:一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个100kb的文件会被10000个map任务处理。 





  • 4、TextInputFormat:


    • TextInputformat是默认的处理类,处理普通文本文件。

    • 文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。

    • 默认以\n或回车键作为一行记录。

    • TextInputFormat继承了FileInputFormat。




InputFormat类的层次结构


其他输入类



  • 1、CombineFileInputFormat


    • 相对于大量的小文件来说,hadoop更合适处理少量的大文件。

    • CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。



  • 2、KeyValueTextInputFormat


    • 当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。



  • 3、NLineInputformat


    • NLineInputformat可以控制在每个split中数据的行数。



  • 4、SequenceFileInputformat


    • 当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。




自定义输入格式



  • 1、继承FileInputFormat基类。

  • 2、重写里面的getSplits(JobContext context)方法。

  • 3、重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。


Hadoop的输出



  • 1、TextOutputformat


    • 默认的输出格式,key和value中间值用tab隔开的。



  • 2、SequenceFileOutputformat


    • 将key和value以sequencefile格式输出。



  • 3、SequenceFileAsOutputFormat


    • 将key和value以原始二进制的格式输出。



  • 4、MapFileOutputFormat


    • 将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。



  • 5、MultipleOutputFormat


    • 默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。




案例实现:



数据


      136315798506613726230503248124681200
      1363157995052138265441012640200
      1363157991076139264356561321512200
      1363154400022139262511062400200
      13631579930441821157596115272106200
      13631579950748413841341161432200
      1363157993055135604396581116954200
      13631579950331592013325731562936200
      1363157983019137191994192400200
      1363157984041136605779916960690200
      13631579730981501368585836593538200
      1363157986029159890021191938180200
      1363157992093135604396589184938200
      136315798604113480253104180180200
      13631579840401360284656519382910200

      13726230503248124681sum


DataBean类


    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    public class DataBean implements Writable{

      //电话号码
      private String phone;
      //上行流量
      private Long upPayLoad;
      //下行流量
      private Long downPayLoad;
      //总流量
      private Long totalPayLoad;

      public DataBean(){}

      public DataBean(String phone,Long upPayLoad, Long downPayLoad) {
          super();
          this.phOne=phone;
          this.upPayLoad = upPayLoad;
          this.downPayLoad = downPayLoad;
          this.totalPayLoad=upPayLoad+downPayLoad;
      }

      /**
        * 序列化
        * 注意:序列化和反序列化的顺序和类型必须一致
        */
      @Override
      public void write(DataOutput out) throws IOException {
          // TODO Auto-generated method stub
          out.writeUTF(phone);
          out.writeLong(upPayLoad);
          out.writeLong(downPayLoad);
          out.writeLong(totalPayLoad);
      }

      /**
        * 反序列化
        */
      @Override
      public void readFields(DataInput in) throws IOException {
          // TODO Auto-generated method stub
          this.phOne=in.readUTF();
          this.upPayLoad=in.readLong();
          this.downPayLoad=in.readLong();
          this.totalPayLoad=in.readLong();
      }

      @Override
      public String toString() {
          return upPayLoad +"\t"+ downPayLoad +"\t"+ totalPayLoad;
      }

      public String getPhone() {
          return phone;
      }

      public void setPhone(String phone) {
          this.phOne= phone;
      }

      public Long getUpPayLoad() {
          return upPayLoad;
      }

      public void setUpPayLoad(Long upPayLoad) {
          this.upPayLoad = upPayLoad;
      }

      public Long getDownPayLoad() {
          return downPayLoad;
      }

      public void setDownPayLoad(Long downPayLoad) {
          this.downPayLoad = downPayLoad;
      }

      public Long getTotalPayLoad() {
          return totalPayLoad;
      }

      public void setTotalPayLoad(Long totalPayLoad) {
          this.totalPayLoad = totalPayLoad;
      }
    }


DataCount类


    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DataCount {

      public static void main(String[] args) throws IOException, ClassNotFoundException,
      InterruptedException {
          // TODO Auto-generated method stub
          Job job=Job.getInstance(new Configuration());

          job.setJarByClass(DataCount.class);

          job.setMapperClass(DataCountMapper.class);
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(DataBean.class);
          FileInputFormat.setInputPaths(job, args[0]);

          job.setReducerClass(DataCountReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(DataBean.class);
          FileOutputFormat.setOutputPath(job, new Path(args[1]));

          job.waitForCompletion(true);
      }

      public static class DataCountMapper extends Mapper{

          @Override
          protected void map(LongWritable key, Text value,
          Mapper.Context context)
                throws IOException, InterruptedException {
            String hang=value.toString();
            String[] strings=hang.split("\t");
            String phOne=strings[1];
            long up=Long.parseLong(strings[2]);
            long down=Long.parseLong(strings[3]);
            DataBean dataBean=new DataBean(phone,up, down);

            context.write(new Text(phone), dataBean);
          }

      }

      public static class DataCountReducer extends Reducer{

          @Override
          protected void reduce(Text k2, Iterable v2,
          Reducer.Context context)
                throws IOException, InterruptedException {
            long upSum=0;
            long downSum=0;

            for(DataBean dataBean:v2){
                upSum += dataBean.getUpPayLoad();
                downSum += dataBean.getDownPayLoad();
            }

            DataBean dataBean=new DataBean(k2.toString(),upSum,downSum);

            context.write(new Text(k2), dataBean);
          }

      }
    }

 





推荐阅读
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 标题: ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • Java如何导入和导出Excel文件的方法和步骤详解
    本文详细介绍了在SpringBoot中使用Java导入和导出Excel文件的方法和步骤,包括添加操作Excel的依赖、自定义注解等。文章还提供了示例代码,并将代码上传至GitHub供访问。 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • Apache Shiro 身份验证绕过漏洞 (CVE202011989) 详细解析及防范措施
    本文详细解析了Apache Shiro 身份验证绕过漏洞 (CVE202011989) 的原理和影响,并提供了相应的防范措施。Apache Shiro 是一个强大且易用的Java安全框架,常用于执行身份验证、授权、密码和会话管理。在Apache Shiro 1.5.3之前的版本中,与Spring控制器一起使用时,存在特制请求可能导致身份验证绕过的漏洞。本文还介绍了该漏洞的具体细节,并给出了防范该漏洞的建议措施。 ... [详细]
author-avatar
mobiledu2502887637
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有