Monday, December 19, 2011

Get Your Company To Blog More With A Game

At Bizo we try to blog fairly often. But writing blog posts with any degree of frequency at a startup is tough: there are often ten other important tasks that need to be done yesterday. Finding the time to sit down and write a post when you have features to build, code to review and the occasional meeting is difficult, to say the least.

We needed something that would encourage us to blog more frequently, and there's no better way to motivate a bunch of engineers than a game. So during our last Open Source Day we built a Blog Scoreboard that ranks authors based on the number of posts and comments they have. It's set up on our office's big-screen TV, serving as a constant reminder that Larry is, by a landslide, the king of blogging. For a live demo of the scoreboard, and to see just how much Larry is dominating us, check out the live example here.

Right now we optimize purely for the volume of posts, so points are assigned as follows: 10 points per blog post and 1 point per comment. The weighting scheme is intentionally naive, since this had to be built in a few hours, and it will undoubtedly change over time.
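To make the weighting concrete, here is a minimal sketch of that scoring rule in JavaScript. It is not the scoreboard's actual code (the app itself is Ruby); the author names and counts are made up, and fetching real post and comment counts from Blogger is left out.

```javascript
// Scoring rule from the post: 10 points per blog post, 1 point per comment.
const POINTS_PER_POST = 10;
const POINTS_PER_COMMENT = 1;

// Score one author's { posts, comments } counts.
function score(author) {
  return author.posts * POINTS_PER_POST + author.comments * POINTS_PER_COMMENT;
}

// Rank authors from highest to lowest score.
function rank(authors) {
  return authors
    .map((author) => ({ name: author.name, score: score(author) }))
    .sort((a, b) => b.score - a.score);
}

// Hypothetical authors and counts, for illustration only.
const board = rank([
  { name: "alice", posts: 3, comments: 4 },
  { name: "bob", posts: 1, comments: 25 },
]);
console.log(board); // bob (35) ranks above alice (34)
```

One post is worth ten comments, but as the example shows, a prolific commenter can still edge out a more frequent poster.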

The best part about our scoreboard is that it's an open source Sinatra (Ruby) app, and it works with any Blogger blog! All you have to do is edit a few lines of YAML and you'll have your very own big-screen blog scoreboard. You can grab the source code and install instructions on GitHub. Happy blogging!

Thursday, December 15, 2011

4 tips from the trenches of Amazon Elastic MapReduce and Hive

Some things you can learn from others, and some things you can only learn from experience.  In an effort to move some knowledge from the latter category into the former, here are four things we've learned the hard way while working with big data flows using Hive on Amazon's Elastic MapReduce:

1. To prevent data loss, make sure Hive knows if it owns the data.

All of our data processing follows a simple pattern: servers write logs to S3, and we use the basic EMR/S3 integration to read this data in our Hive scripts.  A typical table definition could look something like this:

create external table sample_data(d map<string,string>)
  comment 'data logs uploaded from servers'
  row format delimited
    fields terminated by '\004'
    collection items terminated by '\001'
    map keys terminated by '\002'
  stored as textfile
  location 's3://log-bucket/hive/data/files/sample_data/';

(The single map column is our way of supporting dynamic columns in Hive.)

The most easily overlooked part of this table definition is the "external" keyword, which tells Hive that the actual underlying data is managed by some other process.  If you forget to add this keyword and later issue a "drop table" command, Hive will happily nuke all of your log files.

This is especially easy to trip over during ad hoc analysis, which usually means exploratory, interactive queries in the Hive console, where deleting and recreating tables is routine.

Dropping tables also commonly shows up in scripts that include a create statement. Normally, trying to recreate a table that already exists in Hive will cause an error, so a script may preemptively issue a drop command in case that table already exists. An alternative is to change the table definition to tell Hive to ignore create statements for preexisting tables:

create external table if not exists sample_data(d map<string,string>)

In the event that you accidentally forget to specify "external" in your table definition, you can add it later by altering the table:

alter table sample_data set tblproperties ('EXTERNAL'='TRUE') ;

Note that the capitalization in the table properties is significant.  Additionally, this feature is only available in Hive 0.6.0+.  (It was present but buggy prior to 0.6.0.)
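If you want a safety net for this tip, one option is a small pre-flight check that scans a Hive script for create statements lacking the "external" keyword. The sketch below is a hypothetical helper, not part of Hive or EMR, and its regular expression only covers the simple statement forms shown in this post. Some tables (scratch tables, for example) may legitimately be managed by Hive, so treat its output as a warning list rather than an error.

```javascript
// Hypothetical pre-flight check: list tables created without "external",
// i.e. tables whose data Hive will delete on "drop table".
function findManagedTables(hiveScript) {
  // Matches "create [external] table [if not exists] <name>", case-insensitively.
  const pattern = /create\s+(external\s+)?table\s+(?:if\s+not\s+exists\s+)?([\w.]+)/gi;
  const managed = [];
  for (const match of hiveScript.matchAll(pattern)) {
    if (!match[1]) {
      managed.push(match[2]); // no "external" keyword: Hive owns this data
    }
  }
  return managed;
}

const script = `
create external table if not exists sample_data(d map<string,string>);
create table scratch_results(k string, v bigint);
`;
console.log(findManagedTables(script)); // [ 'scratch_results' ]
```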

2. Use a smart partitioning scheme to create flexible views of the data.

Hive supports partitioning, which can be a huge performance win when querying only a subset of the entire dataset.  This is critical when the entire dataset consists of years of request information but you're only interested in analyzing one day of traffic.  To take advantage of this, we upload our data into hourly paths in S3:
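The partition locations used later in this post follow a year=/month=/day=/hour= layout. A sketch of building such a prefix for a given hour, using the post's placeholder bucket and log name and assuming hours are bucketed in UTC (the post doesn't say which time zone is used):

```javascript
// Two-digit zero padding for month, day, and hour.
function pad2(n) {
  return String(n).padStart(2, "0");
}

// Build the hourly S3 prefix for the hour containing `date` (interpreted in UTC).
function hourlyPath(base, date) {
  return (
    `${base}year=${date.getUTCFullYear()}/month=${pad2(date.getUTCMonth() + 1)}/` +
    `day=${pad2(date.getUTCDate())}/hour=${pad2(date.getUTCHours())}/`
  );
}

console.log(hourlyPath("s3://log-bucket/log-name/", new Date(Date.UTC(2011, 11, 6, 0))));
// s3://log-bucket/log-name/year=2011/month=12/day=06/hour=00/
```

The key=value directory names are what allow Hive's partition recovery to map each path onto the year/month/day/hour partition columns.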


We would then load this data with the following statements:

create external table sample_data(d map<string,string>)
  comment 'data logs uploaded from servers'
  partitioned by (
    year string, 
    month string, 
    day string, 
    hour string)
  row format delimited
    fields terminated by '\004'
    collection items terminated by '\001'
    map keys terminated by '\002'
  stored as textfile
  location 's3://log-bucket/log-name/';
alter table sample_data recover partitions ;

The year/month/day/hour partitions are now available to be used in select/where statements just like columns.

We could (and originally did) use a single partition column for the entire date; however, using multiple partition columns allows Hive to completely ignore the presence of other partitions. The above statement will need to recover metadata for 8-9 thousand partitions of data, which (while less troublesome than not partitioning at all) still requires a lot of time and memory. Multiple partition columns let us create (for example) a "view" of a single month:

create external table sample_data_2011_12(d map<string,string>)
  comment 'data logs uploaded from servers'
  partitioned by (
    day string, 
    hour string)
  row format delimited
    fields terminated by '\004'
    collection items terminated by '\001'
    map keys terminated by '\002'
  stored as textfile
  location 's3://log-bucket/log-name/year=2011/month=12/';
alter table sample_data_2011_12 recover partitions ;

Now Hive only needs to recover 7-8 hundred partitions.  We use a similar strategy for our daily reporting, which makes recovering partition data even faster.
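As a rough check on those numbers: hourly partitioning produces 24 partitions per day, so about a year of data is 8,760 partitions and a 31-day month is 744. The small helper below (hypothetical, for illustration) does that arithmetic for any UTC date range:

```javascript
const MS_PER_HOUR = 60 * 60 * 1000;

// Number of hourly partitions in the half-open UTC range [start, end).
function hourlyPartitions(start, end) {
  return Math.round((end.getTime() - start.getTime()) / MS_PER_HOUR);
}

// Build a UTC date; `m` is a 1-based month here.
const utc = (y, m, d) => new Date(Date.UTC(y, m - 1, d));

console.log(hourlyPartitions(utc(2011, 1, 1), utc(2012, 1, 1))); // 8760 (365 days * 24)
console.log(hourlyPartitions(utc(2011, 12, 1), utc(2012, 1, 1))); // 744 (31 days * 24)
console.log(hourlyPartitions(utc(2011, 12, 6), utc(2011, 12, 13))); // 168 (one week)
```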

The only time this scheme breaks down is when the report boundaries don't align with the partition boundaries.  In these cases, you can still get the benefits of partitioning by manually adding the partition information to the table.  For example, to do efficient queries on a week of data, we would replace the "recover partitions" statement with a sequence of statements like these (and tweak the table definition to use only a single partition column):

alter table sample_data add partition(day_hour=2011120600) location "s3://log-bucket/log-name/year=2011/month=12/day=06/hour=00/";
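A week of hourly data needs 168 of these statements, so generating them is less error-prone than writing them by hand. The sketch below assumes the single day_hour partition column and the path layout from the example statement, and treats hours as UTC (an assumption). The table name, base path, and start date are the post's placeholders.

```javascript
// Generate one "alter table ... add partition" statement per hour, starting at `start`.
function addPartitionStatements(table, base, start, hours) {
  const pad = (n) => String(n).padStart(2, "0");
  const statements = [];
  for (let i = 0; i < hours; i++) {
    const d = new Date(start.getTime() + i * 60 * 60 * 1000);
    const y = d.getUTCFullYear();
    const m = pad(d.getUTCMonth() + 1);
    const day = pad(d.getUTCDate());
    const h = pad(d.getUTCHours());
    statements.push(
      `alter table ${table} add partition(day_hour=${y}${m}${day}${h}) ` +
        `location "${base}year=${y}/month=${m}/day=${day}/hour=${h}/";`
    );
  }
  return statements;
}

const week = addPartitionStatements(
  "sample_data",
  "s3://log-bucket/log-name/",
  new Date(Date.UTC(2011, 11, 6)),
  7 * 24
);
console.log(week.length); // 168
console.log(week[0]); // same statement as the example above
```

The printed statements can be pasted into the Hive script in place of "recover partitions".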

3. Use S3 multipart uploads for large objects.

While S3, Elastic MapReduce, and Hive theoretically allow you to scale your data storage and analytics easily, we regularly run up against operational limits as our processing requirements grow. One surprising problem we recently ran into was S3 throttling EMR because our cluster was accessing the source data too quickly. After some back-and-forth with support, they suggested a workaround: upload the source objects (which were multiple GB in size and created by a previous EMR job) using multipart uploads.

Enabling multipart uploads in EMR is simply a matter of flipping a switch in the cluster configuration. When starting a cluster from the command line, add the following options (taken from the linked documentation):

elastic-mapreduce --create --alive \
--bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
--bootstrap-name "enable multipart upload" \
--args "-c,fs.s3n.multipart.uploads.enabled=true"

4. Spot instances can dramatically reduce your data costs.

As our data processing requirements have grown, so have our cluster sizes.  We've found that we can reduce our costs by over 50% by using a mix of on-demand and spot instances.

Each Elastic MapReduce job can have up to three instance groups: master, core, and task. All HDFS data resides on the master and core instances, so jobs can use spot instances in the task group to add processing power and additional IO with very little risk: if a task instance is reclaimed, no data is lost. You can add these instances at startup by simply specifying a bid price:

elastic-mapreduce --create --alive --hive-interactive --hive-versions 0.7 \
--instance-group master --instance-type m1.xlarge --instance-count 1 \
--instance-group core --instance-type m1.xlarge --instance-count 9 \
--instance-group task --instance-type m1.xlarge --instance-count 90 --bid-price 0.25 
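To see why the task group is where the savings come from, here is a back-of-the-envelope cost model. Spot instances are billed at the current spot price, which never exceeds your bid, so the bid gives an upper bound. The on-demand price below is a placeholder, not a real quoted rate, and the model ignores the EMR surcharge and data transfer. It compares the 10 on-demand (1 master + 9 core) plus 90 spot layout from the command above with 100 on-demand instances.

```javascript
// Upper-bound hourly cost: on-demand instances at their price, spot instances at their bid.
function hourlyCost({ onDemandCount, onDemandPrice, spotCount, spotBid }) {
  return onDemandCount * onDemandPrice + spotCount * spotBid;
}

// Placeholder on-demand price per instance-hour; substitute your real rate.
const ON_DEMAND_PRICE = 0.68;

const allOnDemand = hourlyCost({ onDemandCount: 100, onDemandPrice: ON_DEMAND_PRICE, spotCount: 0, spotBid: 0 });
const mixed = hourlyCost({ onDemandCount: 10, onDemandPrice: ON_DEMAND_PRICE, spotCount: 90, spotBid: 0.25 });
console.log(allOnDemand.toFixed(2), mixed.toFixed(2)); // 68.00 29.30
```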

For ad hoc queries, I've also found it easy to prototype my job on a subset of data using only the master and core instances, then add the task instances once I'm satisfied that the query is correct. New task instances are available as Hadoop workers as soon as they're added to the cluster, even if your MapReduce job is already running. You can add a task group to a running cluster, and later change its size, using the command-line client:

elastic-mapreduce --jobflow j-xxxxxxxxxxxxx --add-instance-group task  --instance-type m1.xlarge --instance-count 90 --bid-price 0.25

elastic-mapreduce --jobflow j-xxxxxxxxxxxxx --set-num-task-group-instances 90

Note that while the size of the task group can be changed, you cannot modify the instance type or the bid price after the task group is created.

Tuesday, December 13, 2011

Promises in Javascript/Coffeescript

This happens often. You’re humming along writing some awesome Javascript code. At first everything is neat and organized. Then you add a feature here, an AJAX call there, and before long your once-lovely codebase has turned into callback spaghetti.

One scenario where this can become particularly nasty is when you have an arbitrary number of asynchronous tasks, one of which can’t run until all the others have completed. Suppose, for example, you are creating a page that displays information on your organization’s engineering team and open source projects hosted on Github. This might require a few API calls to Github (e.g., one to get the repositories, another to get the team members) that can be executed in parallel. We could just append the information to the DOM as each API call finishes, but it would be nice to avoid pieces of the site popping in at different times. Ideally, we would make the API calls to Github and render the information to the DOM only after both calls had completed. The problem is that implementing this in Javascript can get quite messy.

A Bad Solution

The issue here is knowing when it is safe to execute the task that renders the Github information to the page. One strategy might be to store the results from each API call in an array and then wait for a set amount of time before drawing the page.

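results = []

githubApiCallOne (response) ->
  results.push response

githubApiCallTwo (response) ->
  results.push response

# hope that both calls have finished within 5 seconds
setTimeout(() ->
  renderPage(results...)
, 5000)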

This turns out to be a poor solution: if we wait too little, the page is drawn before the results arrive and breaks, and if we set the timeout too high, our users wait longer than necessary for the page to load.

Promises to The Rescue

Fortunately there is a better way to implement our Github page. Using a construct called a Promise (sometimes called a Future) allows us to elegantly handle these types of situations. Using promises we can turn our code into something like this:

Promise.when(
  githubApiCallOne(),
  githubApiCallTwo()
).then((apiCallOneData, apiCallTwoData) ->
  renderPage(apiCallOneData, apiCallTwoData)
)

The basic idea is that our async API calls now return a promise object that functions much like an IOU – they can’t give us the results of the API call immediately, but they (probably) can at some time in the future. The Promise.when method takes an arbitrary number of promise objects as parameters and returns a new promise; the callback passed to its “then” method executes once every promise passed to “when” has been completed.

To do this, our api calls would have to be modified to return promise objects, which turns out to be trivial. Such an implementation might look like so:

githubApiCallOne = () ->
  promise = new Promise()

  # async call
  ajaxGet("/repositories", (repository_data) ->
    # fulfill the promise when async call completes
    promise.complete(repository_data)
  )

  return promise

githubApiCallTwo = () ->
  promise = new Promise()

  ajaxGet("/users", (user_data) ->
    promise.complete(user_data)
  )

  return promise

githubApiCallOne and githubApiCallTwo start their AJAX calls but return a promise object immediately. When each AJAX call completes, it fulfills its promise by calling “complete” and passing in its data. Once both promises have been fulfilled, the callback passed to “then” on the promise returned by Promise.when is executed and we render the page.

With jQuery

The good news is that if you’re already using jQuery, you get promises for free. As of jQuery 1.5, all the $.ajax methods (e.g., $.get, $.post, etc.) return promises, which allows you to do this:

promise1 = $.get ""
promise2 = $.post ""

$.when(promise1, promise2)
  .then (promise1Result, promise2Result) ->
    # each result is an array: [data, textStatus, jqXHR]
    # do something with the data
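Browsers and Node.js now ship a native Promise (standardized in ES2015, well after this post was written), where Promise.all plays the role of $.when. A sketch of the Github example with it; fakeGet and renderPage are hypothetical stand-ins for a real AJAX call and page renderer. One difference from $.when: Promise.all resolves with a single array of results rather than one argument per promise.

```javascript
// Hypothetical stand-in for an AJAX GET: resolves with `data` after a short delay.
function fakeGet(url, data) {
  return new Promise((resolve) => setTimeout(() => resolve(data), 10));
}

// Hypothetical renderer: summarizes what would be drawn to the DOM.
function renderPage(repositories, members) {
  return `repositories: ${repositories.length}, members: ${members.length}`;
}

// Both requests start right away and run in parallel; the callback runs only
// after both resolve, with results in the same order as the input array.
const page = Promise.all([
  fakeGet("/repositories", ["repo-a", "repo-b"]),
  fakeGet("/users", ["ann"]),
]).then(([repositories, members]) => renderPage(repositories, members));

page.then((html) => console.log(html)); // repositories: 2, members: 1
```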

What if I can’t use jQuery?

Rolling your own implementation of promises isn’t recommended for production code, but it might be necessary if you write a lot of third-party Javascript, or you might just want to try it for fun. Here’s a very basic implementation to get you started. Error handling, exceptions, etc. are left as an exercise for the reader.

class Promise
  @when: (tasks...) ->
    num_uncompleted = tasks.length
    args = new Array(num_uncompleted)
    promise = new Promise()

    for task, task_id in tasks
      ((task_id) ->
        task.then(() ->
          # store this task's completion arguments in its own slot
          args[task_id] = Array.prototype.slice.call(arguments)
          num_uncompleted--
          promise.complete.apply(promise, args) if num_uncompleted == 0
        )
      )(task_id)

    return promise

  constructor: () ->
    @completed = false
    @callbacks = []

  complete: () ->
    @completed = true
    @data = arguments
    for callback in @callbacks
      callback.apply callback, arguments

  then: (callback) ->
    # already completed: run the callback right away instead of queueing it
    if @completed == true
      callback.apply callback, @data
      return

    @callbacks.push callback
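For readers who don't use CoffeeScript, here is a rough plain-JavaScript (ES2015+) translation of the class above. It is renamed SimplePromise so it doesn't shadow the built-in Promise, and like the original it has no error handling. Because forEach gives each callback its own taskId binding, it doesn't need the self-executing wrapper the CoffeeScript version uses.

```javascript
class SimplePromise {
  constructor() {
    this.completed = false;
    this.callbacks = [];
  }

  // Complete once every task completes; each argument passed to the final
  // callback is an array of one task's completion arguments, in task order.
  static when(...tasks) {
    let numUncompleted = tasks.length;
    const args = new Array(numUncompleted);
    const promise = new SimplePromise();
    tasks.forEach((task, taskId) => {
      task.then((...data) => {
        args[taskId] = data;
        numUncompleted -= 1;
        if (numUncompleted === 0) promise.complete(...args);
      });
    });
    return promise;
  }

  complete(...data) {
    this.completed = true;
    this.data = data;
    for (const callback of this.callbacks) callback(...data);
  }

  then(callback) {
    if (this.completed) {
      callback(...this.data); // already done: call immediately
      return;
    }
    this.callbacks.push(callback);
  }
}

// Complete the tasks out of order; the callback still receives them in argument order.
const foo = new SimplePromise();
const bar = new SimplePromise();
let logged = null;
SimplePromise.when(foo, bar).then((fooData, barData) => {
  logged = [fooData[0], barData[0]];
});
bar.complete("bar");
foo.complete("foo");
console.log(logged); // [ 'foo', 'bar' ]
```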

Sharp-eyed readers might notice that the code inside the for loop in the Promise.when method looks a bit strange: the call to each task’s “then” method is wrapped in a self-executing function that takes task_id as a parameter. This funkiness is required because of the way closures work in Javascript. CoffeeScript compiles the loop’s task_id into a single function-scoped variable, so without the wrapper every callback would close over that same variable rather than a copy of its value. By the time your “then” callbacks execute, the loop will have finished iterating and they would all see the same final task_id! Passing task_id into a new function creates a new scope, so each callback gets its own copy of the value instead of a reference to the shared variable.
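The pitfall is easy to reproduce in plain JavaScript. In this sketch, callbacks are collected during a loop and called only after it ends, the way “then” callbacks fire after Promise.when has finished looping. The loop uses var deliberately; with ES2015 let, each iteration gets its own binding and the wrapper becomes unnecessary.

```javascript
// Collect one callback per loop iteration, then call them all after the loop ends.
function collectIds(useWrapper) {
  const callbacks = [];
  for (var taskId = 0; taskId < 3; taskId++) {
    if (useWrapper) {
      // Self-executing function: each callback closes over its own copy `id`.
      ((id) => callbacks.push(() => id))(taskId);
    } else {
      // Every callback closes over the same function-scoped `taskId` variable.
      callbacks.push(() => taskId);
    }
  }
  return callbacks.map((cb) => cb());
}

console.log(collectIds(false)); // [ 3, 3, 3 ]
console.log(collectIds(true)); // [ 0, 1, 2 ]
```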

And finally, an example using the supplied Promise class to show that it works:

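delay = (string) ->
  promise = new Promise()
  # complete the promise with the string after a delay (one second here)
  setTimeout(() ->
    promise.complete string
  , 1000)
  return promise

logEverything = (fooData, barData, bazData) ->
  # each argument holds that task's completion arguments
  console.log fooData[0], barData[0], bazData[0]

window.onload = () ->
  Promise.when(
    delay("foo"),
    delay("bar"),
    delay("baz")
  ).then logEverything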