Hive SQL是一种声明试语言,用户会提交声明式的查询,而Hive会将其转换成MapReduce job,大多数情况下,用户不需要了解Hive内部的实现原理的,这样就可以专注业务的事情,不再关注底层实现了。
不过,当用户对于Hive具有越来越多的经验后,了解一下Hive背后的理论知识和底层的一些实现细节,会让用户更加高效地使用Hive。
学习Hive是如何工作的,第一步就是学习explain功能,其可以帮助我们学习Hive是如何将查询转化成MapReduce任务的。
举个栗子:
hive> show tables;
OK
stu
t_wc
test_stu_3
toss1
toss2
Time taken: 1.307 seconds, Fetched: 5 row(s)
hive> desc stu;
OK
name string
age int
Time taken: 0.246 seconds, Fetched: 2 row(s)
hive> select sum(age) as age_sum from stu;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID = zhangchenguang_20200414105331_85552bbd-761a-4688-a77e-9f100ccfc943
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):set hive.exec.reducers.bytes.per.reducer=
In order to limit the maximum number of reducers:set hive.exec.reducers.max=
In order to set a constant number of reducers:set mapreduce.job.reduces=
Starting Job = job_1586832731722_0001, Tracking URL = http://zcg.local:8088/proxy/application_1586832731722_0001/
Kill Command = /Users/zhangchenguang/software/hadoop-2.7.3/bin/hadoop job -kill job_1586832731722_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2020-04-14 10:53:47,036 Stage-1 map = 0%, reduce = 0%
2020-04-14 10:53:54,497 Stage-1 map = 100%, reduce = 0%
2020-04-14 10:54:00,812 Stage-1 map = 100%, reduce = 100%
Ended Job = job_1586832731722_0001
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 HDFS Read: 7753 HDFS Write: 102 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
31
Time taken: 29.927 seconds, Fetched: 1 row(s)
使用explain以后是怎样的呢?
hive> explain select sum(age) as age_sum from stu;
OK
STAGE DEPENDENCIES:Stage-1 is a root stageStage-0 depends on stages: Stage-1STAGE PLANS:Stage: Stage-1Map ReduceMap Operator Tree:TableScanalias: stuStatistics: Num rows: 9 Data size: 38 Basic stats: COMPLETE Column stats: NONESelect Operatorexpressions: age (type: int)outputColumnNames: ageStatistics: Num rows: 9 Data size: 38 Basic stats: COMPLETE Column stats: NONEGroup By Operatoraggregations: sum(age)mode: hashoutputColumnNames: _col0Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEReduce Output Operatorsort order:Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEvalue expressions: _col0 (type: bigint)Reduce Operator Tree:Group By Operatoraggregations: sum(VALUE._col0)mode: mergepartialoutputColumnNames: _col0Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEFile Output Operatorcompressed: falseStatistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEtable:input format: org.apache.hadoop.mapred.SequenceFileInputFormatoutput format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormatserde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDeStage: Stage-0Fetch Operatorlimit: -1Processor Tree:ListSinkTime taken: 0.173 seconds, Fetched: 44 row(s)
一个Hive任务会包含有一个或多个stage(阶段),不同的stage间会存在着依赖关系。正如用户可以预料到的,越复杂的查询通常将会引入越多的stage,而通常stage越多就需要越多的时间来完成任务。一个stage可以是一个MapReduce任务,也可以是一个抽样阶段,或者一个合并阶段,还可以是一个limit阶段,以及Hive需要的其他某个任务的一个阶段。默认情况下,Hive会一次只执行一个stage(阶段),不过在后面我们会进行探索如何并行执行。
STAGE PLAN部分比较冗长也比较复杂。Stage1包含了这个job的大部分处理过程,而且会触发一个MapReducejob。TableScan以这个表作为输入,然后会产生一个只有字段number的输出。GroupByOperator会应用到sum(number),然后会产生一个输出字段_col0(这是为临时结果字段按规则起的临时字段名)。这些都是发生在job的map处理阶段过程,也就是都是位于MapOperatorTree下面的:
STAGE PLANS:Stage: Stage-1Map ReduceMap Operator Tree:TableScanalias: stuStatistics: Num rows: 9 Data size: 38 Basic stats: COMPLETE Column stats: NONESelect Operatorexpressions: age (type: int)outputColumnNames: ageStatistics: Num rows: 9 Data size: 38 Basic stats: COMPLETE Column stats: NONEGroup By Operatoraggregations: sum(age)mode: hashoutputColumnNames: _col0Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEReduce Output Operatorsort order:Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEvalue expressions: _col0 (type: bigint)
在reduce过程这边,也就是ReduceOperatorTree下面,我们可以看到相同的GroupbyOperator,但是这次其应用到的是对_col0字段进行sum操作。最后,在reducer中我们看到了FileOutputOperator,其说明了输出结果将是文本格式,是基于字符串的输出格式:HiveSequenceFileOutputFormat:
Reduce Operator Tree:Group By Operatoraggregations: sum(VALUE._col0)mode: mergepartialoutputColumnNames: _col0Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEFile Output Operatorcompressed: falseStatistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEtable:input format: org.apache.hadoop.mapred.SequenceFileInputFormatoutput format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormatserde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
因为这个job没有LIMIT语句,因此stage-0阶段是一个没有任何操作的阶段
Stage: Stage-0Fetch Operatorlimit: -1Processor Tree:ListSink
理解Hive是如何对每个查询进行解析和计划的复杂的细节并非总是有用的。不过这是复杂的或执行效率低的查询的一个不错的方式,特别是当我们需要尝试各种各样的调优方式时。可以在“逻辑层”查看到这些调整会产生什么样的影响,这样可以关联到性能的度量。
执行下面sql查询对应的执行计划:
explain extended select sum(age) as age_sum from stu;
-- 是否添加extend关键字可以对比一下打印输出的结果
explain (extended) select sum(age) as age_sum from stu;
Reduce Operator Tree:Group By Operatoraggregations: sum(VALUE._col0)mode: mergepartialoutputColumnNames: _col0Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEFile Output Operatorcompressed: falseGlobalTableId: 0directory: hdfs://localhost:9000/tmp/hive/zhangchenguang/bdd7f4ad-9e2c-4ae3-9866-4a2284482102/hive_2020-05-06_10-43-59_797_5203373428857144675-1/-mr-10001/.hive-staging_hive_2020-05-06_10-43-59_797_5203373428857144675-1/-ext-10002NumFilesPerFileSink: 1Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONEStats Publishing Key Prefix: hdfs://localhost:9000/tmp/hive/zhangchenguang/bdd7f4ad-9e2c-4ae3-9866-4a2284482102/hive_2020-05-06_10-43-59_797_5203373428857144675-1/-mr-10001/.hive-staging_hive_2020-05-06_10-43-59_797_5203373428857144675-1/-ext-10002/table:input format: org.apache.hadoop.mapred.SequenceFileInputFormatoutput format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormatproperties:columns _col0columns.types bigintescape.delim \hive.serialization.extend.additional.nesting.levels trueserialization.escape.crlf trueserialization.format 1serialization.lib org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDeserde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDeTotalFiles: 1GatherStats: falseMultiFileSpray: falseStage: Stage-0Fetch Operatorlimit: -1Processor Tree:ListSink
Limit语句是经常使用到的,经常使用CLI的用户都会使用到。不过,在很多情况下Limit语句还是需要执行整个查询语句,然后再返回部分结果的。因为这种情况通常是浪费de,所以应该尽可能避免出现这种情况。Hive有一个属性配置可以开启,当使用Limit语句时,可以对源数据进行抽样:
一旦属性hive.limit.optimize.enable 的值设置为true,那么还会有两个参数可以控制这个操作,也就是hive.limit.row.max.size 和 hive.limit.optimize.limit.file
<property><name>hive.limit.row.max.sizename><value>100000value><description>When trying a smaller subset of data for simple LIMIT, how much size we need to guarantee each row to have at least.description>property><property><name>hive.limit.optimize.limit.filename><value>10value><description>When trying a smaller subset of data for simple LIMIT, maximum number of files we can sample.description>property>
这个功能的一个缺点就是有可能输入中的有用的数据永远不会被处理到&#xff0c;例如任意的一个需要reduce步骤的查询&#xff0c;join和group by操作&#xff0c;以及聚合函数的大多数调用等等操作&#xff0c;将会产生很不同的结果。也许这个差异在很多情况下是可以接受的&#xff0c;但是重要的是要理解。
如果遇到小表join大表的情况&#xff0c;可以完全的将小表载入到内存中&#xff0c;这时Hive可以执行一个map-side JOIN &#xff0c;这样可以减少reduce过程&#xff0c;有时甚至可以减少某些map task任务。有时候如果某些表不适合载入内存&#xff0c;也可以使用mapJOIN&#xff0c;因为reduce阶段可能比将不太大的表分发到每个map task 中会带来更多的好处。
大多数的HadoopJob是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过&#xff0c;有时Hive的输入数据量是非常小的。在这种情况下&#xff0c;为查询触发执行任务的时间消耗可能会比实际job的执行时间要多得多。对于大多数这种情况&#xff0c;Hive可以通过本地模式在单台机器上&#xff08;或某些时候在单个进程中&#xff09;处理所有的任务。对于小数据集&#xff0c;执行时间可以明显被缩短。
用户可以按照如下这个例子中所演示的方式&#xff0c;在执行过程中临时启用本地模式:
set oldjobtracker&#61;${hiveconf:mapred.job.tracker};
set mapred.job.tracker&#61;local;
set mapred.tmp.dir&#61;/home/edward/tmp;select * from people where firstname&#61;bob;
set mapred.job.tracker&#61;${oldjobtracker};
用户可以通过设置hive.exec.mode.local.auto的值为true&#xff0c;来让hive在适当的时候自动启动这个优化。通常用户可以将这个配置写到$HOME/.hiverc文件中。(如果希望所有的用户都使用这个配置&#xff0c;可以将配置添加到 HOME/conf/hive-site.xml
文件内)
<property><name>hive.exec.mode.local.autoname><value>falsevalue><description>Let Hive determine whether to run in local mode automaticallydescription>property>
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段&#xff0c;或者Hive执行过程中可能需要的其他阶段。默认情况下&#xff0c;Hive一次只会执行一个阶段。不过&#xff0c;某个特定的job可能包含众多的阶段&#xff0c;而这些阶段可能并非完全互相依赖的&#xff0c;也就是说有些阶段是可以并行执行的&#xff0c;这样可能使得整个job的执行时间缩短。不过&#xff0c;如果有更多的阶段可以并行执行&#xff0c;那么job可能就越快完成。
通过设置参数hive.exec.parallel值为true&#xff0c;就可以开启并发执行。不过&#xff0c;在共享集群中&#xff0c;需要注意下&#xff0c;如果job中并行执行的阶段增多&#xff0c;那么集群利用率就会增加&#xff1a;
<property><name>hive.exec.parallelname><value>falsevalue><description>Whether to execute jobs in paralleldescription>property>
Hive提供了一个严格模式&#xff0c;可以防止用户执行那些可能产生意想不到的不好的影响的查询。
通过设置属性hive.mapred.mode值为strict可以禁止3种类型的查询。
其一&#xff0c;对于分区表&#xff0c;除非WHEHRE语句中含有分区字段过滤条件来限制数据范围&#xff0c;否则不允许执行。换句话说&#xff0c;就是用户不允许扫描所有分区。进行这个限制的原因是&#xff0c;通常分区表都拥有非常大的数据集&#xff0c;而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表&#xff1a;
hive> SELECT DISTINCT(planner_id) FROM fracture_ins WHERE planner_id &#61; 5 ;
FAILED:Error in semantic analysis: No Partition Predicate Found for Alias "fracture_ins" Table "fracture_ins"
如下这个语句在WHERE语句中增加了一个分区过滤条件&#xff08;也就是限制了表分区&#xff09;&#xff1a;
hive> SELECT DISTINCT(planner_id) FROM fracture_ins > WHERE planner_id&#61;5 AND hit_date&#61;20120101;
...normal results...
其二&#xff0c;对于使用了ORDER BY语句的查询&#xff0c;要求必须使用LIMIT语句。因为ORDER BY为了执行排序过程会将所有的结果数据分发到同一个reducer中进行处理&#xff0c;强制要求用户增加这个LIMIT语句可以防止reducer额外执行很长一段时间&#xff1a;
hive> SELECT * FROM fracture_ins WHERE hit_date > 2012 ORDER BY planner_id;
FAILED:Error in semantic analysis:line 1:56 In strict mode,limit must be specified if ORDER BY is present planner_id
只需要增加LIMIT语句就可以解决这个问题&#xff1a;
hive> SELECT * FROMfracture_ins WHERE hit_date>2012 ORDER BY planner_id > LIMIT 100000;
...normal results...
其三&#xff0c;也就是最后一种情况&#xff0c;就是限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用WHERE语句&#xff0c;这样关系型数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是&#xff0c;Hive并不会执行这种优化&#xff0c;因此&#xff0c;如果表足够大&#xff0c;那么这个查询就会出现不可控的情况&#xff1a;
hive> SELECT * FROM fracture_act JOIN fracture_ads> WHERE fracture_act.planner_id &#61; fracture_ads.planner_id;
FAILED: Error in semantic analysis:Instrict mode, cartesian product is not allowed.
If you really want to perform the operation,
&#43; set hive.mapred.mode&#61;nonstrict &#43;
下面这个才是个正确的使用JOIN和ON语句的查询&#xff1a;
hive> SELECT * FROM fracture_act JOIN fracture_ads > ON (fracture_act.planner_id &#61; fracture_ads.planner_id);
...normal results...
Hive通过将查询划分成一个或者多个MapReduce任务达到并行的目的。每个任务都可能具有多个mapper和reducer任务&#xff0c;其中至少有一些是可以并行执行的。确定最佳的mapper个数和reducer个数取决于多个变量&#xff0c;例如输入的数据量大小以及对这些数据执行的操作类型等。
保持平衡性是有必要的。如果有太多的mapper或reducer任务&#xff0c;就会导致启动阶段、调度和运行job过程中产生过多的开销&#xff1b;而如果设置的数量太少&#xff0c;那么就可能没有充分利用好集群内在的并行性。
当执行的Hive查询具有reduce过程时&#xff0c;CLI控制台会打印出调优后的reducer个数。下面我们来看一个包含有GROUP BY语句的例子&#xff0c;因为这种查询总是需要reduce过程的。与此相反&#xff0c;很多其他查询会转换成只需要map阶段的任务&#xff1a;
> select age,count(1) from stu group by age;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Query ID &#61; zhangchenguang_20200506152318_e0a49e0f-eaa9-42de-b4bb-e6dd86cec5a0
Total jobs &#61; 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):set hive.exec.reducers.bytes.per.reducer&#61;
In order to limit the maximum number of reducers:set hive.exec.reducers.max&#61;
In order to set a constant number of reducers:set mapreduce.job.reduces&#61;
Starting Job &#61; job_1588731287501_0003, Tracking URL &#61; http://zcg.local:8088/proxy/application_1588731287501_0003/
Kill Command &#61; /Users/zhangchenguang/software/hadoop-2.7.3/bin/hadoop job -kill job_1588731287501_0003
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2020-05-06 15:23:34,316 Stage-1 map &#61; 0%, reduce &#61; 0%
2020-05-06 15:23:41,960 Stage-1 map &#61; 100%, reduce &#61; 0%
2020-05-06 15:23:48,294 Stage-1 map &#61; 100%, reduce &#61; 100%
Ended Job &#61; job_1588731287501_0003
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 HDFS Read: 8121 HDFS Write: 153 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
1 1
8 1
10 1
12 1
属性hive.exec.reducers.bytes.per.reducer的默认值是1GB。可以通过调整它的值&#xff0c;来改变reducer的个数
set hive.exec.reducers.bytes.per.reducer&#61;75000000
默认值通常情况下是比较合适的。不过&#xff0c;有些情况下查询的map阶段会产生比实际输入数据量要多得多的数据。如果map阶段产生的数据量非常多&#xff0c;那么根据输入的数据量大小来确定的reducer个数就显得有些少了。同样地&#xff0c;map阶段也可能会过滤掉输入数据集中的很大一部分的数据而这时可能需要少量的reducer就满足计算了。
一个快速的进行验证的方式就是将reducer个数设置为固定的值&#xff0c;而无需Hive来计算得到这个值。如果用户还记得的话&#xff0c;Hive的默认reducer个数应该是3。可以通过设置属性mapred.reduce.tasks的值为不同的值来确定是使用较多还是较少的reducer来缩短执行时间。需要记住&#xff0c;受外部因素影响&#xff0c;像这样的标杆值十分复杂&#xff0c;例如其他用户并发执行job的情况。Hadoop需要消耗好几秒时间来启动和调度map和reduce任务&#xff08;task&#xff09;。在进行性能测试的时候&#xff0c;要考虑到这些影响因子&#xff0c;特别是job比较小的时候。
当在共享集群上处理大任务时&#xff0c;为了控制资源利用情况&#xff0c;属性hive.exec.reducers.max显得非常重要。一个Hadoop集群可以提供的map和reduce资源个数&#xff08;也称为“插槽”&#xff09;&#xff0c;是固定的。某个大job可能就会消耗完所有的插槽&#xff0c;从而导致其他job无法执行。通过设置属性hive.exec.reducers.max可以阻止某个查询消耗太多的reducer资源。有必要将这个属性配置到$HIVE_HOME/conf/hivesite.xml文件中。对这个属性值大小的一个建议的计算公式如下&#xff1a;
&#xff08;集群总Reduce槽位个数*1.5&#xff09;/(执行中的查询的平均个数)
1.5倍数是一个经验系数&#xff0c;用于防止未充分利用集群的情况。
JVM重用是Hadoop调优参数的内容&#xff0c;其对Hive的性能具有非常大的影响&#xff0c;特别是对于很难避免小文件的场景或task特别多的场景&#xff0c;这类场景大多数执行时间都很短。Hadoop的默认配置通常是使用派生JVM来执行map和reduce任务的。这时JVM的启动过程可能会造成相当大的开销&#xff0c;尤其是执行的job包含有成百上千个task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapredsite.xml文件&#xff08;位于$HADOOP_HOME/conf目录下&#xff09;中进行设置&#xff1a;
<property><name>mapred.job.reuse.jvm.num.tasksname> <value>10value> <description>How many tasks to run per jvm. If set to 1,there is no limit. description>
property>
这个功能的一个缺点是&#xff0c;开启JVM重用将会一直占用使用到的task插槽&#xff0c;以便进行重用&#xff0c;直到任务完成后才能释放。如果某个“不平衡的”的job中有某几个reduce task执行的时间要比其他reduce task消耗的时间多得多的话&#xff0c;那么保留的插槽就会一直空闲着却无法被其他的job使用&#xff0c;直到所有的task都结束了才会释放。
索引可以用来加快含有GROUPBY语句的查询的计算速度。Hive从v0.8.0版本后增加了一个bitmap索引实现。Bitmap索引一般在指定的列排重后的值比较小时进行使用。
另一个特别的优化试图将查询中的多个GROUPBY操作组装到单个MapReduce任务中。如果想启动这个优化&#xff0c;那么需要一组常用的GROUP BY键&#xff1a;
<property> <name>hive.exec.rowoffsetname> <value>truevalue> <description> Whether to provide the row offset virtual column description>
property>
参考Hive编程指南