Wednesday, October 7, 2009

hive map reduce in java

In my last post, I went through an example of writing custom reduce scripts in Hive.

Writing a streaming reducer means repeating a lot of the same work every time: watching for when the key changes so you know where one group of rows ends and the next begins. On top of that, in Java there's a decent amount of boilerplate to go through just to read the columns from stdin.
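
For comparison, a hand-rolled streaming reducer tends to look something like the sketch below. This isn't code from the library -- it's just an illustration of the bookkeeping involved, assuming Hive's default tab-separated streaming format and a simple two-column (key, value) input.

import java.io.BufferedReader;
import java.io.InputStreamReader;

// Illustration only: the key-change bookkeeping a hand-rolled streaming
// reducer has to repeat every time (assumes tab-separated columns on stdin).
public class ManualReducer {
  public static void main(final String[] args) throws Exception {
    final BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String currentKey = null;
    StringBuilder vals = new StringBuilder();

    String line;
    while ((line = in.readLine()) != null) {
      final String[] cols = line.split("\t");
      if (currentKey != null && !currentKey.equals(cols[0])) {
        // key changed: flush everything accumulated for the previous key
        System.out.println(currentKey + "\t" + vals);
        vals = new StringBuilder();
      }
      currentKey = cols[0];
      if (vals.length() > 0) { vals.append(","); }
      vals.append(cols[1]);
    }
    if (currentKey != null) {
      System.out.println(currentKey + "\t" + vals);  // don't forget the last key
    }
  }
}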

To help with this, I put together a really simple little framework that more closely resembles the Hadoop Mapper and Reducer interfaces.

To use it, you just need to write a really simple reduce method:
  void reduce(String key, Iterator<String[]> records, Output output);

The helper code takes care of all the IO, as well as grouping together records that share the same key. The 'records' Iterator walks you through every row that has the key passed in as 'key'. The first column is assumed to be the key, and each element of the String[] record is a column. The rows aren't buffered in memory, so it can handle an arbitrary number of rows per key.
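
For reference, the interfaces involved look roughly like this. This is a sketch reconstructed from how the examples below use them, not the library's exact source (in the real code each type lives in its own file):

import java.util.Iterator;

// Approximate shape of the helper interfaces, based on the examples below.
public interface Reducer {
  // called once per distinct key, with an iterator over all rows for that key
  void reduce(String key, Iterator<String[]> records, Output output) throws Exception;
}

interface Output {
  // emits one output row; each array element becomes a column
  void collect(String[] record) throws Exception;
}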

Here's the complete example from my earlier reduce post, in Java (even shorter than the Perl version).
public class Condenser {
  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
        final StringBuilder vals = new StringBuilder();
        while (records.hasNext()) {
          // note we use col[1] -- the key is provided again as col[0]
          vals.append(records.next()[1]);
          if (records.hasNext()) { vals.append(","); }
        }
        output.collect(new String[] { key, vals.toString() });
      }
    });
  }
}
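
To make the behavior concrete: given made-up input rows (a, 1), (a, 2), (a, 3), (b, 4) -- key in the first column, value in the second -- the Condenser emits one row per key: (a, "1,2,3") and (b, "4").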

Here's a wordcount reduce example:

public class WordCountReduce {
  public static void main(final String[] args) throws Exception {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
        int count = 0;
        
        while (records.hasNext()) {
          count += Integer.parseInt(records.next()[1]);
        }
        
        output.collect(new String[] { key, String.valueOf(count) });
      }
    });
  }
}


Although the real value is in making it easy to write reducers, there's also support for helping with mappers. Here's my key-value split mapper from a previous example:

public class KeyValueSplit {
  public static void main(final String[] args) throws Exception {
    new GenericMR().map(System.in, System.out, new Mapper() {
      public void map(String[] record, Output output) throws Exception {
        for (final String kvs : record[0].split(",")) {
          final String[] kv = kvs.split("=");
          output.collect(new String[] { kv[0], kv[1] });
        }
      }
    });
  }
}

The full source code is available here. Or you can download a prebuilt jar here.

The only dependency is Apache commons-lang.

I'd love to hear any feedback you may have.

4 comments:

tim said...

Thanks for this very informative article. I have a question though. How do you invoke the custom java map/reduce functions since there is no script anymore, just a jar file?

larry ogrodnek said...

Sorry Tim, I missed this comment.

To invoke, I just run java -cp with the jar and the mapper/reducer class, e.g.:

reduce x, y
using 'java -cp something.jar com.bizo.SomeReducer'

Unknown said...

Thanks for your work. It's very useful. Is there an easy way to modify your code to obtain a null key assigned to a value which contains the filepath of the hive table?

Unknown said...

This is great, Larry.

Couple of syntax fixes for the Condenser class to avoid compile time errors:

1) As shown in your other examples, the reduce method should specify Iterator<String[]> instead of just Iterator for the type of records.

2) The main() method should have "throws Exception".