How MapJoin works: it is suited to joining a large table with a small table. The DistributedCache mechanism ships the small table to local disk on every machine where a Mapper process runs; each Mapper reads a different split of the large table, loads the small table into memory, and joins every record in its split against the small table's records.
The joined record is emitted directly as the map output, which is already the final result.
Note: no shuffle phase and no reduce processing are needed.
Example:
detail.txt
order_id item_id amount
12 sp001 2
12 sp002 4
12 sp003 3
13 sp001 2
13 sp002 4
iteminfo.txt
item_id item_type
sp001 type001
sp002 type002
sp003 type002
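To illustrate the join on one record: the detail record 12 / sp002 / 4 looks up sp002 in the cached iteminfo map and is emitted as key sp002 with the value type002, 12, 4 (tab-separated), without ever touching a reducer. The full program below implements this.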
package squencefile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MapJoin {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Map<String, String> iteminfoMap = new HashMap<>();

        /**
         * Load the small table into memory on the machine running this Mapper process.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            // Read the small table that was shipped to local disk.
            // The old API is deprecated:
            // URI[] uris = DistributedCache.getCacheFiles(context.getConfiguration());
            URI[] paths = context.getCacheFiles();
            for (URI uri : paths) {
                String pathName = uri.toString();
                // Only the iteminfo small table is relevant here
                if (!pathName.endsWith("iteminfo.txt")) continue;
                // Read the cached file (reading the URI directly works for a local run;
                // on a cluster the cached file is localized into the task's working directory)
                BufferedReader reader = new BufferedReader(new FileReader(pathName));
                String str;
                while ((str = reader.readLine()) != null) {
                    String[] itemInfoArr = str.split("\t");
                    if (itemInfoArr.length == 2) {
                        iteminfoMap.put(itemInfoArr[0], itemInfoArr[1]);
                    }
                }
                reader.close();
            }
        }

        /**
         * Join each record of the large table against the in-memory small table.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Make sure this record comes from the large table by checking the input split's file name
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            if (fileName.endsWith("detail.txt")) {
                // Merge this record of the split with the matching record of the small table
                String detail = value.toString();
                String[] detailArr = detail.split("\t");
                if (detailArr.length != 3) return;
                String itemType = iteminfoMap.get(detailArr[1]);
                if (itemType == null) return;
                // Output format: key = item_id, value = item_type \t order_id \t amount
                StringBuffer sb = new StringBuffer();
                sb.append(itemType).append("\t").append(detailArr[0]).append("\t").append(detailArr[2]);
                context.write(new Text(detailArr[1]), new Text(sb.toString()));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create a job, i.e. a runtime environment
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test6\\out"))) {
            fs.delete(new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test6\\out"), true);
        }
        // The old way of shipping the small table is deprecated:
        // DistributedCache.addCacheFile(new Path("").toUri(), conf);

        // Run locally
        Job job = Job.getInstance(conf, "MapJoin");
        // Program entry point (needed when packaged as a jar)
        job.setJarByClass(MapJoin.class);
        // Input: the large table
        FileInputFormat.addInputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test6\\detail.txt"));
        // Ship the small table to local disk on every machine where a Mapper runs
        job.addCacheFile(new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test6\\iteminfo.txt").toUri());
        // Mapper logic and map output types
        job.setMapperClass(MapJoin.MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Map-only job: no shuffle and no reduce phase
        job.setNumReduceTasks(0);
        // Output directory
        FileOutputFormat.setOutputPath(job, new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test6\\out"));
        // Submit the job and wait for completion (on a cluster this would run on YARN)
        boolean result = job.waitForCompletion(true);
        System.out.print(result ? 1 : 0);
    }
}
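With the sample files above, and assuming the fields are tab-separated and the header rows shown earlier are just column labels rather than actual lines in the files, the map-only output (e.g. part-m-00000) should look roughly like this:

sp001	type001	12	2
sp002	type002	12	4
sp003	type002	12	3
sp001	type001	13	2
sp002	type002	13	4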