Preface

In big data development, there are always special or complex data processing scenarios that cannot be implemented by stacking Hive built-in functions. In those cases we need to implement custom data processing functions (UDFs) that can be embedded in Hive. By parent class, UDF functions can be divided into two kinds: plain UDF and GenericUDF. GenericUDF development is more complicated than plain UDF, so GenericUDF is generally considered in the following scenarios:

  1. The UDF must handle multiple argument types. With plain UDF we would need to implement N evaluate() methods for N scenarios; with GenericUDF we only need judgment logic in one method that routes different inputs to different processing logic. For example, a UDF parameter must support both a String list and an Integer list. You might think we could just keep overloading evaluate(), but because of type erasure Java does not allow overloads that differ only in their generic type parameters, so GenericUDF is the only way to support this scenario.
  2. You need to pass non-Writable or complex data types as parameters. For example, nested data structures, a Map whose values are of list type, or Struct structures with an indefinite number of fields are better handled with GenericUDF, which can capture the internal structure of the data at run time.
  3. The UDF is widely and frequently used, so from a cost/benefit perspective everything that can be optimized should be. Inside the operator pipeline GenericUDF is preferable to plain UDF because it avoids the resource cost of repeated reflective conversions (more on this later).
  4. The UDF is expected to be refactored and extended often in the future; when sufficient extensibility is needed, GenericUDF is the better choice.
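To make scenario 1 concrete, here is a minimal sketch in plain Java (no Hive API; the class and method names are my own, for illustration only). Because of type erasure, `evaluate(List<String>)` and `evaluate(List<Integer>)` have the same erased signature and cannot both be declared; a single entry point that inspects the runtime element type, as below, is the pattern that GenericUDF formalizes:

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: one method routes a list argument by its runtime
// element type, instead of (impossible) overloads on the generic type.
public class ErasureRouting {

    static String describe(List<?> list) {
        if (list == null || list.isEmpty()) {
            return "empty";
        }
        Object first = list.get(0);
        if (first instanceof String) {
            return "string list of size " + list.size();
        } else if (first instanceof Integer) {
            return "integer list of size " + list.size();
        }
        throw new IllegalArgumentException("unsupported element type");
    }

    public static void main(String[] args) {
        System.out.println(describe(Arrays.asList("a", "b")));
        System.out.println(describe(Arrays.asList(1, 2, 3)));
    }
}
```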

Background knowledge

Reflection

Java can dynamically load classes and obtain class details while the program is running, and thus manipulate the properties and methods of a class or object. In essence, the JVM obtains a Class object and then inspects it to get various information about the underlying type. Reflection supports dynamically loading classes at run time, as opposed to the usual declarative way of creating objects and then querying the class object for information, and it is common in the configuration layers of many popular frameworks. Reflection also consumes system resources, so do not use it when it is not needed. Plain UDFs use the reflection mechanism to pass parameter objects.
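A minimal, self-contained sketch of the reflection mechanism described above (the class and method names here are my own, for illustration): load a class by name at run time, look up a method, and invoke it with no compile-time knowledge of the call.

```java
import java.lang.reflect.Method;

// Load java.lang.String dynamically, look up its length() method,
// and invoke it reflectively on an instance.
public class ReflectionDemo {
    public static String callLength(String input) throws Exception {
        Class<?> clazz = Class.forName("java.lang.String"); // dynamic class load
        Method m = clazz.getMethod("length");               // method lookup at run time
        Object result = m.invoke(input);                    // reflective invocation
        return input + " has length " + result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(callLength("hive"));
    }
}
```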

Generics

Generics decouple a class or method from the concrete types it uses, implementing the concept of "parameterized typing" so that the same code can be applied to multiple types. In GenericUDF, all parameter objects are passed as the Object class.
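A minimal sketch of parameterized typing in plain Java (the `Box` class is my own illustration): one generic class serves any element type, and the compiler enforces type consistency at each use site.

```java
// One generic container reused for different element types.
public class GenericsDemo {
    static class Box<T> {
        private final T value;
        Box(T value) { this.value = value; }
        T get() { return value; }
    }

    public static void main(String[] args) {
        Box<String> s = new Box<>("hello");   // a Box of String
        Box<Integer> i = new Box<>(42);       // the same class, now holding Integer
        System.out.println(s.get());
        System.out.println(i.get());
    }
}
```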

ObjectInspector

Hive's ObjectInspector class helps Hive understand the internal structure of complex objects. It also supports creating specific ObjectInspector objects, instead of concrete class objects, to hold information about certain objects in memory. In UDFs, ObjectInspector helps the Hive engine determine the input and output data types when converting HQL into MapReduce jobs. Because Hive statements generate MapReduce jobs, the Hadoop data formats are used rather than the Java data types in which UDFs are written. For example, Java's int is IntWritable in Hadoop, and String is Text. We therefore need to convert the Java data types in the UDF to the correct Hadoop data types so that Hive can generate MapReduce jobs from HQL.

Serialization/deserialization

In the OSI seven-layer model, the main function of the Presentation Layer is to convert application-layer data structures or objects (class objects in Java) into binary sequences (byte arrays in Java), and, conversely, to convert binary sequences (produced during serialization) back into application-layer objects. These two operations are serialization and deserialization. When a UDF's processing logic involves multiple stages, data must be serialized for transmission and deserialized for processing.
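A minimal, self-contained round-trip through Java's built-in serialization (the class and method names are my own illustration, not Hive's SerDe API): an object is written to a byte array and then reconstructed from it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Serialize an object to bytes and deserialize it back.
public class SerDeDemo {
    public static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj); // object -> byte sequence
        }
        return bos.toByteArray();
    }

    public static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject(); // byte sequence -> object
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize("hello hive");
        System.out.println(deserialize(bytes));
    }
}
```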

Deferred Object

Lazy evaluation defers a computation until its result is actually needed (in this case, creating objects and assigning values to them). A typical example is the Iterator class, which computes and returns the next element only on demand. A related idea is short-circuiting, where a logical expression whose result is already determined returns immediately and skips the remaining operations to avoid unnecessary computation. For example, in (false && XXX && XXX) the later operands are never evaluated, because the result must be false.
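The two ideas above can be sketched in a few lines of plain Java (class and method names are my own illustration): the iterator computes each square only when asked, and the `&&` short-circuit means the iterator is never touched when the first operand is already false.

```java
import java.util.Iterator;

// Lazy evaluation + short-circuiting in miniature.
public class LazyDemo {
    // An iterator over 1, 4, 9, ... that computes each value on demand.
    static Iterator<Integer> squares(final int limit) {
        return new Iterator<Integer>() {
            int n = 0;
            public boolean hasNext() { return n < limit; }
            public Integer next() { n++; return n * n; } // computed only now
        };
    }

    static boolean evaluate(boolean flag, Iterator<Integer> it) {
        // When flag is false, it.next() is never called (short-circuit).
        return flag && it.next() > 0;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = squares(3);
        System.out.println(evaluate(false, it)); // iterator untouched
        System.out.println(it.next());           // first square computed only here
    }
}
```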

GenericUDF Code logic layer — outside

UDFs are user-defined Hive functions used to implement processing logic that Hive's native functions cannot. org.apache.hadoop.hive.ql.udf.generic.GenericUDF provides a generic interface that accepts and returns objects of any data type as generic Objects (you can even use Struct objects as maps, as long as you provide the corresponding ObjectInspector). The API requires us to override GenericUDF's initialize() and evaluate() methods, whose execution logic is explained next.

Execution process

  1. When Hive parses the query, it obtains the types of the parameters passed to the UDF and calls initialize(). The method receives one ObjectInspector per UDF argument, and it must return an ObjectInspector representing the return value type. Through this call, Hive learns what data type the UDF will return, so it can continue parsing the query.
  2. Inside initialize(), the received ObjectInspectors are used to check the number of arguments passed and their data types.
  3. For each row of Hive records, evaluate() receives a list of generic Objects (actually DeferredObjects); using the ObjectInspectors stored in initialize(), it parses them into objects of concrete types, performs the data processing, and finally outputs the result.

Code sample

class ComplexUDFExample extends GenericUDF {

  // 0. ObjectInspectors, usually kept as member variables
  ListObjectInspector listOI;
  StringObjectInspector elementOI;

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // 1. Check that the correct number of arguments was passed
    if (arguments.length != 2) {
      throw new UDFArgumentLengthException("arrayContainsExample only takes 2 arguments: List<T>, T");
    }
    // 2. Check that the arguments are of the expected categories
    ObjectInspector a = arguments[0];
    ObjectInspector b = arguments[1];
    if (!(a instanceof ListObjectInspector) || !(b instanceof StringObjectInspector)) {
      throw new UDFArgumentException("first argument must be a list / array, second argument must be a string");
    }
    // 3. Once the checks pass, store the ObjectInspectors in member variables for use in evaluate()
    this.listOI = (ListObjectInspector) a;
    this.elementOI = (StringObjectInspector) b;
    // 4. Check that the list elements are strings
    if (!(listOI.getListElementObjectInspector() instanceof StringObjectInspector)) {
      throw new UDFArgumentException("first argument must be a list of strings");
    }
    // 5. Use the factory class to produce the ObjectInspector representing the return value
    return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // get the list and string from the deferred objects using the object inspectors
    List<String> list = (List<String>) this.listOI.getList(arguments[0].get());
    String arg = elementOI.getPrimitiveJavaObject(arguments[1].get());
    // check for nulls
    if (list == null || arg == null) {
      return null;
    }
    // see if our list contains the value we need
    for (String s : list) {
      if (arg.equals(s)) return Boolean.TRUE;
    }
    return Boolean.FALSE;
  }

  @Override
  public String getDisplayString(String[] arg0) {
    return "arrayContainsExample()"; // this should probably be better
  }
}

Development details

Real business development is rarely as simple as the example above; the following are some useful patterns for working with ObjectInspector and converting DeferredObject in real development.

ObjectInspector

When creating an ObjectInspector, do not instantiate it with new. Instead, obtain it from a factory, which guarantees that there is only one instance per ObjectInspector type and that the same instance can be reused in multiple places in the code. Example:

ObjectInspectorFactory.getStandardListObjectInspector(ObjectInspector listElementObjectInspector)

Since there is only one ObjectInspector instance per type, we typically declare the ObjectInspectors that will be used throughout the code at the beginning of the class and refer to them later as this.objectInspector.xxx. An ObjectInspector class typically offers two kinds of methods: one kind obtains an ObjectInspector of a specific type, commonly seen when initializing an ObjectInspector; the other converts a passed-in generic Object into an object of a specific type, as seen in the evaluate() method. Classes inheriting from ObjectInspector include:

  • PrimitiveObjectInspector: for primitive types such as Java's String and numeric types
  • ListObjectInspector: corresponds to Hive's array
  • MapObjectInspector: corresponds to Hive's map
  • StructObjectInspector: corresponds to Hive's struct
  • StandardListObjectInspector: supports more methods and types than ListObjectInspector, including data backed by java.util.List
  • StandardStructObjectInspector: holds the fields of a complex object and provides methods to access them; read the ObjectInspector source code for details
  • StandardMapObjectInspector: the standard counterpart of MapObjectInspector, backed by java.util.Map

Methods that create or extract instances include StandardListObjectInspector.getList(inputObject), MapObjectInspector.getMap(object), ObjectInspector.get(object), and so on. Methods that access the values inside an object include StandardListObjectInspector.getListLength(object) and StandardListObjectInspector.getListElement(input_list, list_size). If your Hive version is earlier than 1.3.0, you may hit an error while debugging the UDF on the cluster: org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryArray cannot be cast to [Ljava.lang.Object;. Working around this problem may require a line of code like the following:

Object input_obj = arguments[0].get();
Object input_list = (input_obj instanceof LazyBinaryArray)
        ? ((LazyBinaryArray) input_obj).getList()
        : stdListInspector.getList(input_obj);

DeferredObject conversion

There are generally two ways to convert the generic DeferredObject passed in into a Java type we can work with: directly, or via a Converter. Method 1: if the DeferredObject we receive is, inside the Hive code, actually a Text object, we can operate on it directly, either wrapping the generic object into a Text object, Text resultText = new Text(object.toString());, or treating it as a String: String resultText = object.toString();. Method 2: create an ObjectInspectorConverters.Converter object in the initialize() method, and use that object in the evaluate() method to convert the generic object into a concrete type.

// in the initialize() method:
ObjectInspectorConverters.Converter objectInspectorConverters;
objectInspectorConverters = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
...
// in the evaluate() method:
Text t = (Text) this.objectInspectorConverters.convert(arguments[0].get());
...

To output data objects, we also need to convert them to Writable types, for example Integer to IntWritable: IntWritable result = new IntWritable(value);. To convert an ObjectInspector from the primitive (Java) variant to the Writable variant, use the following method:

ObjectInspector before;
ObjectInspector after = ObjectInspectorUtils.getStandardObjectInspector(before, ObjectInspectorCopyOption.WRITABLE);

Note that UDF argument types can only be checked at run time, not at compile time.

Get the element object from the container ObjectInspector

Taking List as an example, suppose we want to get a generic object from input_list and store it as a concrete type into our custom tmp_list.

// Create a temporary list object based on the input_list length
Object tmp_list = stdListInspector.create(currSize);
List<?> input_list = listInspectorList.getList(args[i].get());
// Iterate over input_list, fetch each element as a generic object, and convert it to a Java object
for (int j = 0; j < input_list.size(); ++j) {
    Object genObj = input_list.get(j);
    Object stdObj = ObjectInspectorUtils.copyToStandardObject(genObj,
            listInspectorList.getListElementObjectInspector(),
            ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
    // Put the converted element into tmp_list
    stdListInspector.set(tmp_list, lastIdx + j, stdObj);
}
lastIdx += input_list.size();

Alternatively:

    Object input_obj = arguments[0].get();
    for (; list_size < stdListInspector.getListLength(input_obj); list_size++) {
        tmp_list.add(stdListInspector.getListElement(input_obj, list_size).toString());
    }

GenericUDF operator execution layer — inside

A UDF/UDAF/UDTF is converted into a MapReduce job: each piece of execution logic in the code becomes a Map or Reduce task, and these operators form an operator tree. GenericUDF's execution logic is as follows. When Hive parses a query that calls a UDF, it passes the parameter objects (which may be fields, constants, or even the return values of other UDFs) together with their corresponding ObjectInspectors to the first operator of the MapReduce job. That operator's initializeOp() method receives the Objects and ObjectInspectors and, from the parsed code, determines what data structure it needs to output. It then passes the processed object, along with the ObjectInspector describing that output, to the next operator, its child operator. The child operator receives the previous operator's parameter object, ObjectInspector, and parentId, and so on until the entire operator tree has been traversed. Wherever a data object must move between reads and writes or between operators, it needs to be serialized for transmission and deserialized back into a concrete object to execute code logic. GenericUDF keeps the parameter objects typed as Object during transmission, which avoids the resource cost of unnecessary serialization and deserialization (objects are converted to concrete types only when actually required).

Testing and Use

GenericUDF test

GenericUDF also supports local tests, although a local test cannot simulate Hive's Hadoop-to-Java type conversion and can only exercise the evaluate() logic. In the containsString case, the test process is: create an object of the class with new ComplexUDFExample(), manually build the ObjectInspectors required for the parameters, and pass them into the initialize() method. After local initialization completes, load the parameter data to be tested by creating a new DeferredObject array, and use Assert.assertEquals() to compare the returned value with the expected value.

public class ComplexUDFExampleTest {

  @Test
  public void testComplexUDFReturnsCorrectValues() throws HiveException {
    // set up the models we need
    ComplexUDFExample example = new ComplexUDFExample();
    ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector listOI = ObjectInspectorFactory.getStandardListObjectInspector(stringOI);
    JavaBooleanObjectInspector resultInspector =
        (JavaBooleanObjectInspector) example.initialize(new ObjectInspector[]{listOI, stringOI});

    // create the actual UDF arguments
    List<String> list = new ArrayList<String>();
    list.add("a");
    list.add("b");
    list.add("c");

    // test our results
    // the value exists
    Object result = example.evaluate(new DeferredObject[]{new DeferredJavaObject(list), new DeferredJavaObject("a")});
    Assert.assertEquals(true, resultInspector.get(result));

    // the value doesn't exist
    Object result2 = example.evaluate(new DeferredObject[]{new DeferredJavaObject(list), new DeferredJavaObject("d")});
    Assert.assertEquals(false, resultInspector.get(result2));

    // arguments are null
    Object result3 = example.evaluate(new DeferredObject[]{new DeferredJavaObject(null), new DeferredJavaObject(null)});
    Assert.assertNull(result3);
  }
}

Register UDF functions

  1. Package the custom UDF project into a JAR and upload it to the chosen location on the server.
  2. Create a temporary UDF function pointing at the JAR address:
     CREATE TEMPORARY FUNCTION my_lower AS 'com.example.hive.udf.Lower' USING JAR 'hdfs:///path/to/jar';
  3. Create a permanent UDF function pointing at the JAR address (under some cluster configurations, hiveserver must be restarted before a permanent UDF can be used):
     CREATE FUNCTION my_db.my_lower AS 'com.example.hive.udf.Lower' USING JAR 'hdfs:///path/to/jar';
  4. Use the UDF function:
     hive> SELECT my_lower(title), SUM(freq) FROM titles GROUP BY my_lower(title);
  5. Drop the UDF function:
     DROP FUNCTION [IF EXISTS] function_name;
  Note: registering a UDF under a name that already exists causes an error, so check whether the name is taken before choosing it.

I will write a GenericUDAF development note later if I have time.

References

If your English is good, I recommend reading all of the reference materials; what I have digested into my own words is certainly not as clear and detailed as the original documents. Blog.matthewrathbone.com/2013/08/10/… Blog.csdn.net/czw698/arti… Blog.dataiku.com/2013/05/01/… Cwiki.apache.org/confluence/…