作者:偶然公大伟 | 来源:互联网 | 2023-09-14 14:38
最近在学了python了,从mapReduce开始,话不多说了,直接上代码了哈map阶段,map.py文件reduce阶段:reduce.py文件map到reduce阶段要做一个排
最近在学了python了,从mapReduce开始 ,话不多说了,直接上代码了哈
map阶段,map.py文件
1 import sys
2
3 # 标准输入
4 # 在终端的话,就需要这样了 cat a.txt | python map_new.py,通过一个管道的形式进行标准输入
5 # strip 就是避免字符串前后有回车或者是隐含字符,一般对于字符串都要strip() 一下。
6
7 for line in sys.stdin:
8 # print(line.strip())
9 ss = line.strip().split(‘ ‘)
10 for word in ss:
11 # word和1之间用制表符进行分割
12 print(‘\t‘.join([word.strip(),‘1‘]))
reduce阶段:reduce.py文件
map到reduce阶段要做一个排序,相同的key放到了一起
1 import sys
2
3 cur_word = None
4 sum = 0
5
6 for line in sys.stdin:
7 ss = line.strip().split(‘\t‘)
8 if len(ss) != 2:
9 continue
10 word,cnt = ss
11 # 当读取第一行时,cur_word肯定是None吧
12 if cur_word == None:
13 cur_word = word
14 if cur_word != word:
15 # 当 cur_word 和 word不相等时,将其输出
16 print(‘\t‘.join([cur_word,str(sum)]))
17 cur_word = word
18 sum = 0
19
20 sum += int(cnt)
21 # 对最后一行进行输出
22 print(‘\t‘.join([cur_word,str(sum)]))
还需要一个run.sh
HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_FILE_PATH_1="/1.data"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH -input $INPUT_FILE_PATH_1 -output $OUTPUT_PATH -mapper "python map.py" -reducer "python reduce.py" -file ./map_new.py -file ./red_new.py
# HADOOP_CMD: hadoop的bin的路径
# STREAM_JAR_PATH:streaming jar包的路径
# INPUT_FILE_PATH:hadoop集群上的资源输入路径
# OUTPUT_PATH:hadoop集群上的结果输出路径
执行和查看
# cat data.txt | pyton map.py | sort -k1 | python reduce.py > result.txt
# cat result.txt | sort -k2 -rn | head
写的比较简单哈