1、reduce side join(reduce端表连接)使用分布式缓存API,完成两个数据集的连接操作优点:操作简单
缺点:map端shffule后传递给reduce端的数据量过大,极大的降低了性能连接方法:
(1)map端读入输入数据,以连接键为Key,待连接的内容为value,但是value需要添加特别的标识,表示的内容为表的表示,即若value来自于表1,则标识位设置为1,若来自表2,则设置为2,然后将map的内容输出到reduce。
(2)reduce端接收来自map端shuffle后的结果,即内容,然后遍历values,对每一个value进行处理主要的处理过程是:判断每一个标志位,如果来自1表,则将value放置在特地为1表创建的数组之中若来自2表,则将value放置在为2表创建的数组中,最后对两个数组进行求笛卡儿积,然后输出结果,即为最终表的连接结果。
2、问题分析MapReduce连接取决于数据集的规模及分区方式
如果一个数据集很大而另外一个数据集很小,小的分发到集群中的每一个节点mapper阶段读取大数据集中的数据
reducer获取本节点上的数据(也就是小数据集中的数据)并完成连接操作
3、缓存在本地的目录设置以下为默认值:
<property><name>mapred.local.dirname><value>${hadoop.tmp.dir}/mapred/localdir/filecachevalue>
property><property><name>local.cache.sizename><value>10737418240value>
property>
4、使用方式旧版本的DistributedCache已经被注解为过时&#xff0c;以下为Hadoop-2.2.0以上的新API接口&#xff0c;测试的Hadoop版本为2.7.2。
Job job &#61; Job.getInstance(conf);
job.addCacheFile(new URI("hdfs://url:port/filename#symlink"));之后在map/reduce函数中可以通过context来访问到缓存的文件&#xff0c;一般是重写setup方法来进行初始化&#xff1a;
直接使用hadoop方式&#xff1a;
protected void setup(Context context) throws IOException, InterruptedException {super.setup(context);if (context.getCacheFiles() !&#61; null && context.getCacheFiles().length > 0) {String path &#61; context.getLocalCacheFiles()[0].getName();File itermOccurrenceMatrix &#61; new File(path);FileReader fileReader &#61; new FileReader(itermOccurrenceMatrix);BufferedReader bufferedReader &#61; new BufferedReader(fileReader);String s;while ((s &#61; bufferedReader.readLine()) !&#61; null) {}bufferedReader.close();fileReader.close();}
}
或者采用以下方法&#xff1a;将hadoop方式转化为java方式进行处理Configuration config&#61;context.getConfiguration();FileSystem fs&#61;FileSystem.get(config);FSDataInputStream in&#61;fs.open(new Path(path));Text line&#61;new Text(“ ”);LineReader lineReader&#61;new LineReader(in,config);int offset&#61;0;do{offset&#61;lineReader.readLine(line); if(offset>0){String[] tokens&#61;line.toString().split(“,”); countryCodesTreeMap.put(tokens[0],tokens[1]);
}
}while(offset!&#61;0); 得到的path为本地文件系统上的路径这里的getLocalCacheFiles方法也被注解为过时了&#xff0c;只能使用context.getCacheFiles方法&#xff0c;
和getLocalCacheFiles不同的是&#xff0c;getCacheFiles得到的路径是HDFS上的文件路径&#xff0c;
如果使用这个方法&#xff0c;那么程序中读取的就不再试缓存在各个节点上的数据了&#xff0c;相当于共同访问HDFS上的同一个文件。
可以直接通过符号连接来跳过getLocalCacheFiles获得本地的文件。
5、实现步骤1)把数据放到缓存中的方法
public void addCacheFile(URI uri);
public void addCacheArchive(URI uri);
public void setCacheFiles(URI[] files);
public void setCacheArchives(URI[] archives);
public void addFileToClassPath(Path file);
public void addArchiveToClassPath(Path archive);在缓存中可以存放两类对象&#xff1a;文件&#xff08;files&#xff09;和存档&#xff08;achives&#xff09;。
文件被直接放置在任务节点上&#xff0c;而存档则会被解档之后再将具体文件放置在任务节点上。 2)其次掌握在map或者reduce任务中&#xff0c;使用API从缓存中读取数据。
可以通过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。
1、map side join在map端进行表的连接&#xff0c;对表的大小有要求&#xff0c;首先有一个表必须足够小&#xff0c;可以读入内存&#xff0c;另外的一个表很大&#xff0c;
与reduce端连接比较&#xff0c;map端的连接&#xff0c;不会产生大量数据的传递&#xff0c;而是在map端连接完毕之后就进行输出&#xff0c;效率极大的提高连接方法&#xff1a;
&#xff08;1&#xff09;首先要重写Mapper类下面的setup方法&#xff0c;因为这个方法是先于map方法执行的&#xff0c;将较小的表先读入到一个HashMap中。
&#xff08;2&#xff09;重写map函数&#xff0c;一行行读入大表的内容&#xff0c;逐一的与HashMap中的内容进行比较&#xff0c;若Key相同&#xff0c;则对数据进行格式化处理&#xff0c;然后直接输出。
2、Map侧的连接两个数据集中一个非常小&#xff0c;可以让小数据集存入缓存。
在作业开始这些文件会被复制到运行task的节点上。一开始&#xff0c;它的setup方法会检索缓存文件。
3、Map侧连接需要满足条件与reduce侧连接不同&#xff0c;Map侧连接需要等待参与连接的数据集满足如下条件&#xff1a;1.除了连接键外&#xff0c;所有的输入都必须按照连接键排序。输入的各种数据集必须有相同的分区数。所有具有相同键的记录需要放在同一分区中。
当Map任务对其他Mapreduce作业的结果进行处理时&#xff08;Cleanup时&#xff09;&#xff0c;Map侧的连接条件都自动满足。
CompositeInputFormat类用于执行Map侧的连接&#xff0c;而输入和连接类型的配置可以通过属性指定。2.如果其中一个数据集足够小&#xff0c;旁路的分布式通道可以用在Map侧的连接中。
输入&#xff1a;
num1文件和num2文件&#xff1a;
xm&#64;master:~/workspace$ hadoop fs -text /b/num1
1,Beijing
2,Guangzhou
3,Shenzhen
4,Xian
xm&#64;master:~/workspace$ hadoop fs -text /b/num2
Beijing Red Star,1
Shenzhen Thunder,3
Guangzhou Honda,2
Beijing Rising,1
Guangzhou Development Bank,2
Tencent,3
Back of Beijing,1
输出&#xff1a;
Back of Beijing Beijing
Beijing Red Star Beijing
Beijing Rising Beijing
Guangzhou Development Bank Guangzhou
Guangzhou Honda Guangzhou
Shenzhen Thunder Shenzhen
Tencent Shenzhen
实现代码&#xff1a;
package mr_01import java.io.IOException
import java.net.URI
import java.util.HashMap
import java.util.Mapimport org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.util.LineReaderpublic class reduceJoin { static String INPUT_PATH&#61;"hdfs://master:9000/b/num2"static String OUTPUT_PATH&#61;"hdfs://master:9000/output"//Map直接写入&#xff0c;不需修改static class MyMapper extends Mapper{ Text output_key&#61;new Text()Text output_value&#61;new Text()protected void map(Object key, Object value, Context context) throws IOException, InterruptedException{String[] tokens&#61;value.toString().split(",")if(tokens!&#61;null && tokens.length&#61;&#61;2){output_key.set(tokens[0])output_value.set(tokens[1])context.write(output_key,output_value)}}}static class MyReduce extends Reducer{Text output_key&#61;new Text()Text output_value&#61;new Text()Map addMap&#61;new HashMap()//setup方法将文件中的数据写入hashmap中protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException{URI uri&#61;context.getCacheFiles()[0]Path path&#61;new Path(uri)System.out.println("path&#61;"&#43;uri.toString())FileSystem fs&#61; path.getFileSystem(context.getConfiguration())LineReader lineReader&#61;new LineReader(fs.open(path))Text line&#61;new Text()while(lineReader.readLine(line)>0){String[] tokens&#61;line.toString().split(",")if(tokens!&#61;null && tokens.length&#61;&#61;2)addMap.put(tokens[0], tokens[1])}System.out.println("addMap.size&#61;"&#43;addMap.size())}//reduce进行取用&#xff08;key-->value对应&#xff09;protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException{//idif(values&#61;&#61;null) returnString addrName&#61; addMap.get(values.iterator().next().toString())output_value.set(addrName)context.write(key,output_value)}}public static void main(String[] args) throws Exception{Path outputpath&#61;new Path(OUTPUT_PATH)Path cacheFile&#61;new Path("hdfs://master:9000/b/num1")Configuration conf&#61;new Configuration()FileSystem fs&#61;outputpath.getFileSystem(conf)if(fs.exists(outputpath)){fs.delete(outputpath, true)}Job job&#61;Job.getInstance(conf)FileInputFormat.setInputPaths(job, INPUT_PATH)FileOutputFormat.setOutputPath(job, outputpath)URI uri&#61;cacheFile.toUri()job.setCacheFiles(new URI[]{uri})job.setMapperClass(MyMapper.class)job.setReducerClass(MyReduce.class)job.setOutputKeyClass(Text.class)job.setOutputValueClass(Text.class)job.waitForCompletion(true)}
}
如果要实现多表连接&#xff0c;那么只需将多个表存进缓存中取用即可。
1.避免生成太多依赖I/O的map任务&#xff0c;数量由输入决定
2.作业加速主要来源于Map任务&#xff0c;有更高的并行度
3.Combiner对效率的提高&#xff0c;不仅在map reduce任务之间的数据传输&#xff0c;而且体现在降低了map侧I/O负载
4.自定义分区器可以在不同的reduce之间做负载均衡
5.分布式缓存对于小文件场景很有用&#xff0c;但应该避免过多或大的文件存储在缓存中