情况:
在进行高级的数据处理时,你会发现你的程序不能放在一个的MapReduce job之中了。Hadoop支持将多个MapReduce Job串成一条链来形成一个更大的MapReduce Job。同时你会发现Hadoop数据处理过程中通常包括多个数据源,我们将探索一些join技术同时处理多个数据源。
1.将具有复杂依赖关系的多个MapReduce Job串联起来。
情况:有三个Job,分别成为Job1,Job2,Job3,这三个Job的关系是Job1、Job2可以同时运行,但Job3必须等待Job1、Job2都完成后才能运行。
解决方案:Hadoop提供解决这样复杂依赖关系的类,Job以及JobControl(mapred包内,新API还相当不完善。)
使用Job的addDependingJob()函数来添加依赖关系,例如:Job1.addDependingJob(Job2)表示Job2不完成,Job1则不会开始。
2.串联一个Job之上的预处理和后处理Mapper步骤。
情况:有许多的数据处理工作包括针对一条记录的预处理和后处理,例如进行文档信息检索的时候,我们需要首先去除掉a,the等无太大意义的词汇,然后再转换单词格式(finish,finished等不同格式统一转换为finish)然后再进行处理步骤。
解决方案:Hadoop提供ChainMapper和ChainReducer提供这样的功能。详见《Hadoop in Action》
The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.
通过ChainMapper可以将多个map类合并成一个map任务。
下面个这个例子没什么实际意思,但是很好的演示了ChainMapper的作用。
源文件
100 tom 90
101 mary 85
102 kate 60
map00的结果,过滤掉100的记录
101 mary 85
102 kate 60
map01的结果,过滤掉101的记录
102 kate 60
reduce结果
102 kate 60
package org.myorg;
importjava.io.IOException;
importjava.util.*;
importjava.lang.String;
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.conf.*;
importorg.apache.hadoop.io.*;
importorg.apache.hadoop.mapred.*;
importorg.apache.hadoop.util.*;
importorg.apache.hadoop.mapred.lib.*;
publicclassWordCount
{
publicstaticclassMap00 extendsMapReduceBase implementsMapper
{
publicvoidmap(Text key, Text value, OutputCollector output, Reporter reporter) throwsIOException
{
Text ft =newText(“100″);
if(!key.equals(ft))
{
output.collect(key, value);
}
}
}
publicstaticclassMap01 extendsMapReduceBase implementsMapper
{
publicvoidmap(Text key, Text value, OutputCollector output, Reporter reporter) throwsIOException
{
Text ft =newText(“101″);
if(!key.equals(ft))
{
output.collect(key, value);
}
}
}
publicstaticclassReduce extendsMapReduceBase implementsReducer
{
publicvoidreduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throwsIOException
{
while(values.hasNext())
{
output.collect(key, values.next());
}
}
}
publicstaticvoidmain(String[] args) throwsException
{
JobConf conf =newJobConf(WordCount.class);
conf.setJobName(“wordcount00″);
conf.setInputFormat(KeyValueTextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
ChainMapper cm =newChainMapper();
JobConf mapAConf =newJobConf(false);
cm.addMapper(conf, Map00.class, Text.class, Text.class, Text.class, Text.class, true, mapAConf);
JobConf mapBConf =newJobConf(false);
cm.addMapper(conf, Map01.class, Text.class, Text.class, Text.class, Text.class, true, mapBConf);
conf.setReducerClass(Reduce.class);
conf00.setOutputKeyClass(Text.class);
conf00.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, newPath(args[0]));
FileOutputFormat.setOutputPath(conf, newPath(args[1]));
JobClient.runJob(conf);
}
}