热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReduce读取数据

1、InputFormat运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型&#x

1、InputFormat

        运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。那么,针对不同的数据类型,MapReduce是如何读取这些数据的呢?

InputFormat是MapReduce框架用来读取数据的类。

InputFormat常见子类包括:


  • TextInputFormat (普通文本文件,MR框架默认的读取实现类型)
  • KeyValueTextInputFormat(读取一行文本数据按照指定分隔符,把数据封装为kv类型)
  • NLineInputF ormat(读取数据按照行数进行划分分片)
  • CombineTextInputFormat(合并小文件,避免启动过多MapTask任务)
  • 自定义InputFormat

CombineTextInputFormat案例

        MR框架默认的TextInputFormat切片机制按文件划分切片,文件无论多小,都是单独一个切片,然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个MapTask处理的数据量很小大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用率不高。

        CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。

需求:

        将输入数据中的多个小文件合并为一个切片处理,运行WordCount案例,准备多个小文件

具体使用方式:

// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m, 单位是kb

验证切片数量的变化!!

CombineTextInputFormat切片原理

切片生成过程分为两部分:虚拟存储过程和切片过程


假设设置setMaxInputSplitSize值为4M
四个小文件:1.txt -->2M ;2.txt-->7M;3.txt-->0.3M;4.txt--->8.2M


虚拟存储过程:把输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值进行比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。

        比如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分出一个4M的块。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的非常小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。


  • 1.txt-->2M;2M<4M;一个块;
  • 2.txt-->7M;7M>4M,但是不大于两倍,均匀分成两块;两块:每块3.5M;
  • 3.txt-->0.3M;0.3<4M ,0.3M<4M ,一个块
  • 4.txt-->8.2M;大于最大值且大于两倍;一个4M的块,剩余4.2M分成两块,每块2.1M
  • 所有块信息:2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。

切片过程:


  • 判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
  • 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
  • 按照之前输入文件:有4个小文件大小分别为2M、7M、0.3M以及8.2M这四个小文件,则虚拟存储之后形成7个文件块,大小分别为:2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M

最终会形成3个切片,大小分别为:
(2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M


注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。



2、自定义InputFormat

        HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义InputFormat实现小文件的合并。

需求:

        将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

结果:得到一个合并了多个小文件的SequenceFile文件

整体思路:


  1. 定义一个类继承FileInputFormat(TextInputFormat的父类就是FileInputFormat
  2. 重写isSplitable()指定为不可切分;重写createRecordReader()方法,创建自己的RecorderReader对象(实现数据自定义读取
  3. 改变默认读取数据方式,实现一次读取一个完整文件作为kv输出;
  4. Driver指定使用的InputFormat类型

代码参考:

自定义InputFormat

package com.lagou.mr.sequence;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.io.*;import java.io.IOException;// 自定义inputFormat读取多个小文件合并为一个SequenceFile文件
// SequenceFile文件中以kv形式存储文件,key --> 文件路径+文件名称, value --> 文件的整个内容
// TextInputFormat中泛型是LongWritable:文本的偏移量, Text:一行文本内容:指定当前inputFormat的输出数据类型
// 自定义inputFormat:key-->文件路径+名称,value-->整个文件内容
public class CustomInputFormat extends FileInputFormat {// 重写是否可切分@Overrideprotected boolean isSplitable(JobContext context, Path filename) {// 对于当前需求,不需要把文件切分,保证一个切片就是文件/*** 返回true允许切分* 返回false不允许切分*/return false;}// RecordReader就是用来读取数据的对象@Overridepublic RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {CustomRecordReader recordReader = new CustomRecordReader();// 调用recordReader的初始化方法recordReader.initialize(split, context);return recordReader;}
}

自定义RecordReader

package com.lagou.mr.sequence;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;// 附则读取数据,一次读取整个文件内容,封装为kv输出
public class CustomRecordReader extends RecordReader {private FileSplit split;// hadoop配置文件对象private Configuration conf;// 定义key,value的成员变量private Text key = new Text();private BytesWritable value = new BytesWritable();// 初始化方法,把切片以及上下文提升为全局@Overridepublic void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {this.split = (FileSplit) split;cOnf= context.getConfiguration();}private Boolean flag = true;// 用来读取数据的方法@Overridepublic boolean nextKeyValue() throws IOException, InterruptedException {// 对于当前split来说只需要读取一次即可,因为一次就把整个文件全部读取了if (flag) {// 准备一个数组存放读取到的数据,数据大小是多少?byte[] cOntent= new byte[(int) split.getLength()];Path path = split.getPath(); // 获取切片的path信息FileSystem fs = path.getFileSystem(conf); // 获取到文件系统对象FSDataInputStream fis = fs.open(path); // 获取到输入流IOUtils.readFully(fis, content, 0, content.length); // 读取数据并把数据放在数组中// 封装key和valuekey.set(path.toString());value.set(content, 0, content.length);// 关闭流IOUtils.closeStream(fis);// 把再次读取的开关置为falseflag = false;return true;}return false;}// 获取到key@Overridepublic Text getCurrentKey() throws IOException, InterruptedException {return key;}// 获取到value@Overridepublic BytesWritable getCurrentValue() throws IOException, InterruptedException {return value;}// 获取进度@Overridepublic float getProgress() throws IOException, InterruptedException {return 0;}// 关闭资源@Overridepublic void close() throws IOException {}
}

Mapper

package com.lagou.mr.sequence;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;// Text:代表的是一个文件的path+名称,BytesWritable:代表的是一个文件的内容
public class SequenceFileMapper extends Mapper {@Overrideprotected void map(Text key, BytesWritable value, Mapper.Context context) throws IOException, InterruptedException {//读取内容直接输出context.write(key, value);}
}

Reducer

package com.lagou.mr.sequence;import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class SequenceFileReducer extends Reducer {@Overrideprotected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {//输出value值,其中只有一个BytesWritable 所以直接next取出即可context.write(key, values.iterator().next());}
}

Driver

package com.lagou.mr.sequence;import com.lagou.mr.wc.WordCountDriver;
import com.lagou.mr.wc.WordCountMapper;
import com.lagou.mr.wc.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class SequenceDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {/*1. 获取配置文件对象,获取job对象实例2. 指定程序jar的本地路径3. 指定Mapper/Reducer类4. 指定Mapper输出的kv数据类型5. 指定最终输出的kv数据类型6. 指定job处理的原始数据路径7. 指定job输出结果路径8. 提交作业*/
// 1. 获取配置文件对象,获取job对象实例final Configuration cOnf= new Configuration();final Job job = Job.getInstance(conf, "SequenceDriver");
// 2. 指定程序jar的本地路径job.setJarByClass(SequenceDriver.class);
// 3. 指定Mapper/Reducer类job.setMapperClass(SequenceFileMapper.class);job.setReducerClass(SequenceFileReducer.class);
// 4. 指定Mapper输出的kv数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(BytesWritable.class);
// 5. 指定最终输出的kv数据类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(BytesWritable.class);//设置使用自定义InputFormat读取数据job.setInputFormatClass(CustomInputFormat.class);FileInputFormat.setInputPaths(job, new Path("D:\\data\\小文件")); //指定读取数据的原始路径
// 7. 指定job输出结果路径FileOutputFormat.setOutputPath(job, new Path("D:\\out_file\\out_1")); //指定结果数据输出路径
// 8. 提交作业final boolean flag = job.waitForCompletion(true);//jvm退出:正常退出0,非0值则是错误退出System.exit(flag ? 0 : 1);}
}

推荐阅读
  • Maven + Spring + MyBatis + MySQL 环境搭建与实例解析
    本文详细介绍如何使用MySQL数据库进行环境搭建,包括创建数据库表并插入示例数据。随后,逐步指导如何配置Maven项目,整合Spring框架与MyBatis,实现高效的数据访问。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • OBS Studio自动化实践:利用脚本批量生成录制场景
    本文探讨了如何利用OBS Studio进行高效录屏,并通过脚本实现场景的自动生成。适合对自动化办公感兴趣的读者。 ... [详细]
  • 在尝试启动Java应用服务器Tomcat时,遇到了org.apache.catalina.LifecycleException异常。本文详细记录了异常的具体表现形式,并提供了有效的解决方案。 ... [详细]
  • 本文介绍如何通过整合SparkSQL与Hive来构建高效的用户画像环境,提高数据处理速度和查询效率。 ... [详细]
  • 在Java开发中,保护代码安全是一个重要的课题。由于Java字节码容易被反编译,因此使用代码混淆工具如ProGuard变得尤为重要。本文将详细介绍如何使用ProGuard进行代码混淆,以及其基本原理和常见问题。 ... [详细]
  • CoreData 表关联详解
    在企业中,通常会有多个部门,每个员工隶属于某个部门。这种情况下,员工表和部门表之间就会形成关联关系。本文将详细介绍如何在CoreData中实现表关联,并通过示例代码展示如何添加和查询关联数据。 ... [详细]
  • 本文介绍了如何利用jQuery实现对网页上多个div元素的显示与隐藏控制,包括基本的toggle方法及更复杂的显示隐藏逻辑。 ... [详细]
  • 本文详细介绍了如何在Oracle VM VirtualBox中实现主机与虚拟机之间的数据交换,包括安装Guest Additions增强功能,以及如何利用这些功能进行文件传输、屏幕调整等操作。 ... [详细]
  • 问题描述现在,不管开发一个多大的系统(至少我现在的部门是这样的),都会带一个日志功能;在实际开发过程中 ... [详细]
  • protobuf 使用心得:解析与编码陷阱
    本文记录了一次在广告系统中使用protobuf进行数据交换时遇到的问题及其解决过程。通过这次经历,我们将探讨protobuf的特性和编码机制,帮助开发者避免类似的陷阱。 ... [详细]
  • 本文探讨了在Windows系统中运行Apache服务器时频繁出现崩溃的问题,并提供了多种可能的解决方案和建议。错误日志显示多个子进程因达到最大请求限制而退出。 ... [详细]
  • 本文介绍了如何在不同操作系统上安装Git,以及一些基本和高级的Git操作,包括项目初始化、文件状态检查、版本控制、分支管理、标签处理、版本回退等,并简要提及了开源许可协议的选择。 ... [详细]
  • Python 日志记录模块详解
    日志记录机制是软件开发中不可或缺的一部分,它帮助开发者追踪和调试程序运行时的各种异常。Python 提供了内置的 logging 模块,使我们在代码中记录和管理日志信息变得更加方便。本文将详细介绍如何使用 Python 的 logging 模块。 ... [详细]
  • 树莓派4B:安装基础操作系统指南
    本文将详细介绍如何为树莓派4B安装基础操作系统,包括所需材料、镜像下载、镜像烧录以及更换国内源等步骤。 ... [详细]
author-avatar
坚强萝卜_854
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有