Tuesday, October 6, 2009

reduce scripts in hive

In a previous post, I discussed writing custom map scripts in Hive. Now, let's talk about reduce scripts.

The basics

As before, you are not writing an org.apache.hadoop.mapred.Reducer class. Your reducer is just a simple script that reads rows from stdin (columns separated by \t) and writes rows to stdout (again, columns separated by \t).

Another thing to mention is that you can't run a reduce without first doing a map.

The rows passed to your reduce script will be sorted by key (you specify which column this is), so all rows with the same key will be consecutive. One thing that's kind of a pain with Hive reducers is that you have to keep track of when the key changes yourself. Unlike a Hadoop reducer, where you get (K key, Iterator<V> values), here you just get row after row of columns.
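Since that key tracking is the same in every reduce script, one option is to write it once and hand the rest of the script whole groups, roughly like a Hadoop reducer's (key, values) call. The sketch below assumes rows arrive already sorted, with the key in the first tab-separated column; `for_each_group` is a hypothetical helper, not part of Hive or Hadoop.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical helper: read tab-separated rows (already sorted by the
# first column) from $fh and call $callback->($key, \@values) once per
# run of equal keys, similar to a Hadoop reducer's (key, values) call.
sub for_each_group {
  my ($fh, $callback) = @_;
  my ($current_key, @values);
  while (my $line = <$fh>) {
    chomp($line);
    my ($k, $v) = split(/\t/, $line, 2);
    if (defined($current_key) && $current_key ne $k) {
      # Key changed: hand off a copy of the finished group.
      $callback->($current_key, [@values]);
      @values = ();
    }
    $current_key = $k;
    push(@values, $v);
  }
  # Flush the final group, if there was any input at all.
  $callback->($current_key, [@values]) if defined($current_key);
}

# When run as a script, condense each key's values into one row.
unless (caller) {
  for_each_group(\*STDIN, sub {
    my ($key, $values) = @_;
    print $key, "\t", join(",", sort @$values), "\n";
  });
}
```

The callback gets its own copy of each group's values, so it is free to sort or keep them; all the "did the key change?" logic lives in one place.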

An example

We'll use an example similar to the one from the map script post.

We will attempt to condense a table (kv_input) that looks like:
k1 v1
k2 v1
k4 v1
k2 v3
k3 v1
k1 v2
k4 v2
k2 v2
...

into one (kv_condensed) that looks like:

k1 v1,v2
k2 v1,v2,v3
...

The reduce script

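#!/usr/bin/perl
use strict;
use warnings;

# Key of the group being collected, and its values so far.
my $currentKey;
my @vals = ();

while (my $line = <STDIN>) {
  chomp($line);
  processRow(split(/\t/, $line));
}

# Flush the last group; skip it when there was no input at all.
output() if defined($currentKey);

# Print one condensed row: the key, a tab, then the sorted values joined by commas.
sub output {
  print $currentKey . "\t" . join(",", sort @vals) . "\n";
}

sub processRow {
  my ($k, $v) = @_;

  # First row: start the first group.
  if (! defined($currentKey)) {
    $currentKey = $k;
    push(@vals, $v);
    return;
  }

  # Key changed: emit the finished group and start a new one.
  if ($currentKey ne $k) {
    output();
    $currentKey = $k;
    @vals = ($v);
    return;
  }

  # Same key as the previous row: keep accumulating.
  push(@vals, $v);
}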

Please forgive my Perl. It's been a long time (I usually write these in Java, but thought Perl would make for an easier blog example).

As you can see, a lot of the work goes into just keeping track of when the key changes.

The nice thing about simple reduce scripts like this is that they're very easy to test locally, without going through Hadoop and Hive. Just run your script and pipe in some sample rows with tab-separated columns. If you do this, remember to sort the input by key first, since Hadoop/Hive normally does that for you.
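For such a local test, a short Perl filter can stand in for the sort Hive does before the reduce step. This is a sketch under two assumptions: the key is the first tab-separated column, and plain string comparison (`cmp`) is close enough to Hadoop's ordering for a smoke test. `sort_rows_by_key` and the script name `sort_by_key.pl` are hypothetical.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical local stand-in for the sort Hive/Hadoop performs before
# the reduce step: order tab-separated rows by their first column.
sub sort_rows_by_key {
  my @rows = @_;
  return map  { $_->[1] }                        # keep the original row text
         sort { $a->[0] cmp $b->[0] }            # compare keys as strings
         map  { [ (split(/\t/, $_, 2))[0], $_ ] } @rows;
}

unless (caller) {
  my @rows = <STDIN>;
  chomp(@rows);
  print "$_\n" for sort_rows_by_key(@rows);
}
```

Saved as `sort_by_key.pl`, it could be used as `./sort_by_key.pl < sample.txt | ./condense.pl`. Only the keys are ordered; it makes no promise about the order of values within a key, which is why the reduce script sorts `@vals` itself.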

Reducing from Hive

Okay, now that we have our reduce script working, let's run it from Hive.

First, we need to add our map and reduce scripts:

add file identity.pl;
add file condense.pl;
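The post doesn't list identity.pl. Assuming it is a plain pass-through map script, a minimal version might look like this (a reconstruction for illustration, not the author's file):

```perl
#!/usr/bin/perl
# identity.pl (reconstructed sketch): a map script that copies every
# tab-separated input row to stdout unchanged.
use strict;
use warnings;

sub identity {
  my ($in, $out) = @_;
  while (my $line = <$in>) {
    print {$out} $line;   # pass the row through as-is
  }
}

identity(\*STDIN, \*STDOUT) unless caller;
```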

Now for the real work:

01 from (
02   from kv_input
03   MAP k, v
04   USING './identity.pl'
05   as k, v
06   cluster by k) map_output
07 insert overwrite table kv_condensed
08 reduce k, v
09   using './condense.pl'
10   as k, v
11 ;

This is fairly dense, so I will attempt to give a line by line breakdown:

On line 3 we are specifying the columns to pass to our map script from the input table (specified on line 2).

As I mentioned, you must specify a map script in order to reduce. For this example, we're just using a simple identity Perl script. On line 5 we name the columns the map script will output.

Line 6 specifies the column which is the key. CLUSTER BY distributes rows to reducers by this column and sorts them by it, so all rows with the same key arrive together, and in order, at the same reduce script.

Line 8 specifies the columns to pass into our reducer (from the map output columns on line 5).

Finally, line 10 names the output columns from our reducer.
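To see why the cluster by on line 6 is what makes the reduce script work, it can help to simulate the shuffle locally. In Hive, CLUSTER BY k is shorthand for DISTRIBUTE BY k plus SORT BY k. In this toy sketch, rows are assigned to a fixed number of reducers by a hash of the key, and each reducer's rows are then sorted by key. The checksum hash and the reducer count are illustrative assumptions, not Hive's actual partitioner; `cluster_by_key` is a hypothetical name.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Toy simulation of CLUSTER BY k: distribute rows to $n reducers by a
# hash of the key, then sort each reducer's rows by key. The checksum
# hash below is an illustrative stand-in for Hive's real partitioner.
sub cluster_by_key {
  my ($n, @rows) = @_;
  my @partitions;
  push(@partitions, []) for 1 .. $n;
  for my $row (@rows) {
    my ($k) = split(/\t/, $row, 2);
    my $p = unpack('%32C*', $k) % $n;   # same key -> same partition
    push(@{ $partitions[$p] }, $row);
  }
  # Within each partition, order rows by key so equal keys are adjacent.
  for my $part (@partitions) {
    @$part = sort { (split(/\t/, $a, 2))[0] cmp (split(/\t/, $b, 2))[0] } @$part;
  }
  return @partitions;
}

unless (caller) {
  # Sample rows from kv_input above, spread over two reducers.
  my @rows = ("k1\tv1", "k2\tv1", "k4\tv1", "k2\tv3",
              "k3\tv1", "k1\tv2", "k4\tv2", "k2\tv2");
  my @parts = cluster_by_key(2, @rows);
  for my $i (0 .. $#parts) {
    print "reducer $i:\n";
    print "  $_\n" for @{ $parts[$i] };
  }
}
```

Each key shows up in only one reducer's list, and within each list equal keys sit next to each other, which is exactly the property condense.pl relies on when it watches for the key to change.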

(Here's my full hive session for this example, and an example input file).

I hope this was helpful. Next time, I'll talk about some java code I put together to simplify the process of writing reduce scripts.

2 comments:

gurufrequent said...

Thanks man!!
I found this post very useful.

~guru

ameet said...

Thanks Larry,

really helpful. I do have 3 questions and would much appreciate your feedback.

1. Why do we need a map side script? If we are just selecting 2 columns from a table and then performing some calculation in the reduce side with a script, can we not just "select" those 2 columns and then use the construct in reduce like:
select transform(kv_my_input.col1,kv_my_input.col2) using 'reduce_script' as newcol1,newcol2
from kv_my_input cluster by col1;
2. Is the map using 'map script' and reduce using 'reduce script' construct equivalent to transform ( list of columns )? If so, how do we know that the transform in question 1 will produce a "reduce" logic from your script and not the "map" logic. Or it performs whatever is inside the script?
3. Is clustering mandatory? Is it clustering which actually enables one to perform the reduce side script like you have shown? i.e. without clustering, your rows will be scattered ?

thanks again

ameet