通过案例学习MAPREDUCE教程
1. wordcount示例开发
1.1. wordcount程序整体运行流程示意图
map阶段&#xff1a; 将每一行文本数据变成<单词,1>这样的kv数据
reduce阶段&#xff1a;将相同单词的一组kv数据进行聚合&#xff1a;累加所有的v
注意点&#xff1a;mapreduce程序中&#xff0c;
map阶段的进、出数据&#xff0c;
reduce阶段的进、出数据&#xff0c;
类型都应该是实现了HADOOP序列化框架的类型&#xff0c;如&#xff1a;
String对应Text
Integer对应IntWritable
Long对应LongWritable
1.2. 编码实现
WordcountMapper类开发
WordcountReducer类开发
JobSubmitter客户端类开发
《详见代码》
1.3. 运行mr程序
1) 将工程整体打成一个jar包并上传到linux机器上&#xff0c;
2) 准备好要处理的数据文件放到hdfs的指定目录中
3) 用命令启动jar包中的Jobsubmitter&#xff0c;让它去提交jar包给yarn来运行其中的mapreduce程序 &#xff1a; hadoop jar wc.jar cn.edu360.mr.wordcount.JobSubmitter .....
4) 去hdfs的输出目录中查看结果
2. yarn快速理解
2.1. yarn的基本概念
yarn是一个分布式程序的运行调度平台
yarn中有两大核心角色&#xff1a;
1、Resource Manager
接受用户提交的分布式计算程序&#xff0c;并为其划分资源
管理、监控各个Node Manager上的资源情况&#xff0c;以便于均衡负载
2、Node Manager
管理它所在机器的运算资源&#xff08;cpu &#43; 内存&#xff09;
负责接受Resource Manager分配的任务&#xff0c;创建容器、回收资源
2.2. YARN的安装
node manager在物理上应该跟data node部署在一起
resource manager在物理上应该独立部署在一台专门的机器上
1、修改配置文件&#xff1a;
vi yarn-site.xml
|
2、scp这个yarn-site.xml到其他节点
3、启动yarn集群&#xff1a;start-yarn.sh &#xff08;注&#xff1a;该命令应该在resourcemanager所在的机器上执行&#xff09;
4、用jps检查yarn的进程&#xff0c;用web浏览器查看yarn的web控制台
http://hdp20-01:8088
3. mr编程案例&#xff08;一&#xff09;
3.1. mr编程案例1——求TOPN
1、读取附件中的文件request.dat&#xff0c;
需求1&#xff1a;求出每一个url被访问的总次数&#xff0c;并将结果输出到一个结果文件中
思路&#xff1a;就是一个wordcount
map阶段&#xff1a; 解析数据&#xff0c;将url作为key&#xff0c;1作为value发出
reduce阶段&#xff1a;将一组数据的value累加
需求2&#xff1a;求出每个网站被访问次数最多的top3个url《分组TOPN》
思路&#xff1a;
map阶段——切字段&#xff0c;抽取域名作为key&#xff0c;url作为value&#xff0c;返回即可
reduce阶段——用迭代器&#xff0c;将一个域名的一组url迭代出来&#xff0c;挨个放入一个hashmap中进行计数&#xff0c;最后从这个hashmap中挑出次数最多的3个url作为结果返回
需求3&#xff1a;求访问次数最多的topn个网站&#xff08;只能有1个reduce worker&#xff09;《全局TOPN》
思路&#xff1a;
map阶段&#xff1a;解析数据&#xff0c;将域名作为key&#xff0c;1作为value
reduce阶段&#xff1a;
reduce方法中——对一个域名的一组1累加&#xff0c;然后将 <域名,总次数>放入一个成员变量Treemap中
cleanup方法中——从treemap中挑出次数最高的n个域名作为结果输出
要点1&#xff1a;每一个reduce worker程序&#xff0c;会在处理完自己的所有数据后&#xff0c;调用一次cleanup方法
要点2&#xff1a;如何向map和reduce传自定义参数
从JobSubmitter的main方法中&#xff0c;可以向map worker和reduce worker传递自定义参数&#xff08;通过configuration对象来写入自定义参数&#xff09;&#xff1b;然后&#xff0c;我们的map方法和reduce方法中&#xff0c;可以通过context.getConfiguration()来取自定义参数
3.2. mr编程案例2——自定义类型
需求&#xff1a;统计一下文件中&#xff0c;每一个用户所耗费的总上行流量&#xff0c;总下行流量&#xff0c;总流量
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200 1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200 1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200 |
思路&#xff1a;
map阶段&#xff1a;将每一行按tab切分成各字段&#xff0c;提取其中的手机号作为输出key&#xff0c;流量信息封装到FlowBean对象中&#xff0c;作为输出的value
要点&#xff1a;自定义类型如何实现hadoop的序列化接口
FlowBean这种自定义数据类型必须实现hadoop的序列化接口&#xff1a;Writable&#xff0c;实现其中的两个方法&#xff1a;
readFields(in) 反序列化方法
write(out) 序列化方法
reduce阶段&#xff1a;遍历一组数据的所有value&#xff08;flowbean&#xff09;&#xff0c;进行累加&#xff0c;然后以手机号作为key输出&#xff0c;以总流量信息bean作为value输出
3.3. mr编程案例3——自定义Partitioner
统计每一个用户的总流量信息&#xff0c;并且按照其归属地&#xff0c;将统计结果输出在不同的文件中
1、思路&#xff1a;
想办法让map端worker在将数据分区时&#xff0c;按照我们需要的按归属地划分
实现方式&#xff1a;自定义一个Partitioner
2、实现
先写一个自定义Paritioner
&#xff08;代码见工程&#xff09;
3.4. mr编程案例4——全局排序
需求&#xff1a;统计request.dat中每个页面被访问的总次数&#xff0c;同时&#xff0c;要求输出结果文件中的数据按照次数大小倒序排序
关键技术点&#xff1a;
mapreduce程序内置了一个排序机制&#xff1a;
map worker 和reduce worker &#xff0c;都会对数据按照key的大小来排序
所以最终的输出结果中&#xff0c;一定是按照key有顺序的结果
思路&#xff1a;
本案例中&#xff0c;就可以利用这个机制来实现需求&#xff1a;
1、先写一个mr程序&#xff0c;将每个页面的访问总次数统计出来
2、再写第二个mr程序&#xff1a;
map阶段&#xff1a; 读取第一个mr产生的结果文件&#xff0c;将每一条数据解析成一个java对象UrlCountBean&#xff08;封装着一个url和它的总次数&#xff09;&#xff0c;然后将这个对象作为key&#xff0c;null作为value返回
要点&#xff1a;这个java对象要实现WritableComparable接口&#xff0c;以让worker可以调用对象的compareTo方法来进行排序
reduce阶段&#xff1a;由于worker已经对收到的数据按照UrlCountBean的compareTo方法排了序&#xff0c;所以&#xff0c;在reduce方法中&#xff0c;只要将数据输出即可&#xff0c;最后的结果自然是按总次数大小的有序结果
看示意图&#xff1a;
3.5. mr编程案例5——倒排索引创建
需求&#xff1a;有如下数据&#xff1a;
a.txt
hello tom hello jim hello kitty hello rose |
b.txt
hello jerry hello jim hello kitty hello jack |
c.txt
hello jerry hello java hello c&#43;&#43; hello c&#43;&#43; |
思路&#xff1a;
1、先写一个mr程序&#xff1a;统计出每个单词在每个文件中的总次数
hello-a.txt 4
hello-b.txt 4
hello-c.txt 4
java-c.txt 1
jerry-b.txt 1
jerry-c.txt 1
要点1&#xff1a;map方法中&#xff0c;如何获取所处理的这一行数据所在的文件名&#xff1f;
worker在调map方法时&#xff0c;会传入一个context&#xff0c;而context中包含了这个worker所读取的数据切片信息&#xff0c;而切片信息又包含这个切片所在的文件信息
那么&#xff0c;就可以在map中&#xff1a;
FileSplit split &#61; context.getInputSplit();
String fileName &#61; split.getpath().getName();
要点2&#xff1a;setup方法
worker在正式处理数据之前&#xff0c;会先调用一次setup方法&#xff0c;所以&#xff0c;常利用这个机制来做一些初始化操作&#xff1b;
2、然后在写一个mr程序&#xff0c;读取上述结果数据&#xff1a;
map&#xff1a; 根据-切&#xff0c;以单词做key&#xff0c;后面一段作为value
reduce&#xff1a; 拼接values里面的每一段&#xff0c;以单词做key&#xff0c;拼接结果做value&#xff0c;输出即可
3.6. mr编程案例6——自定义GroupingComparator
需求&#xff1a;一下数据&#xff0c;表示线段的左端点和右端点
1,4 2,5 3,4 2,6 4,7 5,8 5,9 6,10 10,15 11,16 12,18 13,17 |
需求1&#xff1a;求所有交错的层数
map&#xff1a;将一条线段的范围内坐标点作为key&#xff0c;1作为value
reduce&#xff1a;累加1&#xff0c;就得到了坐标点上的重叠次数
需求2&#xff1a;求出重叠次数最高的坐标点及其重叠次数&#xff08;全局TOPN&#xff09;
map&#xff1a;读数据&#xff0c;将<标点,重叠次数>封装为一个PointTimes对象&#xff0c;作为key返回&#xff0c;null为value
注意&#xff1a;PointTimes类要实现WritableComparable接口&#xff0c;以规定如何比较两个PointTimes对象的大小
reduce端&#xff1a;
注意&#xff1a;要想办法让worker将所有的PointTimes对象看成相同&#xff0c;以让worker把所有的数据看成一组&#xff0c;来调一次reduce方法。
实现方式就是自定义一个GroupingComparator(//具体见代码)
然后&#xff0c;reduce方法中&#xff0c;输出这一组&#xff08;其实是这个worker所收到的全部数据&#xff09;的前n个key即可
3.7. mr编程案例7——控制输入、输出格式
需求&#xff1a;还是对案例3中的流量数据进行汇总&#xff0c;然后求出汇总结果中的流量最大的TOPN条
步骤1:——
思路&#xff1a;统计逻辑跟之前的流量统计一致&#xff1a;
map&#xff1a;以手机号作为key&#xff0c;flowbean作为value
注&#xff1a;步骤1输出的结果文件通过指定SequenceFileOutputFormat来产生SequenceFile文件&#xff1b;SequenceFile文件是hadoop定义的一种文件&#xff0c;里面存放的是大量key-value的对象序列化字节&#xff08;文件头部还存放了key和value所属的类型名&#xff09;&#xff1b;
步骤2&#xff1a;
思路&#xff1a;读取步骤1的SequenceFile结果文件&#xff0c;需要指定inputformatclass为SequenceFileInputFormat组件
既然使用了这种输入组件&#xff0c;那么我们的map方法中直接就接收一对KEY-VALUE数据
如何实现topn呢&#xff1f;
通过把所有汇总数据发给同一个reduce端的worker&#xff0c;并且通过定义一个GroupingComparator来让这个worker把所有手机号的flowbean对象看成一组&#xff0c;调用一次reduce方法&#xff0c;我们就只需要在reduce方法中输出前n个即可
4. mapreduce框架的运作机制
4.1. 核心角色&#xff1a;
MRAppmaster
Worker(map task)
Worker(reduce task)
4.2. 运行机制全图&#xff1a;
5. mr编程案例&#xff08;二&#xff09;
5.1. mr编程案例8——join算法
5.1.1. 数据&#xff1a;
select a.*,b.* from a join b on a.uid&#61;b.uid;
有订单数据&#xff1a;
order001,u001 order002,u001 order003,u005 order004,u002 order005,u003 order006,u004 |
有用户数据&#xff1a;
u001,senge,18,angelababy u002,laozhao,48,ruhua u003,xiaoxu,16,chunge u004,laoyang,28,zengge u005,nana,14,huangbo |
5.1.2. 需求&#xff1a;
5.1.3. 思路&#xff1a;
map端&#xff1a;
不管worker读到的是什么文件&#xff0c;我们的map方法中是可以通过context来区分的
对于order数据&#xff0c;map中切字段&#xff0c;封装为一个joinbean&#xff0c;打标记&#xff1a;t_order
对于user数据&#xff0c;map中切字段&#xff0c;封装为一个joinbean&#xff0c;打标记&#xff1a;t_user
然后&#xff0c;以uid作为key&#xff0c;以joinbean作为value返回
reduce端&#xff1a;
用迭代器迭代出一组相同uid的所有数据joinbean&#xff0c;然后判断
如果是标记字段为t_order的&#xff0c;则加入一个arraylist
如果标记字段为t_user的&#xff0c;则放入一个Joinbean对象中
然后&#xff0c;遍历arraylist&#xff0c;对里面的每一个JoinBean填充userBean中的user数据&#xff0c;然后输出这个joinBean即可
5.2. mr编程案例9——数据倾斜场景
5.2.1. 数据&#xff1a;
a a a a a a b b b a a a a a a a c c b c a a a c a b b c a a d d e e f f f g a a a b a b h h g j |
5.2.2. 需求&#xff1a;
需要做wordcount
但是&#xff0c;会有一个问题存在&#xff1a;
a特别多&#xff0c;
负责处理a这个单词数据的reduce worker就会很累&#xff08;负载不均衡&#xff0c;过大&#xff09;
思考&#xff1a;如何处理&#xff1f;会让整个数据处理过程中&#xff0c;数据倾斜的状况得到缓解
5.2.3. 思路&#xff1a;
将key进行打散&#xff08;具体方案&#xff1a;给key拼接一个随机字符串&#xff09;&#xff0c;以均衡reduce端worker的负载&#xff1b;
5.3. mr编程案例10——json数据解析
5.3.1. 数据&#xff1a;
{"movie":"1193","rate":"5","timeStamp":"978300760","uid":"1"} {"movie":"661","rate":"3","timeStamp":"978302109","uid":"1"} {"movie":"914","rate":"3","timeStamp":"978301968","uid":"1"} {"movie":"1357","rate":"5","timeStamp":"978298709","uid":"2"} {"movie":"3068","rate":"4","timeStamp":"978299000","uid":"2"} {"movie":"1537","rate":"4","timeStamp":"978299620","uid":"2"} {"movie":"647","rate":"3","timeStamp":"978299351","uid":"3"} {"movie":"2194","rate":"4","timeStamp":"978299297","uid":"3"} |
求&#xff1a;
每个用户评分最高的N条记录
每个用户评分的平均值
最大方&#xff08;评分给得高&#xff09;的N个用户
最热门的N部电影
评价最高的N部电影
.......
5.3.2. 需求&#xff1a;
求出每个用户的评分总和
5.3.3. 思路&#xff1a;
典型的wordcount
5.4. mr编程案例11——共同好友分析
5.4.1. 数据&#xff1a;
冒号左边为用户id&#xff0c;冒号右边为用户的好友列表
A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J |
5.4.2. 需求&#xff1a;
求出有共同好友的用户对&#xff0c;及他们的共同好友&#xff0c;如&#xff1a;
A与B&#xff0c;有共同好友C,E
A与C&#xff0c;有共同好友D
......
5.4.3. 思路&#xff1a;
6. mr客户端提交job到集群运行
6.1. yarn的资源参数配置
yarn.scheduler.minimum-allocation-mb 默认值&#xff1a;1024 // yarn分配一个容器时最低内存
yarn.scheduler.maximum-allocation-mb 默认值&#xff1a;8192 // yarn分配一个容器时最大内存
yarn.scheduler.minimum-allocation-vcores 默认值&#xff1a;1 // yarn分配一个容器时最少cpu核数
yarn.scheduler.maximum-allocation-vcores 默认值&#xff1a;32 // yarn分配一个容器时最多cpu核数
// 1个nodemanager拥有的总内存资源
yarn.nodemanager.resource.memory-mb 默认值&#xff1a;8192
// 1个nodemanager拥有的总cpu资源&#xff08;逻辑的&#xff0c;表示比例而已&#xff09;
yarn.nodemanager.resource.cpu-vcores 默认值&#xff1a;8
7. zookeeper 集群搭建
上传安装包&#xff0c;解压
修改conf/zoo.cfg
# The number of milliseconds of each tick tickTime&#61;2000 # The number of ticks that the initial # synchronization phase can take initLimit&#61;10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit&#61;5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir&#61;/root/zkdata # the port at which the clients will connect clientPort&#61;2181 # Set to "0" to disable auto purge feature #autopurge.purgeInterval&#61;1 server.1&#61;hdp-01:2888:3888 server.2&#61;hdp-02:2888:3888 server.3&#61;hdp-03:2888:3888 |
配置文件修改完后&#xff0c;将安装包拷贝给hdp-02 和 hdp-03
接着&#xff0c;到hdp-01上&#xff0c;新建数据目录/root/zkdata&#xff0c;并在目录中生成一个文件myid&#xff0c;内容为1
接着&#xff0c;到hdp-02上&#xff0c;新建数据目录/root/zkdata&#xff0c;并在目录中生成一个文件myid&#xff0c;内容为2
接着&#xff0c;到hdp-03上&#xff0c;新建数据目录/root/zkdata&#xff0c;并在目录中生成一个文件myid&#xff0c;内容为3
启动zookeeper集群&#xff1a;