What is Hive UDF? What’s the use? How does it work? How does that work? This article starts from the use of UDF, a brief introduction to the relevant source code, UDF from scratch.

This article is shared by Tontesa from Hive UDF, That’s it.

Hive provides many built-in functions and supports user expansion. After being added based on rules, Hive can be used in SQL execution. Hive supports UDF, UDTF, and UDAF.

UDF (User Defined Function) User-defined functions

User-defined Table Generating Function (UDTF) User-defined Table Generating Function that generates multiple rows for one row of data

UDAF (User-defined Aggregation Function) is a user-defined Aggregation Function that generates one row of data

1. Introduction of UDF

Udfs are classified into two types: 1. Temporary functions are valid only in the current session. 2. Permanent function registers UDF information in MetaStore metadata, which can be used permanently.

Implementing udFs requires either inheriting a specific UDF or GenericUDF.

  • Apache. Hadoop. Hive. Ql. Exec. The UDF, deal with basic data types, and returns an int, string, Boolean, double, etc.;

  • Apache. Hadoop. Hive. Ql. Udf. Generic. GenericUDF, can handle and return complex data types, such as Map, List, Array, etc., at the same time support nested;

2. Udf-related syntax

UDF uses the REQUIRED UDF classes to compile into JAR packages and add them to Hive, creating temporary or permanent functions as required.

2.1. The resources operation

Hive supports adding resources to sessions, including files, jars, and archives. After being added, resources can be directly referenced in SQL, which is valid only for the current session. The local path, such as HDFS, is read by default. For example, add jar /opt/ht/ addudf.jar.

ADD resources ADD {FILE | JAR [S] [S] | ARCHIVE [S]} < filepath1 > / < filepath2 >] * view resource LIST {FILE | JAR [S] [S] | ARCHIVE [S]} [<filepath1> <filepath2> ..] DELETE DELETE resources {FILE | JAR [S] [S] | ARCHIVE [S]} [< filepath1 > < filepath2 >..]Copy the code

2.2. Temporary functions

This parameter is valid only for the current session. The database cannot be specified. The USING path must be quoted.

CREATE TEMPORARY FUNCTION function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
DROP TEMPORARY FUNCTION [IF EXISTS] function_name;
Copy the code

2.3. Permanent functions

Function information is stored permanently. The USING path must be quoted. You can use the USING statement for temporary and permanent functions. Hive automatically adds specified files to the current environment. The effect is the same as that of the Add statement.

CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
DROP FUNCTION [IF EXISTS] function_name;
RELOAD (FUNCTIONS|FUNCTION);
Copy the code

2.4. View functions

View all functions, regardless of temporary function and permanent function show functions; Select * from 'x 'where 'x' = 'x '; Desc function function_name; Desc function extended function_name;Copy the code

3. The Description notes

Hive defined annotation type org. Apache. Hadoop. Hive. Ql. Exec. Description, used to perform desc function extended function introduced function when the function_name, built-in function the same as the custom function usage.

[Remarks] If the Description annotation name is different from the name specified during UDF creation, the name specified during UDF creation prevails.

Public @interface Description {// Function simple String value() default "FUNC is undocumented"; String extended() default ""; // Function name String name() default ""; }Copy the code

For example: The code of the Hive built-in CeiL function GenericUDFCeil is defined as follows

desc function ceil;

desc function extended ceil;

4.UDF​

An inherited UDF class must implement the EVALUATE method, which supports defining multiple EVALUATE methods with different argument lists for handling different types of data, as shown below

Public evaluate(Text s) public int evaluate(Integer s)...Copy the code

4.1. UDF example

Implement UDF functions that concatenate strings and add ints.

@Description( name="my_plus", value="my_plus() - if string, do concat; if integer, do plus", extended = "Example : \n >select my_plus('a', 'b'); \n >ab\n >select my_plus(3, 5); \n >8" ) public class AddUDF extends UDF { public String evaluate(String... parameters) { if (parameters == null || parameters.length == 0) { return null; } StringBuilder sb = new StringBuilder(); for (String param : parameters) { sb.append(param); } return sb.toString(); } public int evaluate(IntWritable... parameters) { if (parameters == null || parameters.length == 0) { return 0; } long sum = 0; for (IntWritable currentNum : parameters) { sum = Math.addExact(sum, currentNum.get()); } return (int) sum; }}Copy the code

hdfs dfs -put AddUDF.jar /tmp/ht/

create function my_plus as ‘com.huawei.ht.test.AddUDF’ using jar ‘hdfs:///tmp/ht/AddUDF.jar’;

desc function my_plus;

desc function extended my_plus;

The UDF is added and recorded in the FUNCS, FUNC_RU metadata tables

4.2. Source code analysis

DefaultUDFMethodResolver is the default method parser. When executing, the parser reflects the EVALUATE method of the UDF class and executes it. The code of the UDF class is as follows:

UDF

Public class UDF {// UDF method resolver private UDFMethodResolver; // Default constructor DefaultUDFMethodResolver public UDF() {RSLV = new DefaultUDFMethodResolver(this.getClass()); } protected UDF(UDFMethodResolver rslv) { this.rslv = rslv; } public void setResolver(UDFMethodResolver rslv) { this.rslv = rslv; } public UDFMethodResolver getResolver() { return rslv; } public String[] getRequiredJars() { return null; } public String[] getRequiredFiles() { return null; }}Copy the code

DefaultUDFMethodResolver

public class DefaultUDFMethodResolver implements UDFMethodResolver { //The class of the UDF. private final Class<? extends UDF> udfClass; public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) { this.udfClass = udfClass; } @Override public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException { return FunctionRegistry.getMethodInternal(udfClass, "evaluate", false, argClasses); }}Copy the code

5.GenericUDF​

GenericUDF is more versatile than UDF and supports all parameter types encapsulated by ObjectInspector. The parameter Writable class is encapsulated by DeferredObject. When used, simple types can be obtained directly from Writable and complex types can be resolved by ObjectInspector.

GenericUDF must implement the following three interfaces:

// ObjectInspector is a data type encapsulated class with no actual parameter values. Public ObjectInspector Initialize (ObjectInspector[] objectInspectors) throws UDFArgumentException {return null; Public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { return null; } public String getDisplayString(String[] strings) {return null; }Copy the code

5.1. GenericUDF example

User-defined functions implement the count function and support the int and long types. Hive does not have the long type and the corresponding type is Bigint. Create function and database storage are the same as UDF.

The initialize, traverse ObjectInspector [] check each parameter type, depending on the type of parameter structure ObjectInspectorConverters. The Converter, Used to convert parameter types passed by Hive to ObjectInspector, the corresponding Writable encapsulated object, for subsequent unified processing.

Evaluate, initialize with the specific type of each parameter recorded, get the object from DeferredObject, and use the corresponding Converter object to convert it to Writable to perform the calculation based on the type.

Example: Handling int types,

When UDF queries constants, the encapsulation type in DeferredObject is IntWritable.

When UDF queries table fields, the encapsulation type in DeferredObject is LazyInteger.

@Description( name="my_count", value="my_count(...) - count int or long type numbers", extended = "Example :\n >select my_count(3, 5); \n >8\n >select my_count(3, 5, 25); \n >33" ) public class MyCountUDF extends GenericUDF { private PrimitiveObjectInspector.PrimitiveCategory[] inputType; private transient ObjectInspectorConverters.Converter intConverter; private transient ObjectInspectorConverters.Converter longConverter; @Override public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException { int length  = objectInspectors.length; inputType = new PrimitiveObjectInspector.PrimitiveCategory[length]; for (int i = 0; i < length; i++) { ObjectInspector currentOI = objectInspectors[i]; ObjectInspector.Category type = currentOI.getCategory(); if (type ! = ObjectInspector.Category.PRIMITIVE) { throw new UDFArgumentException("The function my_count need PRIMITIVE Category, but get " + type); } PrimitiveObjectInspector.PrimitiveCategory primitiveType = ((PrimitiveObjectInspector) currentOI).getPrimitiveCategory(); inputType[i] = primitiveType; switch (primitiveType) { case INT: if (intConverter == null) { ObjectInspector intOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType); intConverter = ObjectInspectorConverters.getConverter(currentOI, intOI); } break; case LONG: if (longConverter == null) { ObjectInspector longOI = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveType); longConverter = ObjectInspectorConverters.getConverter(currentOI, longOI); } break; default: throw new UDFArgumentException("The function my_count need INT OR BIGINT, but get " + primitiveType); } } return PrimitiveObjectInspectorFactory.writableLongObjectInspector; } @Override public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { LongWritable out = new LongWritable(); for (int i = 0; i < deferredObjects.length; i++) { PrimitiveObjectInspector.PrimitiveCategory type = this.inputType[i]; Object param = deferredObjects[i].get(); switch (type) { case INT: Object intObject = intConverter.convert(param); out.set(Math.addExact(out.get(), ((IntWritable) intObject).get())); break; case LONG: Object longObject = longConverter.convert(param); out.set(Math.addExact(out.get(), ((LongWritable) longObject).get())); break; default: throw new IllegalStateException("Unexpected type in MyCountUDF evaluate : " + type); } } return out; } @Override public String getDisplayString(String[] strings) { return "my_count(" + Joiner.on(", ").join(strings) + ")";  } } create function my_count as 'com.huawei.ht.test.MyCountUDF' using jar 'hdfs:///tmp/countUDF.jar'; create table test_numeric(i1 int, b1 bigint, b2 bigint, i2 int, i3 int); insert into table test_numeric values(0, -10, 25, 300, 15), (11, 22, 33, 44, 55); select *, my_count(*) from test_numeric;Copy the code

5.2. Source code Analysis

GenericUDF internally defines the method call order, and subclasses can implement the corresponding functions. When calling, the UDF object is obtained from FunctionRegistry according to the function name, and the execution result is returned.

Hive uses ObjectInspector to encapsulate all data types. Enumeration categories are defined to distinguish PRIMITIVE, LIST, MAP, STRUCT, and UNION types. PRIMITIVE indicates a common type (int, long, double, etc.).

ObjectInspector

Public interface ObjectInspector extends Cloneable {String getTypeName(); Objectinspector.category getCategory(); public static enum Category { PRIMITIVE, LIST, MAP, STRUCT, UNION; private Category() { } } }Copy the code

PrimitiveObjectInspector PrimitiveCategory, basic types

Public static enum PrimitiveCategory {VOID, BOOLEAN, BYTE, SHORT, INT, LONG... }Copy the code

GenericUDF. initializeAndFoldConstants

Initialize gets the output ObjectInspector, or evaluate if it is a constant.

In the compilation phase of this method, when the AST constructor Operator traverses the SQL node, constants are directly calculated. For other types, only initialize is executed.

When evaluating a table field, in tasks such as MR, initialize and evaluate are called during Operator execution (e.g., SelectOperator).

public ObjectInspector initializeAndFoldConstants(ObjectInspector[] arguments) throws UDFArgumentException { ObjectInspector oi = this.initialize(arguments); if (this.getRequiredFiles() == null && this.getRequiredJars() == null) { boolean allConstant = true; for(int ii = 0; ii < arguments.length; ++ii) { if (! ObjectInspectorUtils.isConstantObjectInspector(arguments[ii])) { allConstant = false; break; } } if (allConstant && ! ObjectInspectorUtils.isConstantObjectInspector((ObjectInspector)oi) && FunctionRegistry.isConsistentWithinQuery(this) &&  ObjectInspectorUtils.supportsConstantObjectInspector((ObjectInspector)oi)) { GenericUDF.DeferredObject[] argumentValues  = new GenericUDF.DeferredJavaObject[arguments.length]; for(int ii = 0; ii < arguments.length; ++ii) { argumentValues[ii] = new GenericUDF.DeferredJavaObject(((ConstantObjectInspector)arguments[ii]).getWritableConstantValue()); } try { Object constantValue = this.evaluate(argumentValues); oi = ObjectInspectorUtils.getConstantObjectInspector((ObjectInspector)oi, constantValue); } catch (HiveException var6) { throw new UDFArgumentException(var6); } } return (ObjectInspector)oi; } else { return (ObjectInspector)oi; }}Copy the code

6.UDF related source code

6.1. The operator

In Hive SQL, operators such as +, -, *, /, and = are UDF functions declared in FunctionRegistry. All UDFs are parsed when the AST generates the Operator tree during compilation. Constants are directly calculated. Gets the output type used to generate the Operator tree, which then evaluates the resulting value when the Operator actually executes.

static {
  HIVE_OPERATORS.addAll(Arrays.asList(
      "+", "-", "*", "/", "%", "div", "&", "|", "^", "~",
      "and", "or", "not", "!",
      "=", "==", "<=>", "!=", "<>", "<", "<=", ">", ">=",
      "index"));
}
Copy the code

6.2. Function types

Hive contains BUILTIN, PERSISTENT, and TEMPORARY functions.

public static enum FunctionType {
  BUILTIN, PERSISTENT, TEMPORARY;
}
Copy the code

6.3. FunctionRegistry​

All Hive UDFs are managed by FunctionRegistry, which only manages UDFs in memory and does not operate databases.

The built-in functions are initialized in the FunctionRegistry static block and are not recorded in the database; User-defined UDF additions and deletions are performed locally in HiveServer. Temporary functions are processed in SessionState, and permanent functions are processed by FunctionTask calling FunctionRegistry corresponding methods. FunctionTask is responsible for writing library after loading.

Public Final Class FunctionRegistry {... private static final Registry system = new Registry(true); static { system.registerGenericUDF("concat", GenericUDFConcat.class); system.registerUDF("substr", UDFSubstr.class, false); ... }... public static void registerTemporaryMacro( String macroName, ExprNodeDesc body, List<String> colNames, List<TypeInfo> colTypes) { SessionState.getRegistryForWrite().registerMacro(macroName, body, colNames, colTypes); } public static FunctionInfo registerPermanentFunction(String functionName, String className, boolean registerToSession, FunctionResource[] resources) { return system.registerPermanentFunction(functionName, className, registerToSession, resources); }... }Copy the code

6.4. GenericUDFBridge​

In Hive, BOTH UDF and GenericUDF are processed in GenericUDF mode. GenericUDFBridge ADAPTS to GenericUDF.

When a UDF is added, FunctionRegistry calls the Registry object to add the UDF, and Registry encapsulates the UDF as GenericUDFBridge and stores it in the built-in.

Registry

private FunctionInfo registerUDF(String functionName, FunctionType functionType,
    Class<? extends UDF> UDFClass, boolean isOperator, String displayName,
    FunctionResource... resources) {
  validateClass(UDFClass, UDF.class);
  FunctionInfo fI = new FunctionInfo(functionType, displayName,
      new GenericUDFBridge(displayName, isOperator, UDFClass.getName()), resources);
  addFunction(functionName, fI);
  return fI;
}
Copy the code

GenericUDFBridge

The UDF obtains the EVALUATE method of the UDF class based on parameter reflection and ADAPTS the parameter to the corresponding type automatically. Therefore, the UDF does not need to know whether the local execution of the function is consistent with the specific type of yarn.

Part of the code is as follows:

public GenericUDFBridge(String udfName, boolean isOperator, String udfClassName) { this.udfName = udfName; this.isOperator = isOperator; this.udfClassName = udfClassName; } @override public ObjectInspector Initialize (ObjectInspector[] arguments) throws UDFArgumentException {// Initialize the UDF object try { udf = (UDF)getUdfClassInternal().newInstance(); } catch (Exception e) { throw new UDFArgumentException( "Unable to instantiate UDF implementation class " + udfClassName  + ": " + e); } // Resolve for the method based on argument types ArrayList<TypeInfo> argumentTypeInfos = new ArrayList<TypeInfo>( arguments.length); for (ObjectInspector argument : arguments) { argumentTypeInfos.add(TypeInfoUtils .getTypeInfoFromObjectInspector(argument)); } udfMethod = udf.getResolver().getEvalMethod(argumentTypeInfos); udfMethod.setAccessible(true); // Create parameter converters conversionHelper = new ConversionHelper(udfMethod, arguments); // Create the non-deferred realArgument realArguments = new Object[arguments.length]; // Get the return ObjectInspector. ObjectInspector returnOI = ObjectInspectorFactory .getReflectionObjectInspector(udfMethod.getGenericReturnType(), ObjectInspectorOptions.JAVA); return returnOI; } @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { assert (arguments.length == realArguments.length); // Calculate all the arguments for (int i = 0; i < realArguments.length; i++) { realArguments[i] = arguments[i].get(); } // Call the function, Object result = FunctionRegistry. Invoke (udfMethod, UDF, conversionHelper .convertIfNecessary(realArguments)); // For non-generic UDF, type info isn't available. This poses a problem for Hive Decimal. // If the returned value is HiveDecimal, we assume maximum precision/scale. if (result ! = null && result instanceof HiveDecimalWritable) { result = HiveDecimalWritable.enforcePrecisionScale ((HiveDecimalWritable) result, HiveDecimal.SYSTEM_DEFAULT_PRECISION, HiveDecimal.SYSTEM_DEFAULT_SCALE); } return result; }Copy the code

6.5. Function call entry

When a function is used in SQL, there may be three calls. The number of lines of code in different versions may be different, and the flow is similar.

1. Traverse the syntax tree at compile time to convert operators.

TypeCheckProcFactory. GetXpathOrFuncExprNodeDesc generated according to operator or UDF names in SQL expression object ExprNodeGenericFuncDesc, internal call GenericUDF method.

2. Enable constant propagation optimizer for optimization, ConstantPropagate in traverse tree procedure call;

The optimizer default open, parameter control “hive. Optimize. Constant. The propagation”.

ConstantPropagate optimization in traverse the nodes, try to calculate in advance constant expressions, by ConstantPropagateProcFactory. EvaluateFunction UDF.

3.UDF parameters are not constant. When Operator is executed during SQL execution as planned;

When the Operator actually perform ExprNodeGenericFuncEvaluator. _evaluate processing each row data, calculate the UDF the result value.

@Override
protected Object _evaluate(Object row, int version) throws HiveException {
  if (isConstant) {
    // The output of this UDF is constant, so don't even bother evaluating.
    return ((ConstantObjectInspector) outputOI).getWritableConstantValue();
  }
  rowObject = row;
  for (GenericUDF.DeferredObject deferredObject : childrenNeedingPrepare) {
    deferredObject.prepare(version);
  }
  return genericUDF.evaluate(deferredChildren);
}
Copy the code

Click to follow, the first time to learn about Huawei cloud fresh technology ~