Hive是基于Hadoop的一个数据仓库处理工具,是一种数据库技术,用于查询和管理存储在分布式环境下的大数据集,可以定义数据库和表来分析结构化数据,适合处理相对静态的海量的数据集。可以将结构化的数据映射为一张数据库表,提供简单的SQL的查询功能,将SQL语句转化为MapReduce任务提交到Hadoop集群运行,十分适合数据仓库的统计分析。
Hive并不提供实时的查询和基于行级的数据更新操作,Hive在加载数据的过程中不会对数据进行任何修改,只是将数据
移动到Hive设定(hive-site.xml)的HDFS目录中,将HDFS中的数据变成关系型数据库的逻辑结构,在Hive中创建的数据库其实是假的,所以需要一个映射关系数据(即元数据(metastore)),因此,Hive不支持对数据的改写和添加。Hive的最佳使用场合就是大数据集的批处理操作。
目前Hive将metastore数据存储在RDBMS数据库中,HIve有三种模式可以访问数据库中的metastore内容:
(1)单用户本地模式:使用简单基于内存的数据库derby
(2) 多用户本地模式:使用本地的关系数据库Mysql,提供多用户并发访问
(3) 远程服务器模式:使用单独的机器部署数据库服务器,配置远程访问权限
什么是数据倾斜:由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点
原因:
hadoop框架的特性:
A、不怕数据大,怕数据倾斜
B、Jobs 数比较多的作业运行效率相对比较低,如子查询比较多
C、 sum,count,max,min 等聚集函数,通常不会有数据倾斜问题
主要表现:
任务进度长时间维持在 99%或者 100%的附近,查看任务监控页面,发现只有少量 reduce 子任务未完成,因为其处理的数据量和其他的 reduce 差异过大。 单一 reduce 处理的记录数和平均记录数相差太大,通常达到好几倍之多,最长时间远大 于平均时长。
解决方法
hive设置hive.map.aggr=true和hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job在根据预处理的数据结果按照 Group By Key 分布到Reduce中(这个过程可以保证相同的 Group By Key 被分布到同一个Reduce中),最后完成最终的聚合操作。
SQL语句调整:
选用join key 分布最均匀的表作为驱动表。做好列裁剪和filter操作,以达到两表join的时候,数据量相对变小的效果。
大小表Join: 使用map join让小的维度表(1000条以下的记录条数)先进内存。在Map端完成Reduce。
大表Join大表:把空值的Key变成一个字符串加上一个随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终的结果。
count distinct大量相同特殊值:count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在做后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union.
MAPJOIN 当一个大表和一个或多个小表做JOIN时,最好使用MAPJOIN,性能比普通的JOIN要快很多。 另外,MAPJOIN 还能解决数据倾斜的问题。 MAPJOIN的基本原理是:在小数据量情况下,SQL会将用户指定的小表全部加载到执行JOIN操作的程序的内存中,从而加快JOIN的执行速度。
1、小、大表 join
在小表和大表进行join时,将小表放在前边,效率会高。hive会将小表进行缓存。
2、mapjoin
使用mapjoin将小表放入内存,在map端和大表逐一匹配。从而省去reduce。
样例:
select /*+MAPJOIN(b)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1
缓存多张小表:
select /*+MAPJOIN(b,c)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1 JOIN tbalec c on a.a1=c.c1
mapjoin的join发生在map阶段,join的join发生在reduce阶段,mapjoin可以提高效率
hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析。
SELECT a.key,a.value
FROM a
WHERE a.key not in (SELECT b.key FROM b)
答案:
select a.key,a.value from a where a.key not exists (select b.key from b)
order by:会对输入做全局排序,因此只有一个reducer(多个reducer无法保证全局有序)。只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。
sort by:不是全局排序,其在数据进入reducer前完成排序。
distribute by:按照指定的字段对数据进行划分输出到不同的reduce中。
cluster by:除了具有 distribute by 的功能外还兼具 sort by 的功能。
方案1:
在每台电脑上求出TOP10,可以采用包含10个元素的堆完成(TOP10小,用最大堆,TOP10大,用最小堆)。
比如求TOP10大,我们首先取前10个元素调整成最小堆,如果发现,然后扫描后面的数据,并与堆顶元素比较,如果比堆顶元素大,那么用该元素替换堆顶,然后再调整为最小堆。
最后堆中的元素就是TOP10大。
方案2
求出每台电脑上的TOP10后,然后把这100台电脑上的TOP10组合起来,共1000个数据
再利用上面类似的方法求出TOP10就可以了。
1.从本地导入: load data local inpath ‘/home/1.txt’ (overwrite)into table student;
2.从Hdfs导入: load data inpath ‘/user/hive/warehouse/1.txt’ (overwrite)into table student;
3.查询导入: create table student1 as select * from student;(也可以具体查询某项数据)
4.查询结果导入:insert (overwrite)into table staff select * from track_log;
创建表时:创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径, 不对数据的位置做任何改变。
删除表时:在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
分区
是指按照数据表的某列或某些列分为多个区,区从形式上可以理解为文件夹,比如我们要收集某个大型网站的日志数据,一个网站每天的日志数据存在同一张表上,由于每天会生成大量的日志,导致数据表的内容巨大,在查询时进行全表扫描耗费的资源非常多。
那其实这个情况下,我们可以按照日期对数据表进行分区,不同日期的数据存放在不同的分区,在查询时只要指定分区字段的值就可以直接从该分区查找
分桶
分桶是相对分区进行更细粒度的划分。
分桶将整个数据内容安装某列属性值得hash值进行区分,如要按照name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。
如取模结果为0的数据记录存放到一个文件,取模为1的数据存放到一个文件,取模为2的数据存放到一个文件
通用设置
hive.optimize.cp=true:列裁剪
hive.optimize.prunner:分区裁剪
hive.limit.optimize.enable=true:优化LIMIT n语句
hive.limit.row.max.size=1000000:
hive.limit.optimize.limit.file=10:最大文件数
本地模式(小任务)
job的输入数据大小必须小于参数:hive.exec.mode.local.auto.inputbytes.max(默认128MB)
job的map数必须小于参数:hive.exec.mode.local.auto.tasks.max(默认4)
job的reduce数必须为0或者1
hive.exec.mode.local.auto.inputbytes.max=134217728
hive.exec.mode.local.auto.tasks.max=4
hive.exec.mode.local.auto=true
hive.mapred.local.mem:本地模式启动的JVM内存大小
并发执行
hive.exec.parallel=true ,默认为false
hive.exec.parallel.thread.number=8
Strict Mode:
hive.mapred.mode=true,严格模式不允许执行以下查询:
分区表上没有指定了分区
没有limit限制的order by语句
笛卡尔积:JOIN时没有ON语句
动态分区
hive.exec.dynamic.partition.mode=strict:该模式下必须指定一个静态分区
hive.exec.max.dynamic.partitions=1000
hive.exec.max.dynamic.partitions.pernode=100:在每一个mapper/reducer节点允许创建的最大分区数
DATANODE:dfs.datanode.max.xceivers=8192:允许DATANODE打开多少个文件
排序
ORDER BY colName ASC/DESC
hive.mapred.mode=strict时需要跟limit子句
hive.mapred.mode=nonstrict时使用单个reduce完成排序
SORT BY colName ASC/DESC :每个reduce内排序
DISTRIBUTE BY(子查询情况下使用 ):控制特定行应该到哪个reducer,并不保证reduce内数据的顺序
CLUSTER BY :当SORT BY 、DISTRIBUTE BY使用相同的列时。
合并小文件
hive.merg.mapfiles=true:合并map输出
hive.merge.mapredfiles=false:合并reduce输出
hive.merge.size.per.task=256*1000*1000:合并文件的大小
hive.mergejob.maponly=true:如果支持CombineHiveInputFormat则生成只有Map的任务执行merge
hive.merge.smallfiles.avgsize=16000000:文件的平均大小小于该值时,会启动一个MR任务执行merge。
自定义map/reduce数目
减少map数目:
set mapred.max.split.size
set mapred.min.split.size
set mapred.min.split.size.per.node
set mapred.min.split.size.per.rack
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
增加map数目:
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
假设有这样一个任务:
select data_desc, count(1), count(distinct id),sum(case when …),sum(case when ...),sum(…) from a group by data_desc
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。
set mapred.reduce.tasks=10;
create table a_1 as select * from a distribute by rand(123);
这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
reduce数目设置:
参数1:hive.exec.reducers.bytes.per.reducer=1G:每个reduce任务处理的数据量
参数2:hive.exec.reducers.max=999(0.95*TaskTracker数):每个任务最大的reduce数目
reducer数=min(参数2,总输入数据量/参数1)
set mapred.reduce.tasks:每个任务默认的reduce数目。典型为0.99*reduce槽数,hive将其设置为-1,自动确定reduce数目。
使用索引:
hive.optimize.index.filter:自动使用索引
hive.optimize.index.groupby:使用聚合索引优化GROUP BY操作
CREATE DATABASE yexin; 或者 CREATE SCHEMA yexin; --------------------创建数据库
DROP DATABASE IF EXISTS yexin;或者 DROP SCHEMA yexin--------------------------删除数据库
--创建数据库
create database if not exists sopdm
comment ‘this is test database’
with dbproperties(‘creator’=’gxw’,’date’=’2014-11-12’) --数据库键值对属性信息
location ‘/my/preferred/directory’;
--查看数据库的描述信息和文件目录位置路径信息
describe database sopdm;
--查看数据库的描述信息和文件目录位置路径信息(加上数据库键值对的属性信息)
describe database extended sopdm
--删除数据库
drop database if exists sopdm;
--级联删除数据库(当数据库还有表时,级联删除表后在删除数据库),默认是restrict
drop database if exists sopdm cascade;
--修改数据库
--只能修改数据库的键值对属性值。数据库名和数据库所在的目录位置不能修改
alter database sopdm set dmproperties(‘edited-by’=’gaoxianwei’);
--创建表
--其中tblproperties作用:按照键值对的格式为表增加额外的文档说明,也可用来表示数据库连接的必要的元数据信息
--hive会自动增加二个表属性:last_modified_by(最后修改表的用户名),last_modified_time(最后一次修改的时间)
create table if not exists sopdm.test1(name string comment ‘姓名’,salary float comment ‘薪水’)
comment ‘这是一个测试的表’
tblproperties(‘creator’=’me’,’created_at’=’2014-11-13 09:50:33’)
location ‘/user/hive/warehouse/sopdm.db/test1’
--查看和列举表的tblproperties属性信息
show tblproperties table_name;
--使用like在创建表的时候,拷贝表模式(而无需拷贝数据)
create table if not exists sopdm.test2 like sopdm.test1;
--查看表的详细结构信息(也可以显示表是管理表,还是外部表。还有分区信息)
describe extended sopdm.test1;
--使用formatted信息更多些,可读性更强
describe formatted sopdm.test1;
--创建外部表
--删除表时,表的元数据会被删除掉,但是数据不会被删除
--如果数据被多个工具(如pig等)共享,可以创建外部表
create external table if not exists sopdm.test1(
name string comment ‘姓名’,
salary float comment ‘薪水’)
comment ‘这是一个测试的表’
tblproperties(‘creator’=’me’,’created_at’=’2014-11-13 09:50:33’)
location ‘/user/hive/warehouse/sopdm.db/test1’
--分区表
create table if not exists sopdm.test1(
name string comment ‘姓名’,
salary float comment ‘薪水’)
comment ‘这是一个测试的表’
partitioned by(country string,state string)
STORED AS rcfile
tblproperties(‘creator’=’me’,’created_at’=’2014-11-13 09:50:33’)
location ‘/user/hive/warehouse/sopdm.db/test1’
--查看表中存在的所有分区
show partitions table_name;
--查看表中特定分区
show partitions table_name partition(country=’US’);
--可以在表载入数据的时候创建分区
load data local inpath ‘${env:HOME/employees}’
into table employees
partition(country=’US’,state=’CA’);
--删除表
drop table if exists table_name;
--修改表-表重命名
alter table old_table_name rename to new_table_name;
--增加分区
alter table table_name add if not exists partition(year=2011,month=1,day=1)
location ‘/logs/2011/01/01’;
--修改分区存储路径
alter table table_name partition(year=2011,month=1,day=2)
set location ‘/logs/2011/01/02’;
--删除某个分区
alter table table_name drop if exists partition(year=2011,month=1,day=2);
--修改列信息
alter table table_name
change column old_name new_name int
comment ‘this is comment’
after severity; --字段移到severity字段之后(移动到第一个位置,使用first关键字)
--增加列
alter table table_name add columns(app_name string comment ‘application name’);
--删除或者替换列
alter table table_name replace columns(hms int comment ‘hhh’);
--修改表属性
alter table table_name set tblproperties(‘notes’=’this is a notes’);
--修改存储属性
alter table table_name partition(year=2011,month=1,day=1) set fileformat sequencefile;
--指定新的SerDe,并指定SerDe属性
alter table table_name
set serde “com.example.JSONSerDe”
with serdeproperties(‘prop1’=‘value1’, ‘prop2’=‘value2’);
--增加执行“钩子”——当表中存储的文在hive之外被修改了,就会触发钩子的执行
alter table table_name touch partition(year=2012,month=1,day=1);
--将分区内的文件打成hadoop压缩包文件,只会降低文件系统中的文件数,减轻NameNode的压力,而不会减少任何的存储空间
--使用unarchive替换archive起到反向操作
alter table table_name archive partition(year=2012,month=1,day=1);
--防止分区被删除和被查询(使用enable替代disable可以起到反向的操作目的)
alter table table_name partition(year=2012,month=1,day=1) disable no_drop;
alter table table_name partition(year=2012,month=1,day=1) disable offline;
--向管理表中装载数据
-- inpath为一个目录,而且这个路径下不可以包含任何文件夹
load data local inpath ‘${env:HOME}/table_name’
overwrite into table table_name
partition(country=’US’);
--通过查询语句向表中插入数据
--overwrite是覆盖,into是追加
insert overwrite table table_name
partition(country=’US’)
select * from table_name2 tn where tn.cnty=’US’
--高效方式-查询语句插入多个分区
from table_name2 tn
insert overwrite table table_name
partition(country=’US’,state=’OR’)
select * where tn.cnty=’US’ and tn.st=’OR’
insert overwrite table table_name
partition(country=’US’,state=’CA’)
select * where tn.cnty=’US’ and tn.st=’CA’
--动态插入分区
--hive根据select语句最后2列确定分区字段country和state的值(根据位置)
insert overwrite table table_name
partition(country,state)
select …,se.cnty,se.st
from employees se;
--动态和静态分区结合
--country为静态分区,state为动态分区(静态分区必须在动态分区之前)
insert overwrite table table_name
partition(country=‘US’,state)
select …,se.cnty,se.st
from employees se
where se.cnty=’US’;
--单个查询语句中创建表并加载数据
create table table_name1
as select name,salary,address from table_name2 where state=’CA’;
--导出数据——拷贝文件
--如果数据文件恰好是用户需要的格式,那么只需要简单的拷贝文件或文件夹就可以。
hadoop fs –cp source_path target_path
--导出数据
insert overwrite local directory ‘/tmp/employees’
select name,salary,address from employees se where se.state=’CA’
--导出数据到多个输出文件夹
from employees se
insert overwrite local directory ‘/tmp/or_employees’
select * se where se.cty=’US’ and se.st=’OR’
insert overwrite local directory ‘/tmp/ca_employees’
select * se where se.cty=’US’ and se.st=’CA’