Wednesday, January 18, 2012

Using GenericUDFs to return multiple values in Apache Hive

A basic user defined function (UDF) in Hive is very easy to write: you simply subclass org.apache.hadoop.hive.ql.exec.UDF and implement an evaluate method.  We've previously written about this strategy, and it works well for most simple cases.
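
As a quick refresher, the basic strategy looks something like this (a minimal sketch; the Upper class is just illustrative):

import org.apache.hadoop.hive.ql.exec.UDF

class Upper extends UDF {
  def evaluate(s: String): String = {
    // Hive passes null for null column values, so handle it explicitly
    if (s == null) null else s.toUpperCase
  }
}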

The first case where this breaks down is when you want to return multiple values from your UDF.  For us, this often arises when serialized data is stored in a single Hive field and we want to extract multiple pieces of information from it.

For example, suppose we have a simple Person object (leaving out all of the error checking code):

case class Person(firstName: String, lastName: String)

object Person {
  def serialize(p: Person): String = {
    p.firstName + "|" + p.lastName
  }

  def deserialize(s: String): Person = {
    // String.split takes a regex, so the pipe must be escaped
    val parts = s.split("\\|")
    Person(parts(0), parts(1))
  }
}

We want to convert a data table containing these serialized objects into one containing firstName and lastName columns.

create table input(serializedPerson string) ;
load data local inpath ... ;

create table output(firstName string, lastName string) ;


So, what should our UDF and query look like?


Using the previous strategy, we could create two separate UDFs:
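
Each would be a trivial UDF along these lines (a sketch; FirstName is a hypothetical class name, and a matching LastName class would differ only in the field it returns):

import org.apache.hadoop.hive.ql.exec.UDF

class FirstName extends UDF {
  def evaluate(serialized: String): String = {
    // deserializes the full Person just to read one field
    Person.deserialize(serialized).firstName
  }
}

With both classes registered as firstName and lastName, the query would be: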


insert overwrite table output
select firstName(serializedPerson), lastName(serializedPerson)
from input ;


Unfortunately, each invocation has to deserialize its input separately, which could be expensive in less trivial examples.  It also requires writing two separate implementation classes whose only difference is which field they pull out of the model object.


An alternative is to use a GenericUDF and return a struct instead of a simple string.  This requires using object inspectors to specify the input and output types, just like in a UDTF:


import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, PrimitiveObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.STRING
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import scala.collection.JavaConverters._

class DeserializePerson extends GenericUDF {
  private var inputInspector: PrimitiveObjectInspector = _

  def initialize(inputs: Array[ObjectInspector]): StructObjectInspector = {
    // expect a single primitive (string) input; error handling omitted
    this.inputInspector = inputs(0).asInstanceOf[PrimitiveObjectInspector]

    val stringOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(STRING)

    // declare the output as a struct of two strings; the factory expects java.util.Lists
    val outputFieldNames = Seq("firstName", "lastName").asJava
    val outputInspectors: java.util.List[ObjectInspector] = Seq[ObjectInspector](stringOI, stringOI).asJava
    ObjectInspectorFactory.getStandardStructObjectInspector(outputFieldNames, outputInspectors)
  }

  def getDisplayString(children: Array[String]): String = {
    "deserialize(" + children.mkString(",") + ")"
  }

  def evaluate(args: Array[DeferredObject]): Object = {
    val input = inputInspector.getPrimitiveJavaObject(args(0).get)
    val person = Person.deserialize(input.asInstanceOf[String])
    // a standard struct value is returned as an array of its field values
    Array(person.firstName, person.lastName)
  }
}


Here, we're specifying that we expect a single primitive object inspector as an input (error handling code omitted) and returning a struct containing two fields, both of which are strings.  We can now use the following query:


create temporary function deserializePerson as 'com.bizo.udf.DeserializePerson' ;


insert overwrite table output
select person.firstName, person.lastName
from (
  select deserializePerson(serializedPerson) as person
  from input
) parsed ;


This query deserializes the person only once but gives you access to both of the values returned by the UDF.


Note that this method does not allow you to return multiple rows -- for that, you still need to use a UDTF.
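
For completeness, a UDTF pushes rows to Hive via forward rather than returning a single value.  A minimal sketch (ExplodeAliases is a hypothetical example that turns a comma-separated string into one row per alias; error handling omitted):

import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, PrimitiveObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.STRING
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import scala.collection.JavaConverters._

class ExplodeAliases extends GenericUDTF {
  private var inputInspector: PrimitiveObjectInspector = _

  def initialize(inputs: Array[ObjectInspector]): StructObjectInspector = {
    this.inputInspector = inputs(0).asInstanceOf[PrimitiveObjectInspector]
    val stringOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(STRING)
    ObjectInspectorFactory.getStandardStructObjectInspector(
      Seq("alias").asJava,
      Seq[ObjectInspector](stringOI).asJava)
  }

  def process(args: Array[AnyRef]): Unit = {
    val input = inputInspector.getPrimitiveJavaObject(args(0)).asInstanceOf[String]
    // forward emits one output row per call
    input.split(",").foreach { alias => forward(Array[AnyRef](alias)) }
  }

  def close(): Unit = {}
}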

4 comments:

Matthew Rathbone said...

The problem with this, though, is that Hive users need to have knowledge of the object beforehand to properly take advantage of the system. It's not so easy for them to browse each table and find what they want. Do you have any tools to allow people to do that?

Anonymous said...

I'd recommend having the query that loads data expand the object into its individual fields, as in the "output" table in the post. That way, the only time users need to be aware of the object structure is if they're using the UDF directly.

For that case, I'd recommend documenting the object format in a @Description annotation (http://hive.apache.org/docs/r0.7.0/api/org/apache/hadoop/hive/ql/exec/Description.html) on your UDF. The "value" and "extended" fields will then be available inside the Hive console via "describe function foo" and "describe extended function foo".
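
For example, something along these lines (just a sketch of the annotation usage; the strings themselves are illustrative):

import org.apache.hadoop.hive.ql.exec.Description

@Description(
  name = "deserializePerson",
  value = "_FUNC_(str) - deserializes a pipe-delimited Person into a (firstName, lastName) struct",
  extended = "The input must be of the form 'firstName|lastName'.")
class DeserializePerson extends GenericUDF {
  // ... as in the post
}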

Satpreet said...

Why not return an array?

Anonymous said...

The main reason would be that structs are a better description of what you're actually returning. In the example, we can access the data inside the result using "firstName" and "lastName". It might seem intuitive to the developer to simply make these the two fields of a string array, but what happens if the return type has a large number of fields or if there is no natural order to the fields?

The other reason is that arrays are homogeneous in Hive, so you can't return multiple types of data in a single array. With a struct, for example, you could return a firstName, lastName, age (integer-valued), aliases (array of strings), and known addresses (array of structs).
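
For example, the struct inspector for that richer return type could be built like this (a sketch reusing the factory calls from the post, plus PrimitiveCategory.INT):

val stringOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(STRING)
val intOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(INT)
// an array-of-strings field is described by a standard list inspector
val stringListOI = ObjectInspectorFactory.getStandardListObjectInspector(stringOI)

ObjectInspectorFactory.getStandardStructObjectInspector(
  Seq("firstName", "lastName", "age", "aliases").asJava,
  Seq[ObjectInspector](stringOI, stringOI, intOI, stringListOI).asJava)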