作者:Tandbs | 来源:互联网 | 2023-06-23 19:11
job提交源码分析
在eclipse中的写的代码如何提交作业到JobTracker中的哪?
(1)在eclipse中调用的job.waitForCompletion(true)实际上执行如下方法
connect();
info = jobClient.submitJobInternal(conf);
(2)在connect()方法中,实际上创建了一个JobClient对象。
在调用该对象的构造方法时,获得了JobTracker的客户端代理对象JobSubmissionProtocol。
JobSubmissionProtocol的实现类是JobTracker。
(3)在jobClient.submitJobInternal(conf)方法中,调用了
JobSubmissionProtocol.submitJob(...),
即执行的是JobTracker.submitJob(...)。
Hadoop数据类型
1.Hadoop的数据类型要求必须实现Writable接口。
2.java基本类型与Hadoop常见基本类型的对照
Long LongWritable
Integer IntWritable
Boolean BooleanWritable
String Text
java类型如何转化为hadoop基本类型?
调用hadoop类型的构造方法,或者调用set()方法。
new LongWritable(123L);
hadoop基本类型如何转化为java类型?
对于Text,需要调用toString()方法,其他类型调用get()方法。
使用Hadoop自定义类型处理手机上网日志
1、首先,将手机上网日志文件HTTP_20130313143750.dat通过WinSCP工具复制到/usr/local目录下
2、将日志文件上传到hdfs://chaoren:9000/wlan文件夹下
日志文件:
日志文件中各字段含义:
3、编写Java代码将日志文件中想要的数据统计出来。
1 package mapreduce;
2
3 import java.io.DataInput;
4 import java.io.DataOutput;
5 import java.io.IOException;
6
7 import org.apache.hadoop.conf.Configuration;
8 import org.apache.hadoop.fs.Path;
9 import org.apache.hadoop.io.LongWritable;
10 import org.apache.hadoop.io.Text;
11 import org.apache.hadoop.io.Writable;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
18 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
19 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
20
21 public class KpiApp {
22 static final String INPUT_PATH = "hdfs://chaoren:9000/wlan";//wlan是个文件夹,日志文件放在/wlan目录下
23 static final String OUT_PATH = "hdfs://chaoren:9000/out";
24
25 public static void main(String[] args) throws Exception {
26 final Job job = new Job(new Configuration(),
27 KpiApp.class.getSimpleName());
28 // 1.1 指定输入文件路径
29 FileInputFormat.setInputPaths(job, INPUT_PATH);
30 // 指定哪个类用来格式化输入文件
31 job.setInputFormatClass(TextInputFormat.class);
32
33 // 1.2指定自定义的Mapper类
34 job.setMapperClass(MyMapper.class);
35 // 指定输出的类型
36 job.setMapOutputKeyClass(Text.class);
37 job.setMapOutputValueClass(KpiWritable.class);
38
39 // 1.3 指定分区类
40 job.setPartitionerClass(HashPartitioner.class);
41 job.setNumReduceTasks(1);
42
43 // 1.4 TODO 排序、分区
44
45 // 1.5 TODO (可选)归约
46
47 // 2.2 指定自定义的reduce类
48 job.setReducerClass(MyReducer.class);
49 // 指定输出的类型
50 job.setOutputKeyClass(Text.class);
51 job.setOutputValueClass(KpiWritable.class);
52
53 // 2.3 指定输出到哪里
54 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
55 // 设定输出文件的格式化类
56 job.setOutputFormatClass(TextOutputFormat.class);
57
58 // 把代码提交给JobTracker执行
59 job.waitForCompletion(true);
60 }
61
62 static class MyMapper extends Mapper {
63 protected void map(
64 LongWritable key,
65 Text value,
66 org.apache.hadoop.mapreduce.Mapper.Context context)
67 throws IOException, InterruptedException {
68 final String[] splited = value.toString().split("\t");
69 final String msisdn = splited[1];
70 final Text k2 = new Text(msisdn);
71 final KpiWritable v2 = new KpiWritable(splited[6], splited[7],
72 splited[8], splited[9]);
73 context.write(k2, v2);
74 };
75 }
76
77 static class MyReducer extends
78 Reducer {
79 /**
80 * @param k2
81 * 表示整个文件中不同的手机号码
82 * @param v2s
83 * 表示该手机号在不同时段的流量的集合
84 */
85 protected void reduce(
86 Text k2,
87 java.lang.Iterable v2s,
88 org.apache.hadoop.mapreduce.Reducer.Context context)
89 throws IOException, InterruptedException {
90 long upPackNum = 0L;
91 long downPackNum = 0L;
92 long upPayLoad = 0L;
93 long downPayLoad = 0L;
94
95 for (KpiWritable kpiWritable : v2s) {
96 upPackNum += kpiWritable.upPackNum;
97 downPackNum += kpiWritable.downPackNum;
98 upPayLoad += kpiWritable.upPayLoad;
99 downPayLoad += kpiWritable.downPayLoad;
100 }
101
102 final KpiWritable v3 = new KpiWritable(upPackNum + "", downPackNum
103 + "", upPayLoad + "", downPayLoad + "");
104 context.write(k2, v3);
105 };
106 }
107 }
108
109 class KpiWritable implements Writable {
110 long upPackNum;
111 long downPackNum;
112 long upPayLoad;
113 long downPayLoad;
114
115 public KpiWritable() {
116 }
117
118 public KpiWritable(String upPackNum, String downPackNum, String upPayLoad,
119 String downPayLoad) {
120 this.upPackNum = Long.parseLong(upPackNum);
121 this.downPackNum = Long.parseLong(downPackNum);
122 this.upPayLoad = Long.parseLong(upPayLoad);
123 this.downPayLoad = Long.parseLong(downPayLoad);
124 }
125
126 public void readFields(DataInput in) throws IOException {
127 this.upPackNum = in.readLong();
128 this.downPackNum = in.readLong();
129 this.upPayLoad = in.readLong();
130 this.downPayLoad = in.readLong();
131 }
132
133 public void write(DataOutput out) throws IOException {
134 out.writeLong(upPackNum);
135 out.writeLong(downPackNum);
136 out.writeLong(upPayLoad);
137 out.writeLong(downPayLoad);
138 }
139
140 @Override
141 public String toString() {
142 return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t"
143 + downPayLoad;
144 }
145 }
4、运行结果