热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

hive的实践部分

一.hive的事务(1)什么是事务要知道hive的事务,首先要知道什么是transaction(事务)?事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工
 

一.hive的事务

(1)什么是事务

要知道hive的事务,首先要知道什么是transaction(事务)?事务就是一组单元化操作,这些操作要么都执行,要么都不执行,是一个不可分割的工作单位。

事务有四大特性:A、C、I、D (原子性、一致性、隔离性、持久性)

 Atomicity: 不可再分割的工作单位,事务中的所有操作要么都发,要么都不发。

Consistency: 事务开始之前和事务结束以后,数据库的完整性约束没有被破坏。这是说数据库事务不能破坏关系数据的完整性以及业务逻辑上的 一致性。 

Isolation: 多个事务并发访问,事务之间是隔离的

Durability: 意味着在事务完成以后,该事务锁对数据库所作的更改便持久的保存在数据库之中,并不会被回滚。 

(2)hive事务的特点与局限性

从hive的0.14版本开始支持低等级的事务

支持事务的增删改查,从hive的2.2版本开始,开始支持merge

不支持事务的begin、commit以及rollback(事务的回滚)

不支持使用update更新分桶列和分区列

想使用事务的话,文件格式必须是ORC

需要压缩工作,需要时间,资源和空间

支持S(共享锁)和X(排它锁)

不允许从一个非ACID连接写入/读取ACID表

(3)hive的事务开启

hive的事务开启有三种方式:a.通过Ambari UI-Hive Config 

               b.通过hive-xml 的配置文件添加如下内容

  
hive.support.concurrency  
true  
  
  
hive.txn.manager 
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager 

  

                                     c.通过命令行,在beeline这种交互式环境下:

set hive.support.cOncurrency= true; 
set hive.enforce.bucketing = true; 
set hive.exec.dynamic.partition.mode = nonstrict; 
set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager; 
set hive.compactor.initiator.on = true; 
set hive.compactor.worker.threads = 1;

(4)hive的merge

merge的语法:

MERGE INTO AS T USING AS S ON

WHEN MATCHED [AND ] THEN UPDATE SET

WHEN MATCHED [AND ] THEN DELETE

WHEN NOT MATCHED [AND ] THEN INSERT VALUES

merge的局限性:

最多三条when语句,只支持update/delete/insert。when not matched 必须在when语句的最后面。

如果出现update和delete的时候 ,两个条件是分开的,而且必须在条件前面加上AND.像 [AND ]

(5)例子

a.创建两个事务表

CREATE TABLE IF NOT EXISTS employee ( 
emp_id int,  
emp_name string,  
dept_name string, 
work_loc string )  
PARTITIONED BY (start_date string) 
CLUSTERED BY (emp_id) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES('transactional'='true'); 


create table employee_state(
emp_id int,
emp_name string,
dept_name string,
work_loc string,
start_date string,
state string
)
STORED AS ORC;

b.开启事务(见上面的开启事务的c,一般有些默认的设置是开的,我这里就只开了自动分区和分桶)

c.插入数据

 

INSERT INTO table employee PARTITION (start_date) VALUES (1,'Will','IT','Toronto','20100701'), 
(2,'Wyne','IT','Toronto','20100701'), 
(3,'Judy','HR','Beijing','20100701'), 
(4,'Lili','HR','Beijing','20101201'), 
(5,'Mike','Sales','Beijing','20101201'), (6,'Bang','Sales','Toronto','20101201'), (7,'Wendy','Finance','Beijing','20101201');

insert into table employee_state values
(2,’Wyne’,’IT’,’Beijing’,’20100701’,’update’),
(4,’Lili’,’HR’,’Beijing’,’20101201’,’quit’),
(8,’James’,’IT’,’Toronto’,’20170101’,’new’)

d.检验数据是否被插入

 

e.这里通过merge操作,完成更新、删除、插入操作。

employe字段解释:id为2的员工之前的工作地在Toronto,现在在Beijing,state的状态为update。所以需要更新表employee中员工2的信息

id为4的员工的state状态为quit,说明目前员工已经离职,所以需要在employee表中删除关于id为4的员工的信息。

id为8的员工的state状态为new,说明是新员工,所以需要插入empoyee中。

MERGE INTO employee AS T
USING employee_state AS S
ON T.emp_id = S.emp_id and T.start_date = S.start_date
WHEN MATCHED AND S.state = 'update' THEN UPDATE SET dept_name = S.dept_name,work_loc = S.work_loc
WHEN MATCHED AND S.state = 'quit' THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES(S.emp_id,S.emp_name,S.dept_name,S.work_loc,S.start_date);
--这里目标表为employee,源表为employee_state
--这里新员工是属于第三中情况,未在目标表中匹配到,所以直接插入到目标表中。

  

二.hive的udf

(1)什么是hive的udf?

User-defined function (UDF): 这提供了一种使用外部函数(在Java中)扩展功能的方法,可以在HQL中进行评估

(2)hive的udf分类

hive的udf一般分为三种:

  a.UDF:用户定义的简单函数,按行操作并为一行输出一个结果,例如大多数内置数学和字符串函数

  b.UDAF: 用户定义的聚合函数,按行或按组操作,并为每个组输出一行或一行,例如MAX和COUNT内置函数。

  c.UDTF:用户定义的表生成函数也按行运行,但结果会生成多行/表,例如EXPLODE函数。 UDTF可以在SELECT之后或在LATERAL VIEW语句之后使用。

 (3)hive的udf使用举例

  a.对于hive的udf,这里我写了一个把字符串的大写全部换成小写和一个判断字符串是否在一个array数组里面的函数

--将字符串的所有大写改成小写
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
 
public final class StringLower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}
--判断当前字符串是否在数组里面
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BooleanWritable;

@Description(name = "Arraycontains",
        value="_FUNC_(array, value) - Returns TRUE if the array contains value.",
        extended="Example:\n"
                + "  > SELECT _FUNC_(array(1, 2, 3), 2) FROM src LIMIT 1;\n"
                + "  true")


public class ArrayContains extends GenericUDF {
    private static final int ARRAY_IDX = 0;
    private static final int VALUE_IDX = 1;
    private static final int ARG_COUNT = 2;//这个udf函数需要参数的个数
    private static final String FUNC_NAME = "ARRAYCONTAINS";//外部名字

    private transient  ObjectInspector valueOI;
    private transient ListObjectInspector arrayOI;
    private transient ObjectInspector arrayElementOI;
    private BooleanWritable result;
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        //检查是否传入了两个参数
        if (arguments.length != ARG_COUNT) {
            throw new UDFArgumentException("the function" + FUNC_NAME + "accepts"
                    + ARG_COUNT + "arguments");
        }
        //检查参数是否是属于LIST类型
        if (!arguments[ARRAY_IDX].getCategory().equals(ObjectInspector.Category.LIST)) {
            throw new UDFArgumentTypeException(ARRAY_IDX, "\"" + serdeConstants.LIST_TYPE_NAME + "\""
                    + "expected at function ARRAY_CONTAINS,but"
                    + "\"" + arguments[ARRAY_IDX].getTypeName() + "\""
                    + "is found");

        }

        arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
        arrayElementOI = arrayOI.getListElementObjectInspector();

        valueOI = arguments[VALUE_IDX];

        //检查list的元素和传入的值是否属于同一个类型
        if (!ObjectInspectorUtils.compareTypes(arrayElementOI, valueOI)) {
            throw new UDFArgumentTypeException(VALUE_IDX, "\"" + arrayElementOI.getTypeName() + "\""
                    + "expectd at function ARRAY_CONTAINS,but"
                    + "\"" + valueOI.getTypeName() + "\""
                    + "is found");
        }

        //检查此类型是否支持比较
        if (!ObjectInspectorUtils.compareSupported(valueOI)){
            throw new UDFArgumentException("this function" + FUNC_NAME
                    +"does not support comparison for"
                    +"\"" + valueOI.getTypeName() + "\""
                    + "types");

        }
        result = new BooleanWritable(false);
        return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
    }
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
       result.set(false);
       Object array = arguments[ARRAY_IDX].get();
       Object value = arguments[VALUE_IDX].get();

       int arrayLength = arrayOI.getListLength(array);

       //检查数组是否null还是空value是否为null
        if (value == null || arrayLength <= 0)//判断value是否为空,若真则不判断右边,不然为假继续判断右边
        {
            return result;//满足条件直接返回result初始状态值
        }

        //将值与数组的每个元素进行比较,直到找到匹配项
        for (int i=0;i 

  b.然后通过编译器打包到hdfs文件系统上,通过执行hive命令构造函数

DROP FUNCTION IF EXISTS str_lower; 
DROP FUNCTION IF EXISTS Array_contains; 
CREATE FUNCTION str_lower AS 'com.data.hiveudf.udf.StringLower'  
USING JAR 'hdfs:////apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar'; 
CREATE FUNCTION Array_contains AS 'com.data.hiveudf.gudf.ArrayContains'  
USING JAR 'hdfs:////apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar';

  c.使用自定义函数

这里使用了另一库里的一张employee表,里面使用了string类型、array类型...。表描述与内容如下:

然后使用str_lower的函数:

使用Array_contains函数

 


推荐阅读
  • 深入浅出:Hadoop架构详解
    Hadoop作为大数据处理的核心技术,包含了一系列组件如HDFS(分布式文件系统)、YARN(资源管理框架)和MapReduce(并行计算模型)。本文将通过实例解析Hadoop的工作原理及其优势。 ... [详细]
  • 本文档提供了详细的MySQL安装步骤,包括解压安装文件、选择安装类型、配置MySQL服务以及设置管理员密码等关键环节,帮助用户顺利完成MySQL的安装。 ... [详细]
  • 深入探讨Web服务器与动态语言的交互机制:CGI、FastCGI与PHP-FPM
    本文详细解析了Web服务器(如Apache、Nginx等)与动态语言(如PHP)之间通过CGI、FastCGI及PHP-FPM进行交互的具体过程,旨在帮助开发者更好地理解这些技术背后的原理。 ... [详细]
  • 时序数据是指按时间顺序排列的数据集。通过时间轴上的数据点连接,可以构建多维度报表,揭示数据的趋势、规律及异常情况。 ... [详细]
  • 本教程旨在指导开发者如何在Android应用中通过ViewPager组件实现图片轮播功能,适用于初学者和有一定经验的开发者,帮助提升应用的视觉吸引力。 ... [详细]
  • 本文介绍了在解决Hive表中复杂数据结构平铺化问题后,如何通过创建视图来准确计算广告日志的曝光PV,特别是针对用户对应多个标签的情况。同时,详细探讨了UDF的使用方法及其在实际项目中的应用。 ... [详细]
  • 本文详细介绍了如何处理Oracle数据库中的ORA-00227错误,即控制文件中检测到损坏块的问题,并提供了具体的解决方案。 ... [详细]
  • 本文探讨了在SharePoint环境中使用BDC(Business Data Catalog)时遇到的问题及其解决策略,包括XML文件导入SSP后的不可见性问题以及与远程SQL Server 2005连接的难题。 ... [详细]
  • Hadoop集群搭建:实现SSH无密码登录
    本文介绍了如何在CentOS 7 64位操作系统环境下配置Hadoop集群中的SSH无密码登录,包括环境准备、用户创建、密钥生成及配置等步骤。 ... [详细]
  • 本文总结了在多人协作开发环境中使用 Git 时常见的问题及其解决方案,包括错误合并分支的处理、使用 SourceTree 查找问题提交、Git 自动生成的提交信息解释、删除远程仓库文件夹而不删除本地文件的方法、合并冲突时的注意事项以及如何将多个提交合并为一个。 ... [详细]
  • 本文详细介绍了Socket在Linux内核中的实现机制,包括基本的Socket结构、协议操作集以及不同协议下的具体实现。通过这些内容,读者可以更好地理解Socket的工作原理。 ... [详细]
  • 本文探讨了在Go语言中处理切片并发修改时如何有效避免竞争条件的方法。 ... [详细]
  • java datarow_DataSet  DataTable DataRow 深入浅出
    本篇文章适合有一定的基础的人去查看,最好学习过一定net编程基础在来查看此文章。1.概念DataSet是ADO.NET的中心概念。可以把DataSet当成内存中的数据 ... [详细]
  • Java高级工程师学习路径及面试准备指南
    本文基于一位朋友的PDF面试经验整理,涵盖了Java高级工程师所需掌握的核心知识点,包括数据结构与算法、计算机网络、数据库、操作系统等多个方面,并提供了详细的参考资料和学习建议。 ... [详细]
  • Maven快照版本管理及更新策略详解
    本文深入探讨了Maven中的快照版本管理和更新策略,解释了快照版本与正式版本的区别,并提供了如何配置快照更新策略的方法,以确保项目依赖始终保持最新。 ... [详细]
author-avatar
天崖人B
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有