----------------------------------Main program entry (driver)----------------------------------
package com.demo01.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
/**
 * Main program entry point.
 * @param args command-line arguments
 */
public static void main(String[] args) throws Exception {
//ToolRunner.run executes the job and returns an exit status code: 0 means success
//passing the Configuration here effectively assigns it to the parent class (Configured)
int run = ToolRunner.run(new Configuration(),new JobMain(),args);
System.exit(run);
}
/**
 * The run method is the core of the driver: it wires the eight steps (classes) of the job together on a single Job object.
 * @param strings command-line arguments forwarded by ToolRunner
 * @return exit code, 0 on success
 * @throws Exception
 */
@Override
public int run(String[] strings) throws Exception {
//1. read the input file and parse it into key/value pairs
//the first argument is the Configuration, the second is the job name
Job job = Job.getInstance(super.getConf(),"XXX");
//set the program's entry class (used to locate the job jar)
job.setJarByClass(JobMain.class);
//set the input format class, i.e. the type of data the job reads
job.setInputFormatClass(TextInputFormat.class);
//set the input path to process
//when running on the HDFS cluster:
// FileInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
//for local testing:
FileInputFormat.addInputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\input"));
//2. set the custom Mapper class
job.setMapperClass(WordCountMapper.class);
//set the classes of k2 and v2 (the map output key and value types)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
/**
 * Steps 3 to 6 are left at Hadoop's defaults here (a sketch after this class shows how they could be customized):
 * partition - values with the same key are sent to the same reducer, where the keys are merged and the values form one collection
 * sort
 * combine
 * group
 */
//7. set the custom Reducer class (the reduce logic)
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//8. set the output path
//the output path must not already exist, otherwise the job fails
// TextOutputFormat.setOutputPath(job,new Path("hdfs://node01/wordcountoutput"));
//for local testing:
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\output"));
//submit the job to the cluster and wait for it to finish
boolean b = job.waitForCompletion(true);
return b?0:1;
}
}
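The driver above leaves steps 3 to 6 at their defaults. The fragment below is a minimal sketch of how a combiner and a partitioner could be plugged into the same Job inside run(); wiring WordCountReduce in as the combiner and setting HashPartitioner explicitly are additions for illustration only, not part of the original driver.
//----------------------------optional wiring for steps 3 to 6 (sketch only)----------------------------
//import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;   //would be added to the imports above
//inside run(), after job.setMapOutputValueClass(LongWritable.class):
job.setPartitionerClass(HashPartitioner.class);   //step 3: decides which reducer receives each key (this is already the default partitioner)
job.setCombinerClass(WordCountReduce.class);      //step 5: pre-aggregates counts on the map side
job.setNumReduceTasks(1);                         //a single reducer produces one output file, part-r-00000
Reusing the reducer as the combiner only works because word counting is a simple associative sum; a non-associative aggregation would need its own combiner class.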
----------------------------------Mapper program----------------------------------
package com.demo01.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//Hadoop wraps the basic Java types in Writable types to speed up serialization over the network
//the four type parameters are k1, v1, k2, v2
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
//override map: defines how a (k1, v1) pair is converted into (k2, v2) pairs
/**
 *
 * @param key k1, the byte offset of the current line in the input file
 * @param value v1, one line of text
 * @param context the context object that connects this component to the one above and the one below
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//example input line: hive,sqoop,flume,hello
String[] split = value.toString().split(",");
//iterate over the words and emit each (k2, v2) pair downstream
for (String word : split) {
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1);
context.write(k2,v2);
}
}
}
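To make the k1/v1 to k2/v2 conversion above concrete, the small stand-alone class below (plain Java, no Hadoop runtime, written purely for illustration) replays the split logic of map() on the sample line from the comment:
//----------------------------illustration only, not part of the job----------------------------
package com.demo01.wordcount;
public class MapLogicDemo {
    public static void main(String[] args) {
        String v1 = "hive,sqoop,flume,hello";         //one line of input, i.e. one v1 handed to map()
        for (String word : v1.split(",")) {
            System.out.println("(" + word + ", 1)");  //the (k2, v2) pair the mapper would write
        }
        //prints (hive, 1), (sqoop, 1), (flume, 1), (hello, 1), one pair per line
    }
}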
----------------------------Reducer program--------------------------------------
package com.demo01.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//the four type parameters are k2, v2, k3, v3
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
/**
 *
 * @param key k2, the word
 * @param values a collection whose element type is the type of v2
 * @param context the context object used to write (k3, v3) downstream
 * @throws IOException
 * @throws InterruptedException
 */
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long num = 0;
for (LongWritable value : values) {
//LongWritable has no add method; get() converts it to a plain Java long
num += value.get();
}
context.write(key,new LongWritable(num));
}
}
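The driver notes that the output path must not already exist. A small helper like the one below could be called from run() right before step 8 to remove a leftover output directory; it is a sketch using the standard FileSystem API, and the class name OutputPathCleaner is an invented one, not part of the original code.
//----------------------------optional helper, a sketch not in the original----------------------------
package com.demo01.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class OutputPathCleaner {
    //deletes the output directory if it already exists, so the job does not fail on a leftover path
    public static void deleteIfExists(Configuration conf, Path outputPath) throws Exception {
        FileSystem fs = FileSystem.get(outputPath.toUri(), conf);  //resolves the right FileSystem for file:// or hdfs:// paths
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);                           //true = delete recursively
        }
    }
}
A call such as OutputPathCleaner.deleteIfExists(super.getConf(), new Path("file:///...")) inside run() would make the job rerunnable without deleting the previous output by hand.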