热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

MapReduceday04第三章框架原理3.3Shuffle机制

1:shuffle机制shuffle是处于Map方法之后,Reduce方法之前的数据处理过程。2:Partition分区1:

1:shuffle机制

shuffle是处于Map方法之后,Reduce方法之前的数据处理过程。

 2:Partition分区

        1:要求将统计结果按照条件输出到不同的文件中(分区)。将统计结果按照手机归属地不同省份输出到不同文件中(分区)。

        2:默认的Partition分区

public class HashPartitioner extends Partitioner {public int getPartition(K key, V value, int numReduceTasks) {// 相当于key的hashcode值与numReduceTasks取余得到。return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}
}

        3:自定义Partitioner步骤

# 1.自定义类继承Partitioner,重写getPartition方法
public class CustomPartitioner extends Partitioner {@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {// 控制分区代码逻辑… …return partition;}
}# 2.在job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);# 3.自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);

3:自定义分区案例

        1:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

        2:输入的数据phone_data.txt

        3:期望输出数据:136、137、138、139开头都放到一个独立的4个文件中,其他开头的放在一个文件中。

        4:136对应分区0,137对应分区1,138对应分区2,139对应分区3,其他对应分区4        

        5:Driver类中,// 指定自定义数据分区 job.setPartitionerClass(ProvincePartitioner. class);

                                  // 同时指定相应数量的reduceTask job.setNumReduceTasks(5);

        6:代码

public class ProvincePartitioner extends Partitioner {@Overridepublic int getPartition(Text text, FlowBean flowBean, int numPartitions) {//获取手机号前三位 prePhoneString phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量 partition,根据 prePhone 设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}// 上卖弄partition参数的设置,需要连续,不能有空//最后返回分区号 partitionreturn partition;}
}# 在driver类中添加如下代码
//8 指定自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
//9 同时指定相应数量的 ReduceTask// 这个数值的设置,设置2-4都会报IO异常,设置5走自定义类,设置6会多出来一个空文件,设置1只有会有个分区文件
job.setNumReduceTasks(5);

4:排序

        MapTask:一次快排,在环形缓冲区溢写前  ,对key的索引按照字典的顺序(a、b、c...)进行排序,进行分区内的快排,一次归并,对溢写文件进行归并,溢写文件是区内有序,因为快排,归并排序是把所有溢写文件中同属于一个分区的所有数据进行排序,使得整个分区一内排好顺序。

        ReduceTask:一次归并排序。例如Reduce Task1只负责partition0内的数据,但是MapTask1和MapTask2都有partition0的数据,所以ReduceTask1会主动把这两个MapTask内的partition0数据进行拉取,然后进行归并排序,排成只有一个文件。 

        一定要进行排序,因为最后到Reducer的时候,是相同key为一组进行操作的,事先排好序的话,效率会比较高。 

        1:部分排序。MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序

        2:全排序。最终输出结果只有一个文件,且文件内部有序。

        3:二次排序。在自定义排序中,如果compareTo中判断条件为两个即为二次排序。

5:全排序案例以及二次排序案例(使用之前的序列化案例,二次排序是在总流量相同的情况下,设置上行流量按照升序的方式排列,在compareTo方法里面设置就好)

# 根据手机总流量进行降序
# 输入数据,既然对流量进行排序,就把流量认为key
137 2481 24681 27162
138 264 0 264
139 132 1512 1644
135 7355 110349 117684# 输出数据
135 7355 110349 117684
137 2481 24681 27162
139 132 1512 1644
138 264 0 264 # FlowBean实现WritableCompareable接口重写compareTo方法
# public class FlowBean implements WritableComparable
@Override
public int compareTo(FlowBean o) {// 倒序排列,按照总流量从大到小if(this.sumFlow > o.getSunFlow){return -1;}else if(this.sunFlow o.upFlow){return 1}else if(this.upFlow public class FlowMapper extends Mapper {private FlowBean outK = new FlowBean();private Text outV = new Text();@Overrideprotected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {//1 获取一行数据String line = value.toString();//2 按照"\t",切割数据String[] split = line.split("\t");//3 封装 outK outVoutK.setUpFlow(Long.parseLong(split[1]));outK.setDownFlow(Long.parseLong(split[2]));outK.setSumFlow();outV.set(split[0]);//4 写出 outK outVcontext.write(outK,outV);}
}
# Reducer类
// 循环输出,避免总流量相同情况
public class FlowReducer extends Reducer {@Overrideprotected void reduce(FlowBean key, Iterable values, Context
context) throws IOException, InterruptedException {//遍历 values 集合,循环写出,避免总流量相同的情况for (Text value : values) {//调换 KV 位置,反向写出context.write(value,key);}}
}#driver类
//4 修改设置 Map 端输出数据的 KV 类型job.setMapOutputKeyClass(FlowBean.class);job.setMapOutputValueClass(Text.class);

6:区排序(分区加排序的共同使用,分区类也要注意key和value是什么)

# 根据上面案例的输出情况,把136、137、138等等开头的手机号放在不同的文件夹中,同时各个文件中也 # 按照总流量进行排序,这样其实就是分区加排序的综合使用# 分区类
public class ProvincePartitioner2 extends Partitioner {@Overridepublic int getPartition(FlowBean flowBean, Text text, int numPartitions) {//获取手机号前三位String phone = text.toString();String prePhone = phone.substring(0, 3);//定义一个分区号变量 partition,根据 prePhone 设置分区号int partition;if("136".equals(prePhone)){partition = 0;}else if("137".equals(prePhone)){partition = 1;}else if("138".equals(prePhone)){partition = 2;}else if("139".equals(prePhone)){partition = 3;}else {partition = 4;}//最后返回分区号 partitionreturn partition;}
}# 在driver类添加,进行绑定
job.setPartitionerClass(ProvincePartitioner2.class);
// 设置对应的 ReduceTask 的个数
job.setNumReduceTasks(5);

7:Combiner合并

        1:Combiner可有可无,但是为了考虑效率问题,一般需要使用

        2:Combiner父类是Reducer

        3:Combiner和Reducer区别在于运行的位置:Combiner只负责一个MapTask,Reducer负责全局所有的MapTask

        4:Combiner意思是对每一个MapTask的输出进行局部汇总,减少传输量。

        5:Combiner可以用的前提是不能影响最终的业务逻辑(算平均值不可,求和可以),且Combiner的输出kv要和Reducer的输入kv对应起来。

Mapper Reducer
3 5 7 -> (3+5+7)/3=5 (3+5+7+2+6)/5=23/5 不等于 (5+4)/2=9/2
2 6 -> (2+6)/2=4-> 3+5+7=15 (15+8)=23 等于 (3+5+7+2+6)=23-> 2+6=8

8:Combiner案例 (wordcount案例基础之上)

# 输入数据 希望在mapper阶段即可得到这个
banzhang ni hao
xihuan hadoop
banzhang
banzhang ni hao
xihuan hadoop
banzhang# 期望结果,在日志文件中可以看到combine input records = 12 combine output records = 5
# 方案一:新建WordCountCombiner类
public class WordCountCombiner extends Reducer {private IntWritable outV = new IntWritable();@Overrideprotected void reduce(Text key, Iterable values, Context
context) throws IOException, InterruptedException {int sum = 0;for (IntWritable value : values) {sum += value.get();}//封装 outKVoutV.set(sum);//写出 outKVcontext.write(key,outV);}
}// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordCountCombiner.class);
# 如果设置 job.setNumReduceTasks(0),即取消Reduce阶段,那么shuffle整个都不会执行,因为
# shuffle是连接map和reduce的,所以combiner也不会执行。 # 方案二:因为上面类的代码和Reducer代码一样
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordCountReducer.class);


推荐阅读
  • 本文介绍了一种轻巧方便的工具——集算器,通过使用集算器可以将文本日志变成结构化数据,然后可以使用SQL式查询。集算器利用集算语言的优点,将日志内容结构化为数据表结构,SPL支持直接对结构化的文件进行SQL查询,不再需要安装配置第三方数据库软件。本文还详细介绍了具体的实施过程。 ... [详细]
  • PDO MySQL
    PDOMySQL如果文章有成千上万篇,该怎样保存?数据保存有多种方式,比如单机文件、单机数据库(SQLite)、网络数据库(MySQL、MariaDB)等等。根据项目来选择,做We ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • Nginx使用AWStats日志分析的步骤及注意事项
    本文介绍了在Centos7操作系统上使用Nginx和AWStats进行日志分析的步骤和注意事项。通过AWStats可以统计网站的访问量、IP地址、操作系统、浏览器等信息,并提供精确到每月、每日、每小时的数据。在部署AWStats之前需要确认服务器上已经安装了Perl环境,并进行DNS解析。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 本文介绍了在Win10上安装WinPythonHadoop的详细步骤,包括安装Python环境、安装JDK8、安装pyspark、安装Hadoop和Spark、设置环境变量、下载winutils.exe等。同时提醒注意Hadoop版本与pyspark版本的一致性,并建议重启电脑以确保安装成功。 ... [详细]
  • 图解redis的持久化存储机制RDB和AOF的原理和优缺点
    本文通过图解的方式介绍了redis的持久化存储机制RDB和AOF的原理和优缺点。RDB是将redis内存中的数据保存为快照文件,恢复速度较快但不支持拉链式快照。AOF是将操作日志保存到磁盘,实时存储数据但恢复速度较慢。文章详细分析了两种机制的优缺点,帮助读者更好地理解redis的持久化存储策略。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 本文介绍了Web学习历程记录中关于Tomcat的基本概念和配置。首先解释了Web静态Web资源和动态Web资源的概念,以及C/S架构和B/S架构的区别。然后介绍了常见的Web服务器,包括Weblogic、WebSphere和Tomcat。接着详细讲解了Tomcat的虚拟主机、web应用和虚拟路径映射的概念和配置过程。最后简要介绍了http协议的作用。本文内容详实,适合初学者了解Tomcat的基础知识。 ... [详细]
  • 本文介绍了sqlserver云存储和本地存储的区别,云存储是将数据存储在网络上,方便查看和调用;本地存储是将数据存储在电脑磁盘上,只能在存储的电脑上查看。同时提供了几种启动sqlserver的方法。此外,还介绍了如何导出数据库的步骤和工具。 ... [详细]
  • 本文介绍了自动化测试专家Elfriede Dustin在2008年的文章中讨论了自动化测试项目失败的原因。同时,引用了IDT在2007年进行的一次软件自动化测试的研究调查结果,调查显示很多公司认为自动化测试很有用,但很少有公司成功实施。调查结果表明,缺乏资源是导致自动化测试失败的主要原因,其中37%的人认为缺乏时间。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 一次上线事故,30岁+的程序员踩坑经验之谈
    本文主要介绍了一位30岁+的程序员在一次上线事故中踩坑的经验之谈。文章提到了在双十一活动期间,作为一个在线医疗项目,他们进行了优惠折扣活动的升级改造。然而,在上线前的最后一天,由于大量数据请求,导致部分接口出现问题。作者通过部署两台opentsdb来解决问题,但读数据的opentsdb仍然经常假死。作者只能查询最近24小时的数据。这次事故给他带来了很多教训和经验。 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
author-avatar
手机用户2602921303_852
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有