作者:邵子轩一号 | 来源:互联网 | 2023-07-31 09:51
序:终于开始接触hadoop了,从wordcount开始1.采用hadoopstreamming模式优点:支持C++pathonshell等多种语言,学习成本较低,不需要了解had
序:终于开始接触hadoop了,从wordcount开始
1. 采用hadoop streamming模式
优点:支持C++ pathon shell 等多种语言,学习成本较低,不需要了解hadoop内部结构
调试方便:cat input | ./map | sort | ./reduce > output
hadoop 就是提供了一个分布式平台实现了上述脚本的功能,这是一次mapreduce的过程
一个例子:
1 #!/bin/bash
2 source build.env
3 $hadoop_bin fs -rmr $env_root
4 $hadoop_bin fs -mkdir $env_root
5 $hadoop_bin fs -copyFromLocal ./txt $env_root/txt
6 $hadoop_bin streaming \
7 -jobconf mapred.job.name="word count fuck you man~!" \
8 -input $env_root/txt \ //map程序的输入:cat input | ./map
9 -output $env_root/outputxt \ //reduce程序的输出 : ./reduce > output
10 -mapper "./wordcount_map"\
11 -reducer "./wordcount_reducer"\
12 -file ./wordcount_map\
13 -file ./wordcount_reducer\
14 -jobconf mapred.job.map.capacity=1000 \
15 -jobconf mapred.job.reduce.capacity=1000 \
16 -jobconf mapred.child.ulimit=20000000 \
17 -jobconf mapred.job.queue.name=ns-webgis \
18 -jobconf mapred.job.priority=HIGH \
19 -jobconf mapred.map.tasks.speculative.execution=false \
20 -jobconf mapred.map.tasks=10 \
21 -jobconf mapred.reduce.tasks=2
22 if [ $? -ne 0 ]
23 then
24 echo "error~~~~" >&2
25 exit -1
26 fi
27 $hadoop_bin fs -get $env_root/outputxt .
2. map :cat input | ./map >> temp
1)hadoop平台做了什么:
a.切分文件:把input文件按照一定的策略切割分成若干个小文件
b.将若干个小文件分别分发到不同节点上
c. 每个节点上都有一个map程序,然后将任务分发到不同的节点上
2)自己要写的wordcount_map要做什么:
wordcount_map从input中按行进行读取,然后按照业务逻辑将读取到的内容拼成 key \t value的形式 ,这个输出将会作为reduce程序的输入
在这里输出的是 word 1 此处 word是key 1是value
注意:此处是标准输出、输入 std::cout std::cin in C++
key与value之间用\t分割,第一个\t之前的值作为key,之后的都作为value 注意:这个key会被hadoop平台用到 平台不关注value值
1 #include
2 #include<string>
3 #include
4 using namespace std;
5 void split(string src,vector<string>& dest,string separator)
6 {
7 string str = src;
8 string substring;
9 string::size_type start = 0, index;
10
11 do
12 {
13 index = str.find_first_of(separator,start);
14 if (index != string::npos)
15 {
16 substring = str.substr(start,index-start);
17 dest.push_back(substring);
18 start = str.find_first_not_of(separator,index);
19 if (start == string::npos) return; }
20 }while(index != string::npos);
21 substring = str.substr(start);
22 dest.push_back(substring);
23 }
24 void map()
25 {
26 string line;
27 vector<string> vec(2);
28 while(cin>>line)
29 {
30 vec.clear();
31 split(line,vec," ");
32 vector<string>::iterator it=vec.begin();
33 for(;it!=vec.end();++it)
34 {
35 cout<<*it<<"\t"<<"1"<<"\t"<<"fuck"<<endl;
36 }
37 }
38 }
39 int main()
40 {
41 map();
42 }
wordcount_map
3. reduce: sort | ./reduce > output
等到所有的map任务都结束:
1)hadoop平台做了这些事情
a.将所有的map程序的输出结果中key相同的key value pair 放到相同的节点上,这点很重要,这是保证最终输出结果正确的保证,后面会按照key进行hash , 并且相同 key之间不会有其他的key,其实是按照key值做了一个排序
注意:相同的key一定在一个节点上,但是一个节点上不止有一个个key
b 然后在各个节点上开始reduce任务
2)自己写的wordcount_map做了什么
a. 读取这些具有相同key的键值对,处理自己的业务逻辑,此处就是将统计相同的key值的键值对一共出现了几次,然后将结果输出,此处也是标准输入和标准输出
#include
#include
wordcount_reduce