Thursday, April 21, 2011

How Bizo survived the Great AWS Outage of 2011 relatively unscathed...

The twittersphere, tech blogs and even some business sites are abuzz with the news that the US East Region of AWS has been experiencing a major outage. This outage has taken down some of the most well-known names on the web. Bizo's infrastructure is 100% AWS, and we support 1000s of publisher sites (including some very well-known business sites) doing billions of impressions a month. Sure, we had a few bruises early yesterday morning when the outage first began, but since shortly after that we've been operating our core, high-volume services on top of AWS without the East region.


Here is how we have remained up despite not having a single ops person on our engineering team:


1) Our services are well monitored
We rely on pingdom for external verification of site availability on a worldwide basis. Additionally, we have our own internal alarms and dashboards that give us up-to-the-minute metrics such as request rate, CPU utilization, etc. Most of this data comes from AWS CloudWatch monitoring, but we also track error rates and have alarms set up to alert us when these rates change or go over a certain threshold.


2) Our services have circuit breakers between remote services that trip when other services become unavailable and we heavily cache data
When building our services, we always assume that remote services will fail at some point. We've spent a good deal of time investing in minimizing the domino effect of a failing remote service. When a remote service becomes unavailable, the caller detects this and goes into tripped mode, occasionally retrying with backoffs. Of course, we also rely heavily on caching read-only data and are able to take advantage of the fact that the data needed for most of our services does not change very often.
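
As an illustration, here is a minimal sketch of the tripped-mode-with-backoff idea described above (this is not our production code; the class name, timing values, and fallback behavior are purely illustrative):

import java.util.concurrent.Callable;

// Minimal circuit-breaker sketch: fail fast while tripped, retry with a
// growing backoff, and fall back to cached data in the meantime.
class CircuitBreaker {
  private volatile boolean tripped = false;
  private volatile long retryAtMillis = 0;
  private volatile long backoffMillis = 1000;

  public <T> T call(Callable<T> remoteCall, T cachedFallback) {
    if (tripped && System.currentTimeMillis() < retryAtMillis) {
      return cachedFallback; // breaker is open -- skip the wire call entirely
    }
    try {
      T result = remoteCall.call();
      tripped = false;
      backoffMillis = 1000; // remote service is healthy again, reset the backoff
      return result;
    } catch (Exception e) {
      tripped = true;
      retryAtMillis = System.currentTimeMillis() + backoffMillis;
      backoffMillis = Math.min(backoffMillis * 2, 60 * 1000); // exponential backoff, capped at 1 minute
      return cachedFallback;
    }
  }
}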


3) We utilize autoscaling
One of the promises of AWS is the ability to start and stop servers based on traffic and load. We've been using autoscaling since it was launched, and it worked like a charm. You can see the instances starting up in the US West region in response to the new load as traffic was diverted over from US East.


(all times UTC)


4) Our architecture is designed to let us funnel traffic around an entire region if necessary
We utilize Global Server Load Balancing to direct traffic to the closest region based on the end-user's location. For instance, if a user is in California, we direct their traffic to the US West region. This was extremely valuable in keeping us fully functioning in the face of a regional outage. When we finally decided that the US East region was going to cause major issues, switching all traffic to US West was as easy as clicking a few buttons. You can see how the requests transitioned over quickly after we made the decision. (By the way, quick shout-out to Dynect, our GSLB service provider. Thanks!)


(all times UTC)


Bumps and Bruises
Of course, we didn't escape without some issues. We'll do another blog post on the issues we did run into, but they were relatively minor.


Conclusion
After 3 years running full-time on AWS across 4 regions and 8 availability zones, we design our systems with the assumption that failure will happen, and that assumption helped us come through this outage relatively unscathed.

Monday, April 18, 2011

Command Query Responsibility Segregation with S3 and JSON

We recently tackled a problem at Bizo where we wanted to decouple our high-volume servers from our MySQL database.

While considering different options (NoSQL vs. MySQL, etc.), in retrospect we ended up implementing a SOA-version of the Command Query Separation pattern (or Command Query Responsibility Segregation, which is services/messaging-specific).

Briefly, in our new approach, queries (reads) use an in-memory cache that is bulk loaded and periodically reloaded from a snapshot of the data stored as JSON in S3. Commands (writes) are HTTP calls to a remote JSON API service. MySQL is still the authoritative database; we just added a layer of decoupling for both reads and writes.

This meant our high-volume servers now have:

  • No reliance on MySQL availability or schema
  • No wire calls blocking the request thread (except a few special requests)

The rest of this post explains our context and elaborates on the approach.

Prior Approach: Cached JPA Calls

For context, our high-volume servers rely on configuration data that is stored in a MySQL database. Of course, the configuration data doesn’t have to be absolutely fresh, so we’d already been using caching to avoid constantly pounding the database for data that rarely changes.

There were several things we liked about this approach:

  • We use Amazon RDS for the MySQL instance, which provides out-of-the-box backups, master/slave configuration, etc., and is generally a pleasure to use. We enjoy not running our own database servers.

  • We also have several low-volume internal and customer-facing web applications that maintain the same data and are perfectly happy talking to a SQL database. They are normal, chatty CRUD applications for which the tool support and ACID-sanity of a SQL database make life a lot easier.

That being said, we wanted to tweak a few things:

  • Reduce the high-volume servers’ reliance on MySQL for seeding their cache.

    Although RDS is great, and definitely more stable than our own self-maintained instances would be, there are nonetheless limits on its capacity. Especially if one of our other applications misbehaves (which has never happened…cough), it can degrade the MySQL instance to the point of negatively affecting the high-volume servers.

  • Reduce cache misses that block the request thread.

    Previously, configuration data (keyed by a per-request configuration id) was not pulled into cache until it was needed. The first request (after every cache flush) would reload the data for its configuration id from MySQL and repopulate the cache.

    While not initially a big deal, as Bizo has grown, we’re now running in multiple AWS regions, and cache misses require a cross-region JDBC call to fetch their data from the MySQL server running in us-east.

Illustrated in code, our approach had, very simplified, been:

class TheServlet {
  public void doGet() {
    int configId = request.getParameter("configId");
    Config config = configService.getConfig(configId);
    // continue processing with config settings
  }
}

class ConfigService {
  // actually thread-safe/ehcache-managed, flushed every 30m
  Map<Integer, Config> cached = new HashMap<Integer, Config>();

  public Config getConfig(int configId) {
    Config config = cached.get(configId);
    if (config == null) {
      // hit mysql for the data, blocks the request thread
      config = configJpaRepository.find(configId);
      // cache it
      cached.put(configId, config);
    }
    return config;
  }
}

Potential “Big Data” Approaches

Given our primary concern was MySQL being a single point of failure, we considered moving to a new database platform, e.g. SimpleDB, Cassandra, or the like, all of which can scale out across machines.

Of course, RDS’s master/slave MySQL setup already reduces the risk of a single machine being a point of failure, but the RDS master/slave cluster as a whole is still, using the term loosely, a “single point”. Granted, with this very loose definition, there will always be some “point” you rely on–we just wanted one that we felt more comfortable with than MySQL.

Anyway, for NoSQL options, we couldn’t get over the cons of:

  • Having to run our own clusters (except for SimpleDB).

  • Having to migrate our low-volume CRUD webapps over to the new, potentially slow (SimpleDB), potentially eventually-consistent (Cassandra) NoSQL back-end.

  • Still having cache misses result in request threads blocking on wire calls.

Because of these cons, we did not put a lot of effort into researching NoSQL approaches for this problem–we felt it was fairly apparent they weren’t necessary.

Realization: MySQL is Fine, Fix the Cache

Of course, we really didn’t have a Big Data problem (well, we do have a lot of those, but not for this problem).

We just had a cache seeding problem. Specifically:

  • All of our configuration data can fit in RAM, so we should be able to bulk-load all of it at once–no more expensive, blocking wire calls on cache misses (basically there are no cache misses anymore).

  • We can load the data from a more reliable, non-authoritative, non-MySQL data store–e.g. an S3 snapshot (config.json.gz) of the configuration data.

    The S3 file then basically becomes our alternative “query” database in the CQRS pattern.

When these are put together, a solution emerges where we can have an in-memory, always-populated cache of the configuration data that is refreshed by a background thread and results in request threads never blocking.

In code, this looks like:

class TheServlet {
  public void doGet() {
    // note: no changes from before, which made migrating easy
    int configId = request.getParameter("configId");
    Config config = configService.getConfig(configId);
    // continue processing with config settings
  }
}

class ConfigService {
  // the current cache of all of the config data
  AtomicReference<Map<Integer, Config>> cached =
    new AtomicReference<Map<Integer, Config>>();

  public void init() {
    // use java.util.Timer to refresh the cache
    // on a background thread
    new Timer(true).schedule(new TimerTask() {
      public void run() {
        Map<Integer, Config> newCache = reloadFromS3("bucket/config.json.gz");
        cached.set(newCache);
      }
    }, 0, TimeUnit.MINUTES.toMillis(30));
  }

  public Config getConfig(int configId) {
    // now always return whatever is in the cache--if a
    // configId isn't present, that means it was not in
    // the last S3 file and is treated the same as it
    // not being in the MySQL database previously
    Map<Integer, Config> currentCache = cached.get();
    if (currentCache == null) {
      return null; // data hasn't been loaded yet
    } else {
      return currentCache.get(configId);
    }
  }

  private Map<Integer, Config> reloadFromS3(String path) {
    // uses the AWS SDK to load the gzipped JSON from S3
    // and Jackson to deserialize it into a configId -> Config map
  }
}
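
For completeness, here is a hedged sketch of what a reloadFromS3 helper might look like using the AWS SDK for Java and Jackson (Config is the same DTO as above; the credentials, bucket/key parsing, and error handling are simplified and purely illustrative):

import java.io.InputStream;
import java.util.Map;
import java.util.zip.GZIPInputStream;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;

class S3ConfigLoader {
  // placeholder credentials -- load these from your own config in practice
  private final AmazonS3Client s3 =
    new AmazonS3Client(new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY"));
  private final ObjectMapper mapper = new ObjectMapper();

  public Map<Integer, Config> reloadFromS3(String path) throws Exception {
    // path is "bucket/key", e.g. "bucket/config.json.gz"
    String bucket = path.substring(0, path.indexOf('/'));
    String key = path.substring(path.indexOf('/') + 1);

    InputStream in =
      new GZIPInputStream(s3.getObject(bucket, key).getObjectContent());
    try {
      // deserialize the gzipped JSON snapshot into a configId -> Config map
      return mapper.readValue(in, new TypeReference<Map<Integer, Config>>() {});
    } finally {
      in.close();
    }
  }
}

If a refresh throws, the timer task can simply catch the exception and keep serving the previous snapshot until the next attempt.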

A Few Wrinkles: Real-Time Reads and Writes

So far I’ve only talked about the cached query/reads side of the new approach. We also had two more requirements:

  • Very (very) infrequently, a high-volume server will need real-time configuration data to handle a special request.

  • The high-volume servers occasionally write configuration/usage stats back to the MySQL database.

While we could have continued using a MySQL/JDBC connection for these few requests, this also provided the opportunity to build a JSON API in front of the MySQL database. This was desirable for two main reasons:

  • It decoupled our high-volume services from our MySQL schema. By still honoring the JSON API, we could upgrade the MySQL schema and the JSON API server at the same time with a much smaller, much less complicated downtime window than with the high-volume services talking directly to the MySQL schema.

  • The MySQL instance is no longer being accessed across AWS regions, so it can have much tighter firewall rules, which only allow the JSON API server (which is in the same us-east region) to access it.

The new setup basically looks like this: the high-volume servers bulk-load their reads from the S3 snapshot, and the occasional real-time reads and writes go through the JSON API service, which is now the only thing talking directly to MySQL.

Scalatra Servlet Example

With Jackson and Scalatra, the JSON API server was trivial to build, especially since it could reuse the same JSON DTO objects that are also serialized in the config.json.gz file in S3.

As an example of how simple Jackson and Scalatra made writing the JSON API, here is the code for serving the real-time config requests:

class JsonApiService extends ScalatraServlet {
  get("/getConfig") {
    // config is the domain object fresh from MySQL
    val config = configRepo.find(params("configId").toLong)
    // configDto is just the data we want to serialize
    val configDto = ConfigMapper.toDto(config)
    // jackson magic to make json
    val json = jackson.writeValueAsString(configDto)
    json
  }
}

Background Writes

The final optimization was realizing that, when the high-volume servers have requests that trigger stats to be written to MySQL, for our requirements, these writes aren’t critical.

This means there is no need to perform them on the request-serving thread. Instead, we can push the writes onto a queue and have it fulfilled by a background thread.

This generally looks like:

class ConfigWriteService {
  // create a background thread pool of (for now) size 1
  private ExecutorService executor = new ThreadPoolExecutor(...);

  // called by the request thread, won't block
  public void writeUsage(int configId, int usage) {
    offer("https://json-api-service/writeUsage?configId=" +
      configId +
      "&usage=" +
      usage);
  }

  private void offer(String url) {
    try {
      executor.submit(new BackgroundWrite(url));
    } catch (RejectedExecutionException ree) {
      // queue full, writes aren't critical, so ignore
    }
  }

  private static class BackgroundWrite implements Runnable {
    private String url;

    private BackgroundWrite(String url) {
      this.url = url;
    }

    public void run() {
      // make call using commons-http to url
    }
  }
}
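
Roughly, with Commons HttpClient 3.x (one option for the "commons-http" call above), BackgroundWrite.run() might look like the following sketch; it is deliberately fire-and-forget because these writes aren't critical:

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.GetMethod;

class BackgroundWriteSketch implements Runnable {
  private final HttpClient client = new HttpClient();
  private final String url;

  BackgroundWriteSketch(String url) {
    this.url = url;
  }

  public void run() {
    GetMethod get = new GetMethod(url);
    try {
      client.executeMethod(get); // status code ignored; writes are best-effort
    } catch (Exception e) {
      // swallow -- a lost usage write is acceptable for our requirements
    } finally {
      get.releaseConnection();
    }
  }
}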

tl;dr We Implemented Command Query Responsibility Segregation

By changing only a minimal amount of code in our high-volume servers, we were able to:

  • Have queries (most reads) use cached, always-loaded data that is periodically reloaded from data snapshots in S3 (a more reliable source than MySQL)

  • Have commands (writes) sent from a background-thread to a JSON API that saves the data to MySQL and hides JDBC schema changes.

For this configuration data and our current requirements, MySQL, augmented with a more aggressive, Command Query Separation-style caching scheme, has worked and continues to work well.

For more reading on CQS/CQRS, I suggest:

  • Both the Wikipedia article and Martin Fowler’s CommandQuerySeparation; however, they focus on CQS as applied to OO, e.g. side-effect-free vs. mutating method calls.

  • For CQS applied to services, e.g. CQRS, Udi Dahan seems to be one of the first advocates of the term. Since then, CQRS even seems to have its own site and Google group.

Friday, April 15, 2011

Crowdflower Trickery: dynamic tasks

Here at Bizo, we often use CrowdFlower to improve the quality of our data. In doing so, we’ve come across some cool, but under-documented, tricks. One trick that we’ve found particularly useful is using Liquid for Designers to dynamically generate CrowdFlower tasks. Let us take a look at how to do this with a toy example.

Problem:
We are given a list of desserts from a bakery, as shown in data.csv below. Our task is to determine the following:

1. If the dessert is a cake, is it appropriate for:
  • Wedding events
  • Birthday events
  • Casual eating
2. If the dessert is NOT a cake, is it appropriate for:
  • eating using hands
  • eating using forks
  • eating using spoons
file: data.csv
"id","name","desert_type"
0,"Peanut Butter Cake","cake"
1,"Strawberry Donut","donut"
2,"Cookies and crème ice cream cake","cake"
3,"Apple Strudle","strudle"
4,"Chocolate Pie","pie"
5,"Red Velvet Cake","cake"


Initial Approach:
One approach is to create two separate jobs: one which will ask workers questions relevant to cakes and one which will ask workers questions relevant to non-cake desserts. To create these jobs, we would first have to break up our data into two data files: one containing data only for cakes (see cake.csv) and one for everything else (see non-cake.csv). Then, we would have to specify the appropriate CML code for each job. For our example, the data and CML code will look like:

Job 1: Cake desserts
file: cake.csv
"id","name","desert_type"
0,"Peanut Butter Cake","cake"
2,"Cookies and crème ice cream cake","cake"
5,"Red Velvet Cake","cake"

file: cake.cml
Dessert: {{name}}
<cml:checkboxes label="This dessert is appropriate for">
<cml:checkbox label="wedding events"/>
<cml:checkbox label="birthday events"/>
<cml:checkbox label="casual eating"/>
</cml:checkboxes>

Job 2: Non-cake desserts
file: non-cake.csv
"id","name","desert_type"
1,"Strawberry Donut","donut"
3,"Apple Strudle","strudle"
4,"Chocolate Pie","pie"

file: non-cake.cml
Dessert: {{name}}
<cml:checkboxes label="This dessert is appropriate for">
<cml:checkbox label="eating using hands"/>
<cml:checkbox label="eating using forks"/>
<cml:checkbox label="eating using spoons"/>
</cml:checkboxes>


Approach using Liquid:
Using Liquid, we can solve the same problem with just one job rather than two. To do so, we simply embed Liquid logic tags into our CML code to dynamically display tasks to users -- the appropriate checkboxes will appear based on the "desert_type" field. Furthermore, we do not need to break up the data.csv file.

cml code
Dessert: {{name}}
<cml:checkboxes label="This dessert is appropriate for">
{% if desert_type=='cake' %}
<cml:checkbox label="wedding events"/>
<cml:checkbox label="birthday events"/>
<cml:checkbox label="casual eating"/>
{% else %}
<cml:checkbox label="eating using hands"/>
<cml:checkbox label="eating using forks"/>
<cml:checkbox label="casual using spoons"/>
{% endif %}
</cml:checkboxes>

Thursday, April 14, 2011

Hive Unit Testing

Introduction

Hive has become an extremely important component in our overall software stack. We have numerous ‘mission-critical’ reports that are generated using Hive and want to make sure we can apply our testing processes to Hive scripts in the same way that we apply them to other code artifacts.

A few weeks ago, I was tasked with finding an approach for unit testing our Hive scripts. To my surprise, a Google search for ‘Hive Unit Testing’ yielded relatively few useful results.

I wanted a solution that would allow us to test locally (vs. a solution that would require EMR). Where possible, I prefer local testing because it’s simpler, provides more immediate feedback, and doesn’t require a network.

After reading this post, you will (hopefully) know how to run Hive unit tests in your own environment.

The Approach

After performing some research, I decided on an approach that is part of the Hive project itself.  At a high level, the solution works in the following way:
  • Start up an instance of the Hive CLI
  • Execute a Hive script (positive or negative case)
  • Compare the output (from the CLI) of the script to an expected output file
  • Rinse and repeat
The rest of this post discusses the specific steps required to get this solution running in your own environment.

Set up Hive Locally

The first step is to create some Ant tasks for setting up Hive locally. Here’s a snippet of Ant that shows how to do this:


You should now be able to execute ‘ant hive.init’ and have Hive available in the tools directory.

Generate test cases

The developer is responsible for providing the .q Hive files that represent the test cases. There is a code generation step that will create JUnit classes (one for positive test cases, one for negative test cases) given a set of .q files. The Ant snippet below shows how to generate the test classes:
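
Roughly, the generation target can use Hive's QTestGenTask (from the Hive ant module) to run the Velocity templates over your .q files; the attribute names below are approximate and should be checked against the Hive version you are using:

<!-- QTestGenTask ships with Hive; attribute names here are approximate -->
<taskdef name="qtestgen" classname="org.apache.hadoop.hive.ant.QTestGenTask"
         classpathref="test.classpath"/>

<target name="hive.gen.test" depends="hive.init">
  <!-- positive test cases -->
  <qtestgen outputDirectory="${test.build.src}"
            templatePath="${hive.test.template.dir}" template="TestCliDriver.vm"
            queryDirectory="${target.hive.positive.query.dir}"
            queryFile="${qfile}"
            resultsDirectory="${hive.positive.results.dir}"
            className="TestCliDriver"
            logFile="${test.log.dir}/testclidrivergen.log"/>
  <!-- negative test cases -->
  <qtestgen outputDirectory="${test.build.src}"
            templatePath="${hive.test.template.dir}" template="TestNegativeCliDriver.vm"
            queryDirectory="${target.hive.negative.query.dir}"
            queryFile="${qfile}"
            resultsDirectory="${hive.negative.results.dir}"
            className="TestNegativeCliDriver"
            logFile="${test.log.dir}/testnegativeclidrivergen.log"/>
</target>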


Here are some notes about the key variables above:
  • hive.test.template.dir - the directory where the velocity templates are located for the code generation step.
  • target.hive.positive.query.dir - the directory where positive test cases are located.
  • target.hive.negative.query.dir - the directory where negative test cases are located.
  • hive.positive.results.dir - the directory where expected positive test results are located. The name of each results file must be the name of the query file appended with ‘.out’. For example, if the test query file is named hive_test.q then the results file must be named hive_test.q.out.
  • hive.negative.results.dir - the directory where expected negative test results are located.
  • qfile - This variable should be specified if you want to generate a test class with a single test case. For example, if you have a test file named hive_test.q, then you would set the value of this property to hive_test (e.g. ant -Dqfile=hive_test hive.gen.test).
  • qfile_regex - Similar in functionality to qfile, this variable should be set to a regular expression that will match the test files that you want to generate tests for.
The test classes are generated from Velocity template files. You can find examples of the templates in the Hive codebase here: https://github.com/apache/hive/blob/trunk/ql/src/test/templates/TestCliDriver.vm
https://github.com/apache/hive/blob/trunk/ql/src/test/templates/TestNegativeCliDriver.vm

The above files can basically be used as-is, but you will need to provide your own Test Helper class, QTestUtil, and update its package location accordingly in the templates.

QTestUtil

QTestUtil contains code for:
  • starting up hive
  • executing a query file
  • comparing the results to expected results
  • running cleanup between tests
  • shutting down hive
You can find the one from the Hive project here: https://github.com/apache/hive/blob/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java

The main modifications you will want to make to this file are deletions as there is some Hive project specific set up code that you will not need in your environment.

Executing the tests

After you have generated the tests, you can execute them by creating a target with the junit task. Here is some sample Ant for doing this:
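
A sketch of such a target, assuming the generated tests have been compiled into ${test.build.classes} and that test.classpath includes Hive, Hadoop, and your compiled test classes (the target and property names are illustrative):

<target name="hive.test" depends="hive.gen.test">
  <junit printsummary="yes" haltonfailure="no" fork="yes" forkmode="once">
    <classpath refid="test.classpath"/>
    <formatter type="xml"/>
    <batchtest todir="${test.results.dir}">
      <!-- run the generated positive and negative CLI driver tests -->
      <fileset dir="${test.build.classes}"
               includes="**/TestCliDriver.class, **/TestNegativeCliDriver.class"/>
    </batchtest>
  </junit>
</target>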



Conclusion

This post outlined a solution for unit testing Hive scripts. Another nice aspect of this approach that I failed to mention is that it’s based on JUnit, so you can use your existing code coverage tools with it (we use Cobertura) to get coverage information when testing custom UDFs. Also, I should mention that I used Hive 0.6.0 when putting this together.

Wednesday, April 6, 2011

Hive 0.7 no longer auto-downloads transform scripts

I ran into a bit of a surprise moving a Hive 0.5 script to Hive 0.7 the other day.

Previously, in Hive 0.5, we called our Java transform code like:

insert overwrite table the_table
select
transform(...)
using 'java -cp s3://bucket-name/code.jar MapperClassName'

Behind the scenes, before actually calling the "java" executable, Hive would inspect each of the arguments and, if it found an "s3://..." URL, download that file from S3 to a local copy, and then pass the path to the local copy to your program.

This was convenient as then your external "java" executable didn't have to know anything about S3, how to authenticate with it, etc.

However, in Hive 0.7, this no longer works. That's perhaps for the understandable reason that if you did want to pass the literal string "s3://..." to your mapper class, Hive implicitly interjecting on your behalf may not be what you want, and, AFAIK, you had no way to avoid it.

So, now an explicit "add file" command is required, e.g.:

add file s3://bucket-name/code.jar;
insert overwrite table the_table
select
transform(...)
using 'java -cp code.jar MapperClassName'

The add file command downloads code.jar to the local execution directory (without any bucket name/path mangling like in Hive 0.5), and then your transform arguments can reference the local file directly.

All in all, a pretty easy fix, but rather frustrating to figure out given the long cycle time of EMR jobs.

Also, kudos to this post in the AWS developer forums that describes the same problem and solution: