Thursday, December 15, 2011

4 tips from the trenches of Amazon Elastic MapReduce and Hive

Some things you can learn from others, and some things you can only learn from experience.  In an effort to move some knowledge from the latter category into the former, here are four things we've learned the hard way while working with big data flows using Hive on Amazon's Elastic MapReduce:

1. To prevent data loss, make sure Hive knows if it owns the data.

All of our data processing follows a simple pattern: servers write logs to S3, and we use the basic EMR/S3 integration to read this data in our Hive scripts.  A typical table definition could look something like this:

create external table sample_data(d map<string,string>)
  comment 'data logs uploaded from servers'
  row format delimited
    fields terminated by '\004'
    collection items terminated by '\001'
    map keys terminated by '\002'
  stored as textfile
  location 's3://log-bucket/hive/data/files/sample_data/'
;

(The single map column is our way of supporting dynamic columns in Hive.)

The most easily overlooked part of this table definition is the "external" keyword, which tells Hive that the actual underlying data is managed by some other process.  If you forget to add this keyword and later issue a "drop table" command, Hive will happily nuke all of your log files.

This can be especially troublesome while doing ad hoc analysis, as these usually involve interactive queries in the Hive console with a workflow that often involves an exploratory style of development that includes deleting and recreating tables.

One common use pattern where dropping tables appears is when running scripts that include a create statement.  Normally, trying to recreate a table that already exists in Hive will cause an error, so a script may preemptively issue a drop command in case that table already exists.  An alternative is to change the table definition to tell Hive to ignore create statements for preexisting tables:

create external table if not exists sample_data(d map<string,string>)

In the event that you accidentally forget to specify "external" in your table definition, you can add it later by altering the table:

alter table sample_data set tblproperties ('EXTERNAL'='TRUE') ;

Note that the capitalization in the table properties is significant.  Additionally, this feature is only available in Hive 0.6.0+.  (It was present but buggy prior to 0.6.0.)

2. Use a smart partitioning scheme to create flexible views of the data.

Hive supports partitioning, which can be a huge performance win when querying only a subset of the entire dataset.  This is critical when the entire dataset consists of years of request information but you're only interested in analyzing one day of traffic.  To take advantage of this, we upload our data into hourly paths in S3:

s3://log-bucket/log-name/year=${year}/month=${month}/day=${day}/hour=${hour}/${logfile}.log.gz

We would then load this data with the following statements:

create external table sample_data(d map<string,string>)
  comment 'data logs uploaded from servers'
  partitioned by (
    year string, 
    month string, 
    day string, 
    hour string)
  row format delimited
    fields terminated by '\004'
    collection items terminated by '\001'
    map keys terminated by '\002'
  stored as textfile
  location 's3://log-bucket/log-name/'
;
alter table sample_data recover partitions ;

The year/month/day/hour partitions are now available to be used in select/where statements just like columns.

We could (and originally did) just use a single partition column for the entire date; however, using multiple partition columns allows Hive to complete ignore the presence of other partitions.  The above statement will need to recover metadata for 8-9 thousand partitions of data, which (while less troublesome than not partitioning at all) will still require a lot of time and memory.  Multiple partition columns lets us create (for example) a "view" of a single month:

create external table sample_data_2011_12(d map<string,string>)
  comment 'data logs uploaded from servers'
  partitioned by (
    day string, 
    hour string)
  row format delimited
    fields terminated by '\004'
    collection items terminated by '\001'
    map keys terminated by '\002'
  stored as textfile
  location 's3://log-bucket/log-name/year=2011/month=12/'
;
alter table sample_data recover partitions ;

Now Hive only needs to recover 7-8 hundred partitions.  We use a similar strategy for our daily reporting, which makes recovering partition data even faster.

The only time this scheme breaks down is when the report boundaries don't align with the partition boundaries.  In these cases, you can still get the benefits of partitioning by manually adding the partition information to the table.  For example, to do efficient queries on a week of data, we would replace the "recover partitions" statement with a sequence of statements like these (and tweak the table definition to use only a single partition column):

alter table sample_data add partition(day_hour=2011120600) location "s3://log-bucket/log-name/year=2011/month=12/day=06/hour=00/";

3. Use S3 multipart uploads for large objects.

While S3, Elastic MapReduce, and Hive theoretically allow you easily scale your data storage and analytics, we regularly run up against operational limits as our processing requirements grow.  One surprising problem we recently ran into was S3 throttling EMR because our cluster was accessing the source data too quickly.  After some back-and-forth with support, they suggested a workaround of uploading the source objects (which were multiple GB in size and created by a previous EMR job) with multi-part uploads.

Enabling multi-part uploads in EMR is simply a matter of flipping a configuration switch in the cluster configuration.  When starting a cluster from the command line, simply add the following options (taken from the linked documentation):

elastic-mapreduce --create --alive \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
--bootstrap-name "enable multipart upload" \
--args "-c,fs.s3n.multipart.uploads.enabled=true, \
-c,fs.s3n.multipart.uploads.split.size=524288000"

4. Spot instances can dramatically reduce your data costs.

As our data processing requirements have grown, so have our cluster sizes.  We've found that we can reduce our costs by over 50% by using a mix of on-demand and spot instances.

Each Elastic MapReduce job can have up to three instance groups: master, core, and task.  All data resides on the master and core instances, so jobs can use spot instances to add processing power and additional IO with very little risk.  You can add these instances at startup by simply specifying a bid price:

elastic-mapreduce --create --alive --hive-interactive --hive-versions 0.7 \
--instance-group master --instance-type m1.xlarge --instance-count 1 \
--instance-group core --instance-type m1.xlarge --instance-count 9 \
--instance-group task --instance-type m1.xlarge --instance-count 90 --bid-price 0.25 

For ad hoc queries, I've also found it easy to prototype my job on a subset of data using only the master and core instances, then add the task instances once I'm satisfied that the query is correct.  New task instances are available as Hadoop workers as soon as they're added to the cluster, even if you're already running your MapReduce job.  You can modify the size of the task group using the command-line client:

elastic-mapreduce --jobflow j-xxxxxxxxxxxxx --add-instance-group task  --instance-type m1.xlarge --instance-count 90 --bid-price 0.25

elastic-mapreduce --jobflow j-xxxxxxxxxxxxx --set-num-task-group-instances 90

Note that while the size of the task group can be changed, you cannot modify the instance type or the bid price after the task group is created.

4 comments:

James Holcomb said...

Some really great tips, thanks for posting. What kind of reporting latency are you seeing, for example in your 2011-12 monthly view (and how much data storage is required).

James

Darren Lee said...

We try to keep our report runtimes under a few hours; in our experience, it's often cheaper to increase the number of machines (especially spot instances) then to let a smaller number of instances run for many hours, so throwing more instances at the problem is win-win.

It's actually uncommon for us to use the monthly view; our strategy for monthly reports is usually to do daily rollups of some sort, then aggregate the rollups at the end of the month. I haven't actually run the numbers in a while, but my off-the-cuff guess is that using an entire month of data be somewhere in the 10-100 terabytes range, depending on which services are involved.

g3t r00t said...

In the past, I have explicitly specified column names & data types when creating hive table for importing access logs. If you create a table with the following statement as mentioned in tip 2, how do you query for individual columns?

create external table sample_data_2011_12(d map)

Darren Lee said...

You can access the members of a map using square brackets. eg,

select d["timestamp"], d["id"] from sample_data_2011_12 ;