Thursday, July 29, 2010

Extending Hive with Custom UDTFs

Let’s take a look at the canonical word count example in Hive: given a table of documents, create a table containing each word and the number of times it appears across all documents.


Here’s one implementation from the Facebook engineers:



CREATE TABLE docs(contents STRING);

FROM (
  MAP docs.contents
  USING 'tokenizer_script'
  AS word, cnt
  FROM docs
  CLUSTER BY word
) map_output
REDUCE map_output.word, map_output.cnt
USING 'count_script'
AS word, cnt;

In this example, the heavy lifting is done by calling out to two scripts, ‘tokenizer_script’ and ‘count_script’, which provide the custom mapper and reducer logic.


Hive 0.5 adds User Defined Table-Generating Functions (UDTFs), which offer another option for inserting custom mapper logic. (Reducer logic can be plugged in via a User Defined Aggregation Function, the subject of a future post.) From a user perspective, UDTFs are similar to User Defined Functions except that they can produce an arbitrary number of output rows for each input row. For example, the built-in UDTF “explode(array A)” converts a single row of input containing an array into multiple rows of output, each containing one of the elements of A.
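

For example, assuming a hypothetical table pages with an ARRAY<STRING> column named tags, a query along these lines produces one output row per array element:



SELECT explode(tags) AS tag FROM pages;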


So, let’s implement a UDTF that does the same thing as the ‘tokenizer_script’ in the word count example. Basically, we want to convert a document string into multiple rows with the format (word STRING, cnt INT), where the count will always be one.


The Tokenizer UDTF


To start, we extend the org.apache.hadoop.hive.ql.udf.generic.GenericUDTF class. (There is no plain UDTF class.) We need to implement three methods: initialize, process, and close. To emit output, we call the forward method.
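

In outline, the class looks something like the sketch below (the import paths are the ones used in the Hive 0.5 codebase); each method body is filled in over the following sections:



import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class TokenizeUDTF extends GenericUDTF {

  // object inspector for the single string argument, saved by initialize()
  private PrimitiveObjectInspector stringOI = null;

  public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    // verify the input type and describe the (word, cnt) output -- see below
    return null; // placeholder
  }

  public void process(Object[] record) throws HiveException {
    // split the document and forward one (token, 1) row per token -- see below
  }

  public void close() throws HiveException {
    // nothing to clean up
  }
}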


Adding a name and description:



@description(name = "tokenize", value = "_FUNC_(doc) - emits (token, 1) for each token in the input document")
public class TokenizeUDTF extends GenericUDTF {

You can add a UDTF name and description using a @description annotation. These will be available on the Hive console via the show functions and describe function tokenize commands.


The initialize method:



public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException

This method will be called exactly once per instance. In addition to performing any custom initialization logic you may need, it is responsible for verifying the input types and specifying the output types.


Hive uses a system of ObjectInspectors both to describe types and to convert Objects into more specific types. For our tokenizer, we want a single String as input, so we’ll check that the input ObjectInspector[] array contains a single PrimitiveObjectInspector of the STRING category. If anything is wrong, we throw a UDFArgumentException with a suitable error message.



if (args.length != 1) {
  throw new UDFArgumentException("tokenize() takes exactly one argument");
}

if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
    || ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
  throw new UDFArgumentException("tokenize() takes a string as a parameter");
}

We can actually use this object inspector to convert inputs into Strings in our process method. This is less important for primitive types, but it can be handy for more complex objects. So, assuming stringOI is an instance variable,



stringOI = (PrimitiveObjectInspector) args[0];
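

As an illustration of why the inspector matters more for complex types, here’s a hypothetical fragment (not part of the tokenizer) showing how an ARRAY<STRING> argument might be unwrapped using a ListObjectInspector from the same objectinspector package:



// Hypothetical: the argument is an ARRAY<STRING> rather than a STRING.
ListObjectInspector listOI = (ListObjectInspector) args[0];
PrimitiveObjectInspector elementOI = (PrimitiveObjectInspector) listOI.getListElementObjectInspector();

// Later, in process(), each element would be converted through both inspectors:
// for (int i = 0; i < listOI.getListLength(record[0]); i++) {
//   String s = (String) elementOI.getPrimitiveJavaObject(listOI.getListElement(record[0], i));
// }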

Similarly, we want our process method to forward Object[] rows containing a String and an Integer, so we’ll return a StandardStructObjectInspector containing a JavaStringObjectInspector and a JavaIntObjectInspector. We’ll also supply names for these output columns, but they’re not really relevant at runtime since the user will supply his or her own aliases.



List<String> fieldNames = new ArrayList<String>(2);
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
fieldNames.add("word");
fieldNames.add("cnt");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

The process method:



public void process(Object[] record) throws HiveException

This method is where the heavy lifting occurs; it is called once for each row of the input. The first task is to convert the input into a single String containing the document to process:



String document = (String) stringOI.getPrimitiveJavaObject(record[0]);

We can now implement our custom logic:



if (document == null) {
  return;
}
String[] tokens = document.split("\\s+");
for (String token : tokens) {
  forward(new Object[] { token, Integer.valueOf(1) });
}
}

The close method:



public void close() throws HiveException { }

This method allows us to do any post-processing cleanup. Note that the output stream has already been closed at this point, so this method cannot emit more rows by calling forward. In our case, there’s nothing to do here.


Packaging and use:


We deploy our TokenizeUDTF exactly like a UDF: copy the jar file to the Hive machine and enter the following in the console:



> add jar TokenizeUDTF.jar ;
> create temporary function tokenize as 'com.bizo.hive.udtf.TokenizeUDTF' ;
> select tokenize(contents) as (word, cnt) from docs ;

This gives us the intermediate mapped data, ready to be reduced by a custom UDAF.
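

As a quick sanity check on the output shape, the same (word, cnt) pairs can also be rolled up with the built-in SUM aggregate, along these lines (the subquery alias is arbitrary):



SELECT word, SUM(cnt) AS cnt
FROM (
  SELECT tokenize(contents) AS (word, cnt) FROM docs
) tokenized
GROUP BY word;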


The code for this example is available in this gist.