-
小表 join 大表
将key相对分散并且数据量小的表放在join的左边,这样既可以减少内存溢出错误发生的几率,又可以使用Group让小的维度表(1000条以下的记录条数)先进内存,在map端完成reduce。
早起版本需要考虑此种优化方式,现在版本的hive有了一个优化器,可以自动选择出小表,并将小表的数据一次性放到内存
-
多个表关联时,最好分拆成小段,避免大sql(无法控制中间Job)
-
大表 join 大表
原则:尽量减少输入量
(1) 空key过滤
有时一些数据表中会出现大量的相同key对应着空字段,如果这些key对应的是异常数据,这时就可以过滤空key来优化Hive的运行速度
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM(
SELECT * FROM nullidtable
WHERE id is NOT NULL) AS a
JOIN ori AS b
ON a.id=b.id;
(2) 空key转换
如果key对应的不是异常数据,且其对应的值必须包含在join的结果中,那么就可以将主表中key为空的字段赋一个随机值,这样可以使数据随机均匀地分不到不同的reducer上,避免数据倾斜
set set hive.exec.reducers.bytes.per.reducer=32123456;
设置每个reduce任务处理的数据量
set mapreduce.job.reduces=7;
设置执行任务的reduce个数
INSERT OVERWRITE TABLE jointable
SELECT a.* FROM nullidtable AS a
LEFT JOIN ori AS b
ON CASE WHEN a.id is NULL THEN concat('hive',rand())
ELSE a.id END = b.id
-
MapJoin
这就是前面提到的,新版本hive自动选择小表的设定
set hive.auto.convert.join = true;
开启MapJoin,一般默认就是开启的
set hive.mapjoin.smalltable.filesize=25123456;
设置数据大小是多少值以下为小表,默认是25M
工作机制:
-
Group By
一般情况下,Map阶段的Key相同的数据会分发给一个Reduce,如果Key相同的数据过大时就会发生数据倾斜。其实,并不是所有的聚合操作要在Reduce阶段完成,很多聚合操作可以现在Map端进行部分聚合
set hive.map.aggr = true;
开启在Map端的聚合操作,默认为true
set hive.groupby.mapaggr.checkinterval = 100000;
在Map端进行聚合操作的条目数目,默认100000条
set hive.groupby.skewindata = true;
有数据倾斜的时候进行负载均衡,默认是false
负载均衡的选项设置为true后,查询计划会生成两个MapReduce Job。
第一个MapReduce Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,在这一步中相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;(这一步主要在做负载均衡)
第二个MapReduce Job中,会根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。(这一步主要在分组)
-
Count (Distinct)
在数据量很大的情况下,COUNT(DISTINCT)的操作需要用一个ReduceTask来完成,这样这个ReduceTask要处理的数据量巨大,这种情况可以用 先GROUP BY再COUNT 的方法替换,从而优化hive
比如:
SELECT COUNT(DISTINCT id)FROM bigtable;
可以替换为
SELECT COUNT(id) FROM(
SELECT id FROM bigtable
GROUP BY id) AS a;
-
笛卡尔积
尽量避免笛卡尔积,即避免join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。
-
分区剪裁、列剪裁
尽量只拿需要的列,有分区就要带上分区条件,另外遇到如下情况,尽量先过滤再Join
SELECT a.id FROM bigtable AS a
LEFT JOIN ori AS b
ON a.id=b.id
WHERE b.id<=10;
可以替换为
SELECT a.id FROM bigtable AS a
LEFT JOIN ori AS b
ON (b.id<=10 AND a.id=b.id);
或者
SELECT a.id FROM bigtable AS a
LEFT JOIN (
SELECT id FROM ori
WHERE id <=10) AS b
ON a.id=b.id;
-
动态分区调整
主要适用于分区表,以第一个表的分区规则,来对应第二个表的分区规则,将第一个表的所有分区,全部拷贝到第二个表中来,第二个表在加载数据的时候,不需要指定分区,直接用第一个表的分区即可
set hive.exec.dynamic.partition = true;
开启动态分区功能,默认为true
set hive.exec.dynamic.partition.mode = nonstrict;
设置为非严格模式,strict为严格模式,表示必须至少指定一个分区为静态分区,nonstrict表示允许所有的分区字段都可以使用动态分区)
set hive.exec.max.dynamic.partitiOns= 1000;
在所有执行MR的节点上,最大一共可以创建多少个动态分区,默认1000个
set hive.exec.max.dynamic.partitions.pernode = 100;
在每个执行MR的节点上,最大可以创建多少个动态分区。这是要根据实际情况来决定的,比如拿到的源数据中为一年的数据,这时就需要设置的值就要大于365,否则就会报错
set hive.exec.max.created.files = 100000;
**整个MR Job中,最多可以创建多少个HDFS文件。**在linux系统当中,每个linux用户最多可以开启1024个进程,每一个进程最多可以打开2048个文件,即持有2048个文件句柄,这里设置的值越大,可以打开的文件句柄越大
set hive.error.on.empty.partition = false;
当有空分区生成时,是否抛出异常。 一般不需要设置
当不使用动态分区插入数据时,需要手动指定分区值
INSERT OVERWRITE TABLE ori_partitioned_target
PARTITION (p_time='20130812')
SELECT id,time, uid, keyword, url_rank, click_num, click_url
FROM ori_partitioned;
使用动态分区插入数据时,不用手动指定分区,但是select最后面的字段,一定要是分区字段,分区字段的值不能是中文
INSERT OVERWRITE TABLE ori.partitioned_target
PARTITION (p_time)
SELECT id, time, uid, keyword ,url_rank, click_num, click_url, p_time
FROM ori_partitioned;
-
分桶表
分桶表可以将大文件按照规则分成多个小文件,也能达到优化hive的效果,详情请见【Hadoop离线基础总结】Hive的基本操作
-
概述
发生数据倾斜的原因在【Hadoop离线基础总结】MapReduce参数优化提到过,就是大量的数据都涌到同一个reduceTask里面去,造成一个reduceTask里面处理得数据量太大,迟迟不能完成。
通常情况下,Job会通过input的目录产生一个或者多个MapTask。生成MapTask数的主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小(128M)
是不是MapTask数越多越好?不是。如果一个任务有大量的小文件(远远小于128M),每一个小文件就会产生一个block块,也就会对应产生一个MapTask,这样一个MapTask的启动和初始化时间远远超过了逻辑处理的时间,就会造成资源浪费。而且MapTask同时可执行的数量也是有限的。
那保证每个文件都接近128M是不是就可以了?不一定。如果有一个文件是127M,它只会用一个MapTask,但这个文件中只有一个或者两个字段,却有上千上万条记录,同时如果Map处理的逻辑也比较复杂的话,用一个MapTask会很耗时。
所以要根据实际情况来 增加 或者 减少Map数
-
减少Map数(合并小文件)
set mapred.max.split.size=112345600;
每个切片最大的值
set mapred.min.split.size.per.node=112345600;
每个节点上切片最小的大小
set mapred.min.split.size.per.rack=112345600;
每个交换机上切片最小的大小
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
hive默认的hive.input.format
这些参数表示在执行任务前按照这些参数来合并小文件,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。(蓝字这些我觉得说的不对,弄明白了再来改)
-
增加Map数:
现在如果只有一个表a,大小为120M,但包含着几千万条记录,如果用一个MapTask去做肯定会很耗时,该怎么做?
首先,先用DISTRIBUTE BY把表a打散成10个小文件
set mapreduce.job.reduces =10;
CREATE TABLE a_1
SELECT * FROM a
DISTRIBUTE BY rand(123);
完成后用表a_1执行任务,就会同时有10个MapTask去完成任务
根据实际情况,控制map数量需要遵循两个原则:使大数据量利用合适的map数;使单个map任务处理合适的数据量;
-
那么,是不是reduce数越多越好?
不一定。过多的启动和初始化reduce也会消耗时间和资源。另外,相应的reduce数会产生相应的输出文件数,如果要利用这些输出文件作为下一个任务的输入文件的话,过多的输入文件也会产生问题。
控制reduce数同样也需要遵循那两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适;
-
调整Reduce
方法一:估算reduce个数
set hive.exec.reducers.bytes.per.reducer=256123456;
每个Reduce处理的数据量默认是256MB
set hive.exec.reducers.max=1009;
每个任务最大的reduce数,默认为1009
N=min(参数2,总输入数据量/参数1)
计算reduce数公式
方法二:
在hadoop的mapred-default.xml
文件中修改
方法三:
set mapreduce.job.reduces = 15;
设置reduce个数