set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
set hive.compactor.initiator.on=true;
set hive.compactor.worker.threads=1;
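These settings turn on Hive's ACID support. Row-level UPDATE/DELETE (and MERGE, below) only work against tables that are bucketed, stored as ORC, and declared transactional. A minimal sketch, with illustrative table and column names that are not from the original text:
CREATE TABLE acid_demo (id INT, val STRING)      -- illustrative table, not from the original text
CLUSTERED BY (id) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
INSERT INTO acid_demo VALUES (1, 'a'), (2, 'b');
UPDATE acid_demo SET val = 'c' WHERE id = 2;     -- note: the bucketing column (id) cannot be updated
DELETE FROM acid_demo WHERE id = 1;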
(4) Hive MERGE
MERGE syntax:
MERGE INTO <target table> AS T USING <source expression/table> AS S
ON <boolean expression1>
WHEN MATCHED [AND <boolean expression2>] THEN UPDATE SET <set clause list>
WHEN MATCHED [AND <boolean expression3>] THEN DELETE
WHEN NOT MATCHED [AND <boolean expression4>] THEN INSERT VALUES <value list>
MERGE limitations:
At most three WHEN clauses are allowed, and they may only perform UPDATE/DELETE/INSERT. The WHEN NOT MATCHED clause must be the last WHEN clause.
MERGE INTO employee AS T
USING employee_state AS S
ON T.emp_id = S.emp_id and T.start_date = S.start_date
WHEN MATCHED AND S.state = 'update' THEN UPDATE SET dept_name = S.dept_name,work_loc = S.work_loc
WHEN MATCHED AND S.state = 'quit' THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES(S.emp_id, S.emp_name, S.dept_name, S.work_loc, S.start_date);
-- Here the target table is employee and the source table is employee_state.
-- New employees fall into the third case: they have no match in the target table, so they are inserted directly.
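The example above assumes a transactional target table employee and a staging (source) table employee_state whose state column marks the type of change. A possible sketch of the two tables; the column types are assumptions, not from the original text:
CREATE TABLE employee (
  emp_id INT, emp_name STRING, dept_name STRING, work_loc STRING, start_date STRING)
CLUSTERED BY (emp_id) INTO 2 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');   -- MERGE requires the target table to be transactional
CREATE TABLE employee_state (
  emp_id INT, emp_name STRING, dept_name STRING, work_loc STRING, start_date STRING,
  state STRING);   -- 'update', 'quit', or anything else for new hires with no match in employee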
2. Hive UDFs
(1) What is a Hive UDF?
A user-defined function (UDF) provides a way to extend HQL with external functions written in Java, which can then be evaluated inside HQL statements.
-- Convert every uppercase character in a string to lowercase
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class StringLower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}
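For a quick test before the JAR is published to HDFS (shown at the end of this section), the class can also be registered as a temporary function. A minimal sketch, assuming the built JAR is available at a local path (the path below is hypothetical):
ADD JAR /tmp/df-hiveudf-1.0-SNAPSHOT.jar;   -- hypothetical local path to the built JAR
CREATE TEMPORARY FUNCTION str_lower AS 'com.data.hiveudf.udf.StringLower';
SELECT str_lower('HELLO Hive');             -- returns 'hello hive'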
-- Check whether a given value is contained in an array
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.BooleanWritable;
@Description(name = "Arraycontains",
value="_FUNC_(array, value) - Returns TRUE if the array contains value.",
extended="Example:\n"
+ " > SELECT _FUNC_(array(1, 2, 3), 2) FROM src LIMIT 1;\n"
+ " true")
public class ArrayContains extends GenericUDF {
  private static final int ARRAY_IDX = 0;
  private static final int VALUE_IDX = 1;
  private static final int ARG_COUNT = 2;                    // number of arguments this UDF expects
  private static final String FUNC_NAME = "ARRAYCONTAINS";   // external (SQL-facing) function name
  private transient ObjectInspector valueOI;
  private transient ListObjectInspector arrayOI;
  private transient ObjectInspector arrayElementOI;
  private BooleanWritable result;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // Check that exactly two arguments were passed in
    if (arguments.length != ARG_COUNT) {
      throw new UDFArgumentException("The function " + FUNC_NAME + " accepts "
          + ARG_COUNT + " arguments");
    }
    // Check that the first argument is of LIST category
    if (!arguments[ARRAY_IDX].getCategory().equals(ObjectInspector.Category.LIST)) {
      throw new UDFArgumentTypeException(ARRAY_IDX, "\"" + serdeConstants.LIST_TYPE_NAME + "\" "
          + "expected at function ARRAY_CONTAINS, but "
          + "\"" + arguments[ARRAY_IDX].getTypeName() + "\" "
          + "is found");
    }
    arrayOI = (ListObjectInspector) arguments[ARRAY_IDX];
    arrayElementOI = arrayOI.getListElementObjectInspector();
    valueOI = arguments[VALUE_IDX];
    // Check that the list element type and the value type are the same
    if (!ObjectInspectorUtils.compareTypes(arrayElementOI, valueOI)) {
      throw new UDFArgumentTypeException(VALUE_IDX, "\"" + arrayElementOI.getTypeName() + "\" "
          + "expected at function ARRAY_CONTAINS, but "
          + "\"" + valueOI.getTypeName() + "\" "
          + "is found");
    }
    // Check that the value type supports comparison
    if (!ObjectInspectorUtils.compareSupported(valueOI)) {
      throw new UDFArgumentException("The function " + FUNC_NAME
          + " does not support comparison for "
          + "\"" + valueOI.getTypeName() + "\" "
          + "types");
    }
    result = new BooleanWritable(false);
    return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    result.set(false);
    Object array = arguments[ARRAY_IDX].get();
    Object value = arguments[VALUE_IDX].get();
    int arrayLength = arrayOI.getListLength(array);
    // If the value is null, or the array is null or empty, return the initial (false) result.
    // The || short-circuits: when the left side is true the right side is not evaluated.
    if (value == null || arrayLength <= 0) {
      return result;
    }
    // Compare the value with each element of the array until a match is found
    for (int i = 0; i < arrayLength; ++i) {
      Object listElement = arrayOI.getListElement(array, i);
      if (listElement != null
          && ObjectInspectorUtils.compare(value, valueOI, listElement, arrayElementOI) == 0) {
        result.set(true);
        break;
      }
    }
    return result;
  }

  @Override
  public String getDisplayString(String[] children) {
    assert (children.length == ARG_COUNT);
    return "Arraycontains(" + children[ARRAY_IDX] + ", " + children[VALUE_IDX] + ")";
  }
}
b. Then build and package the classes into a JAR, upload it to HDFS, and register the functions by running the following Hive commands:
DROP FUNCTION IF EXISTS str_lower;
DROP FUNCTION IF EXISTS Array_contains;
CREATE FUNCTION str_lower AS 'com.data.hiveudf.udf.StringLower'
USING JAR 'hdfs:///apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar';
CREATE FUNCTION Array_contains AS 'com.data.hiveudf.gudf.ArrayContains'
USING JAR 'hdfs:///apps/hive/functions/df-hiveudf-1.0-SNAPSHOT.jar';
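Once registered, both functions can be called like built-ins. Illustrative verification queries (not from the original text):
SELECT str_lower('Hello HIVE');               -- hello hive
SELECT Array_contains(array(1, 2, 3), 2);     -- true
SELECT Array_contains(array('a', 'b'), 'c');  -- false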