Monday, February 18, 2013

Map-side aggregations in Apache Hive

When running large-scale Hive reports, one error we occasionally run into is the following:

Possible error:
 Out of memory due to hash maps used in map-side aggregation.

Solution:
 Currently hive.map.aggr.hash.percentmemory is set to 0.5. Try setting it to a lower value. i.e 'set hive.map.aggr.hash.percentmemory = 0.25;'

What’s going on is that Hive is trying to optimize the query by performing a map-side aggregation: a partial aggregation done inside the mapper, which lets the mapper output fewer rows.  In turn, this reduces the amount of data that Hadoop needs to sort and distribute to the reducers.

Let’s think about what the Hadoop job looks like with the canonical word count example.

In the word count example, the naive approach is for the mapper to tokenize each row of input and output the key-value pair (#{token}, 1).  The Hadoop framework sorts these pairs by token, and the reducers sum the values to produce the total count for each token.
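
Expressed in Hive itself, the word count is a single group by query.  Here is a minimal sketch, assuming a hypothetical table docs with a single string column line:
 select word, count(1) as cnt
 from (select explode(split(line, ' ')) as word from docs) w
 group by word;

With map-side aggregation turned off, the mappers for this query emit roughly one record per token occurrence, just like the naive approach above.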

Using a map-side aggregation, the mappers would instead tokenize each row and store partial counts in an in-memory hash map.  (More precisely, the mappers store each key with its corresponding partial aggregation, which in this case is just a count.)  Periodically, the mappers output the pairs (#{token}, #{token_count}).  The Hadoop framework again sorts these pairs and the reducers sum the values to produce the total count for each token.  In this case, each mapper outputs one row per token every time the map is flushed, instead of one row for each occurrence of each token.  The tradeoff is that each mapper must keep a hash map of every distinct token it has seen in memory.
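
With hive.map.aggr enabled, the plan for the same query typically includes a Group By Operator inside the map stage that performs this partial aggregation.  One way to check (again using the hypothetical docs table) is to look at the explain output:
 set hive.map.aggr = true;
 explain
 select word, count(1) as cnt
 from (select explode(split(line, ' ')) as word from docs) w
 group by word;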

By default, Hive will try to use the map-side aggregation optimization, but it falls back to the standard approach if the hash map is not producing enough of a memory savings.  After processing 100,000 rows (modifiable via hive.groupby.mapaggr.checkinterval), Hive will check the number of items in the hash map.  If it exceeds 50% (modifiable via hive.map.aggr.hash.min.reduction) of the number of rows read, the map-side aggregation will be aborted.
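
Both knobs can be adjusted per session; the values below are just the defaults, shown for illustration:
 -- number of rows to process before checking the size of the hash map
 set hive.groupby.mapaggr.checkinterval = 100000;
 -- abort map-side aggregation if hash map entries exceed this fraction of rows read
 set hive.map.aggr.hash.min.reduction = 0.5;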

Hive will also estimate the amount of memory needed for each entry in the hash map and flush the map to the reducers whenever the estimated size of the map exceeds 50% of the available mapper memory (modifiable via hive.map.aggr.hash.percentmemory).  This, however, is an estimate based on the number of rows and the expected size of each row, so if the memory usage per row is unexpectedly high, the mappers may run out of memory before the hash map is flushed to the reducers.
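
Lowering that threshold, as the error message suggests, simply makes Hive flush the hash map earlier, leaving more headroom in case the estimate is off:
 -- flush the map-side hash map when its estimated size reaches 25% of mapper memory
 set hive.map.aggr.hash.percentmemory = 0.25;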

In particular, if a query uses a count distinct aggregation, the partial aggregations actually contain a list of all distinct values seen so far.  As more distinct values are seen, the amount of memory used by the map increases without necessarily increasing the number of rows in the map, which is what Hive uses to determine when to flush the partial aggregations to the reducers.
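
For example, a query along these lines (using a hypothetical page_views table) keeps every distinct url seen for each user_id in the map-side hash, so memory grows with the number of distinct values rather than with the number of keys:
 select user_id, count(distinct url) as distinct_urls
 from page_views
 group by user_id;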

Whenever a mapper runs out of memory while a group by clause is present and map-side aggregation is turned on, Hive will helpfully suggest that you reduce the flush threshold to avoid running out of memory.  This lowers the threshold (in rows) at which Hive automatically flushes the map, but it may not help if the map's size (in bytes) is growing independently of the number of rows.

Some alternate solutions include simply turning off map-side aggregations (set hive.map.aggr = false), allocating more memory to your mappers via the Hadoop configuration, or restructuring the query so that Hive will pick a different query plan.
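
The first two options are one-line session settings; the heap size below is purely illustrative, and the property name assumes the classic (pre-YARN) MapReduce configuration:
 -- turn off map-side aggregation for this session
 set hive.map.aggr = false;
 -- or give each map task a larger heap (illustrative value)
 set mapred.child.java.opts=-Xmx2048m;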

As an example of restructuring the query, a simple
 select count(distinct v) from tbl;
can be rewritten as
 select count(1) from (select v from tbl group by v) t;

The latter query avoids the count distinct aggregation entirely: the inner group by produces the distinct values of v (so the map-side hash only needs to hold keys, not growing lists of values), and the outer query simply counts them.  It may be more efficient for some queries.
