热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

mapreduce之join连接

1、reducesidejoin(reduce端表连接)使用分布式缓存API,完成两个数据集的连接操作优点:操作简单缺点&

1、reduce side join(reduce端表连接)使用分布式缓存API,完成两个数据集的连接操作优点:操作简单
缺点:map端shffule后传递给reduce端的数据量过大,极大的降低了性能连接方法:
(1)map端读入输入数据,以连接键为Key,待连接的内容为value,但是value需要添加特别的标识,表示的内容为表的表示,即若value来自于表1,则标识位设置为1,若来自表2,则设置为2,然后将map的内容输出到reduce。
(2)reduce端接收来自map端shuffle后的结果,即内容,然后遍历values,对每一个value进行处理主要的处理过程是:判断每一个标志位,如果来自1表,则将value放置在特地为1表创建的数组之中若来自2表,则将value放置在为2表创建的数组中,最后对两个数组进行求笛卡儿积,然后输出结果,即为最终表的连接结果。

2、问题分析MapReduce连接取决于数据集的规模及分区方式
如果一个数据集很大而另外一个数据集很小,小的分发到集群中的每一个节点mapper阶段读取大数据集中的数据
reducer获取本节点上的数据(也就是小数据集中的数据)并完成连接操作

3、缓存在本地的目录设置以下为默认值:
<property><name>mapred.local.dirname><value>${hadoop.tmp.dir}/mapred/localdir/filecachevalue>
property><property><name>local.cache.sizename><value>10737418240value>
property>

4、使用方式旧版本的DistributedCache已经被注解为过时&#xff0c;以下为Hadoop-2.2.0以上的新API接口&#xff0c;测试的Hadoop版本为2.7.2
Job job &#61; Job.getInstance(conf);
//将hdfs上的文件加入分布式缓存
job.addCacheFile(new URI("hdfs://url:port/filename#symlink"));之后在map/reduce函数中可以通过context来访问到缓存的文件&#xff0c;一般是重写setup方法来进行初始化&#xff1a;
直接使用hadoop方式&#xff1a;
protected void setup(Context context) throws IOException, InterruptedException {super.setup(context);if (context.getCacheFiles() !&#61; null && context.getCacheFiles().length > 0) {String path &#61; context.getLocalCacheFiles()[0].getName();File itermOccurrenceMatrix &#61; new File(path);FileReader fileReader &#61; new FileReader(itermOccurrenceMatrix);BufferedReader bufferedReader &#61; new BufferedReader(fileReader);String s;while ((s &#61; bufferedReader.readLine()) !&#61; null) {//TODO:读取每行内容进行相关的操作}bufferedReader.close();fileReader.close();}
}
或者采用以下方法&#xff1a;将hadoop方式转化为java方式进行处理Configuration config&#61;context.getConfiguration();FileSystem fs&#61;FileSystem.get(config);FSDataInputStream in&#61;fs.open(new Path(path));Text line&#61;new Text(“ ”);LineReader lineReader&#61;new LineReader(in,config);int offset&#61;0;do{offset&#61;lineReader.readLine(line); //读入path中一行到Text类型的line中&#xff0c;返回字节数if(offset>0){String[] tokens&#61;line.toString().split(“,”); countryCodesTreeMap.put(tokens[0],tokens[1]);
}
}while(offset!&#61;0); 得到的path为本地文件系统上的路径这里的getLocalCacheFiles方法也被注解为过时了&#xff0c;只能使用context.getCacheFiles方法&#xff0c;
和getLocalCacheFiles不同的是&#xff0c;getCacheFiles得到的路径是HDFS上的文件路径&#xff0c;
如果使用这个方法&#xff0c;那么程序中读取的就不再试缓存在各个节点上的数据了&#xff0c;相当于共同访问HDFS上的同一个文件。
可以直接通过符号连接来跳过getLocalCacheFiles获得本地的文件。

5、实现步骤1)把数据放到缓存中的方法
public void addCacheFile(URI uri);
public void addCacheArchive(URI uri);// 以上两组方法将文件或存档添加到分布式缓存
public void setCacheFiles(URI[] files);
public void setCacheArchives(URI[] archives);// 以上两组方法将一次性向分布式缓存中添加一组文件或存档
public void addFileToClassPath(Path file);
public void addArchiveToClassPath(Path archive);// 以上两组方法将文件或存档添加到 MapReduce 任务的类路径在缓存中可以存放两类对象&#xff1a;文件&#xff08;files&#xff09;和存档&#xff08;achives&#xff09;。
文件被直接放置在任务节点上&#xff0c;而存档则会被解档之后再将具体文件放置在任务节点上。 2)其次掌握在map或者reduce任务中&#xff0c;使用API从缓存中读取数据。
可以通过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。

1map side joinmap端进行表的连接&#xff0c;对表的大小有要求&#xff0c;首先有一个表必须足够小&#xff0c;可以读入内存&#xff0c;另外的一个表很大&#xff0c;
与reduce端连接比较&#xff0c;map端的连接&#xff0c;不会产生大量数据的传递&#xff0c;而是在map端连接完毕之后就进行输出&#xff0c;效率极大的提高连接方法&#xff1a;
&#xff08;1&#xff09;首先要重写Mapper类下面的setup方法&#xff0c;因为这个方法是先于map方法执行的&#xff0c;将较小的表先读入到一个HashMap中。
&#xff08;2&#xff09;重写map函数&#xff0c;一行行读入大表的内容&#xff0c;逐一的与HashMap中的内容进行比较&#xff0c;若Key相同&#xff0c;则对数据进行格式化处理&#xff0c;然后直接输出。

2、Map侧的连接两个数据集中一个非常小&#xff0c;可以让小数据集存入缓存。
在作业开始这些文件会被复制到运行task的节点上。一开始&#xff0c;它的setup方法会检索缓存文件。

3Map侧连接需要满足条件与reduce侧连接不同&#xff0c;Map侧连接需要等待参与连接的数据集满足如下条件&#xff1a;1.除了连接键外&#xff0c;所有的输入都必须按照连接键排序。输入的各种数据集必须有相同的分区数。所有具有相同键的记录需要放在同一分区中。
Map任务对其他Mapreduce作业的结果进行处理时&#xff08;Cleanup时&#xff09;&#xff0c;Map侧的连接条件都自动满足。
CompositeInputFormat类用于执行Map侧的连接&#xff0c;而输入和连接类型的配置可以通过属性指定。2.如果其中一个数据集足够小&#xff0c;旁路的分布式通道可以用在Map侧的连接中。

输入&#xff1a;
num1文件和num2文件&#xff1a;
xm&#64;master:~/workspace$ hadoop fs -text /b/num1
1,Beijing
2,Guangzhou
3,Shenzhen
4,Xian
xm&#64;master:~/workspace$ hadoop fs -text /b/num2
Beijing Red Star,1
Shenzhen Thunder,3
Guangzhou Honda,2
Beijing Rising,1
Guangzhou Development Bank,2
Tencent,3
Back of Beijing,1

输出&#xff1a;
Back of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen

实现代码&#xff1a;
package mr_01;import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;public class reduceJoin { static String INPUT_PATH&#61;"hdfs://master:9000/b/num2";static String OUTPUT_PATH&#61;"hdfs://master:9000/output";//Map直接写入&#xff0c;不需修改static class MyMapper extends Mapper{ Text output_key&#61;new Text(); //fnameText output_value&#61;new Text(); //add-idprotected void map(Object key, Object value, Context context) throws IOException, InterruptedException{String[] tokens&#61;value.toString().split(","); if(tokens!&#61;null && tokens.length&#61;&#61;2){output_key.set(tokens[0]); output_value.set(tokens[1]);context.write(output_key,output_value);}}}static class MyReduce extends Reducer{Text output_key&#61;new Text();Text output_value&#61;new Text();Map addMap&#61;new HashMap(); //a( addr-id,addr-name )//setup方法将文件中的数据写入hashmap中protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException{URI uri&#61;context.getCacheFiles()[0];Path path&#61;new Path(uri);System.out.println("path&#61;"&#43;uri.toString());FileSystem fs&#61; path.getFileSystem(context.getConfiguration());LineReader lineReader&#61;new LineReader(fs.open(path));Text line&#61;new Text();while(lineReader.readLine(line)>0){String[] tokens&#61;line.toString().split(",");if(tokens!&#61;null && tokens.length&#61;&#61;2)addMap.put(tokens[0], tokens[1]); }System.out.println("addMap.size&#61;"&#43;addMap.size());}//reduce进行取用&#xff08;key-->value对应&#xff09;protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException{//idif(values&#61;&#61;null) return;String addrName&#61; addMap.get(values.iterator().next().toString());output_value.set(addrName);context.write(key,output_value);}}public static void main(String[] args) throws Exception{Path outputpath&#61;new Path(OUTPUT_PATH);Path cacheFile&#61;new Path("hdfs://master:9000/b/num1");Configuration conf&#61;new Configuration();FileSystem fs&#61;outputpath.getFileSystem(conf);if(fs.exists(outputpath)){fs.delete(outputpath, true);}Job job&#61;Job.getInstance(conf);FileInputFormat.setInputPaths(job, INPUT_PATH);FileOutputFormat.setOutputPath(job, outputpath);URI uri&#61;cacheFile.toUri();job.setCacheFiles(new URI[]{uri});job.setMapperClass(MyMapper.class); //mapjob.setReducerClass(MyReduce.class); //reducejob.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.waitForCompletion(true);}
}

如果要实现多表连接&#xff0c;那么只需将多个表存进缓存中取用即可。

1.避免生成太多依赖I/O的map任务&#xff0c;数量由输入决定
2.作业加速主要来源于Map任务&#xff0c;有更高的并行度
3.Combiner对效率的提高&#xff0c;不仅在map reduce任务之间的数据传输&#xff0c;而且体现在降低了map侧I/O负载
4.自定义分区器可以在不同的reduce之间做负载均衡
5.分布式缓存对于小文件场景很有用&#xff0c;但应该避免过多或大的文件存储在缓存中


推荐阅读
  • MapReduce工作流程最详细解释
    MapReduce是我们再进行离线大数据处理的时候经常要使用的计算模型,MapReduce的计算过程被封装的很好,我们只用使用Map和Reduce函数,所以对其整体的计算过程不是太 ... [详细]
  • 一、Hadoop来历Hadoop的思想来源于Google在做搜索引擎的时候出现一个很大的问题就是这么多网页我如何才能以最快的速度来搜索到,由于这个问题Google发明 ... [详细]
  • 本文整理了Java中org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc.getTypeInfo()方法的一些代码示例,展 ... [详细]
  • mapreduce源码分析总结
    这篇文章总结的非常到位,故而转之一MapReduce概述MapReduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的ÿ ... [详细]
  • HashMap的相关问题及其底层数据结构和操作流程
    本文介绍了关于HashMap的相关问题,包括其底层数据结构、JDK1.7和JDK1.8的差异、红黑树的使用、扩容和树化的条件、退化为链表的情况、索引的计算方法、hashcode和hash()方法的作用、数组容量的选择、Put方法的流程以及并发问题下的操作。文章还提到了扩容死链和数据错乱的问题,并探讨了key的设计要求。对于对Java面试中的HashMap问题感兴趣的读者,本文将为您提供一些有用的技术和经验。 ... [详细]
  • 本文介绍了在sqoop1.4.*版本中,如何实现自定义分隔符的方法及步骤。通过修改sqoop生成的java文件,并重新编译,可以满足实际开发中对分隔符的需求。具体步骤包括修改java文件中的一行代码,重新编译所需的hadoop包等。详细步骤和编译方法在本文中都有详细说明。 ... [详细]
  • 深入理解Java虚拟机的并发编程与性能优化
    本文主要介绍了Java内存模型与线程的相关概念,探讨了并发编程在服务端应用中的重要性。同时,介绍了Java语言和虚拟机提供的工具,帮助开发人员处理并发方面的问题,提高程序的并发能力和性能优化。文章指出,充分利用计算机处理器的能力和协调线程之间的并发操作是提高服务端程序性能的关键。 ... [详细]
  •     这里使用自己编译的hadoop-2.7.0版本部署在windows上,记得几年前,部署hadoop需要借助于cygwin,还需要开启ssh服务,最近发现,原来不需要借助cy ... [详细]
  • 什么是大数据lambda架构
    一、什么是Lambda架构Lambda架构由Storm的作者[NathanMarz]提出,根据维基百科的定义,Lambda架构的设计是为了在处理大规模数 ... [详细]
  • Hadoop源码解析1Hadoop工程包架构解析
    1 Hadoop中各工程包依赖简述   Google的核心竞争技术是它的计算平台。Google的大牛们用了下面5篇文章,介绍了它们的计算设施。   GoogleCluster:ht ... [详细]
  • 对于开源的东东,尤其是刚出来不久,我认为最好的学习方式就是能够看源代码和doc,測试它的样例为了方便查看源代码,关联导入源代 ... [详细]
  • 前言折腾了一段时间hadoop的部署管理,写下此系列博客记录一下。为了避免各位做部署这种重复性的劳动,我已经把部署的步骤写成脚本,各位只需要按着本文把脚本执行完,整个环境基本就部署 ... [详细]
  • 我们在之前的文章中已经初步介绍了Cloudera。hadoop基础----hadoop实战(零)-----hadoop的平台版本选择从版本选择这篇文章中我们了解到除了hadoop官方版本外很多 ... [详细]
  • Azkaban(三)Azkaban的使用
    界面介绍首页有四个菜单projects:最重要的部分,创建一个工程,所有flows将在工程中运行。scheduling:显示定时任务executing:显示当前运行的任务histo ... [详细]
  • Java开发实战讲解!字节跳动三场技术面+HR面
    二、回顾整理阿里面试题基本就这样了,还有一些零星的问题想不起来了,答案也整理出来了。自我介绍JVM如何加载一个类的过程,双亲委派模型中有 ... [详细]
author-avatar
jojo
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有