Wednesday, January 23, 2013

Asanban: Lean Development with Asana and Kanban

On Bizo's External Apps team (a.k.a. 'xapps'), we've been using a Kanban system to manage our work.  All of Bizo Engineering uses Asana to track tasks, but Asana isn't specifically designed for Kanban.  We've settled on a set of conventions that we use in Asana to enable our Kanban system.  These conventions also help us to track metrics like the average lead time from month to month.


Kanban is a second-generation Agile software development methodology.  The focus is on finding and fixing bottlenecks, as well as removing waste by limiting work-in-progress.  (The "WIP" limits referenced in this post are the number of work items that are allowed to be in a particular stage of the system at one time.)  Adopting a Kanban system has made things easier for engineers, increased efficiency, and proven very popular with our Product Management folks as well.  We are now focused on delivering value incrementally rather than specifying and implementing larger chunks of work.  If you're interested in adopting Kanban, I recommend reading David Anderson's seminal book on the topic: Kanban: Successful Evolutionary Change for Your Technology Business.


Each stage of work in our value chain is a priority heading in Asana.  The name of the priority header follows the convention: "{STEP NAME} ({WIP}):", e.g. "Dev Ready (10):".  The steps that are earliest in the value chain are at the bottom of our Asana project, with tasks moving upwards through each stage until they reach "Production (15):" at the top when the functionality described by the task has been delivered to production.  Once Product Management has verified that the functionality described by a task is functioning correctly in production, they mark the task as complete.  We use tags to represent work item types, although our use of them is fairly limited at present.
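
As a rough illustration of the header convention, here is a minimal Scala sketch that pulls the step name and WIP limit out of a priority header name.  The names PriorityHeader, Stage and parse are made up for this example and are not part of the asanban gem.

```scala
object PriorityHeader {
  // Matches "{STEP NAME} ({WIP}):", e.g. "Dev Ready (10):"
  private val Pattern = """^(.+?)\s*\((\d+)\):$""".r

  // Hypothetical holder for one stage of the value chain
  final case class Stage(name: String, wipLimit: Int)

  // Returns Some(stage) for a priority header, None for an ordinary task name
  def parse(taskName: String): Option[Stage] = taskName.trim match {
    case Pattern(name, wip) => Some(Stage(name, wip.toInt))
    case _                  => None
  }
}
```

With this sketch, PriorityHeader.parse("Dev Ready (10):") yields a stage named "Dev Ready" with a limit of 10, while an ordinary task name yields None.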


One of the most basic metrics to track in a Kanban system is the average amount of lead time (the time it takes from when a task gets added to the input queue until value is delivered).  I have created some tooling that allows us to accomplish this systematically.  I'll first describe what it does, and then how you can use it.

The first piece of the tooling bulk loads task data from the Asana API into MongoDB.  The API returns JSON and I just store JSON as-is in MongoDB, which works out well since MongoDB speaks JSON natively.  One hiccup is that once tasks are archived in Asana, you can no longer obtain information about them through the API.  Accordingly, the bulk load needs to be scheduled to run on a regular basis (in our case, every night) so that we don't lose information about archived tasks.  Furthermore, we have a policy that tasks should not be archived until they have been completed for at least 24 hours, so that the bulk loader will always run at least once after a task has been completed before it gets archived.  After loading the task data, the bulk loader will create data describing how much time each task spent in each state, as well as how long each task took (in days) to complete from start to finish (lead time).
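
To make the two derived values concrete, here is a small Scala sketch of the arithmetic, assuming we already know when a task entered each stage.  The Transition shape and the function names are assumptions for illustration; the actual bulk loader is a Ruby gem working on MongoDB documents and may compute these differently.

```scala
import java.time.{Duration, Instant}

object LeadTimes {
  // One recorded move of a task into a stage (hypothetical shape)
  final case class Transition(stage: String, enteredAt: Instant)

  // Lead time in fractional days between two instants
  def leadTimeDays(start: Instant, end: Instant): Double =
    Duration.between(start, end).toMillis / 86400000.0

  // Total time spent in each stage; the last stage is closed off at `end`
  def timeInStages(transitions: Seq[Transition], end: Instant): Map[String, Duration] = {
    val sorted = transitions.sortBy(_.enteredAt.toEpochMilli)
    // Each stage ends when the next one begins
    val exits = sorted.drop(1).map(_.enteredAt) :+ end
    sorted.zip(exits)
      .groupBy { case (t, _) => t.stage }
      .map { case (stage, spans) =>
        stage -> spans
          .map { case (t, exit) => Duration.between(t.enteredAt, exit) }
          .reduce(_.plus(_))
      }
  }
}
```

A task that re-enters a stage has its visits summed, which seems like the natural choice for spotting bottlenecks.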

The other piece is a Sinatra web service that runs a map-reduce against the lead time data created by the bulk loader and serves lead times by month as JSON.  It can also aggregate by year or day (but I don't think aggregating by day is useful).
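
The aggregation itself is simple.  As an in-memory stand-in for the MongoDB map-reduce (not the service's actual code), the following Scala sketch groups completed tasks by month and averages their lead times.  The Completed case class and the choice to bucket by completion date in UTC are assumptions.

```scala
import java.time.{Instant, YearMonth, ZoneOffset}

object LeadTimeReport {
  // Hypothetical record: when a task finished and how long it took
  final case class Completed(completedAt: Instant, leadTimeDays: Double)

  // "Map" each task to its month, then "reduce" each month to an average
  def averageByMonth(tasks: Seq[Completed]): Map[YearMonth, Double] =
    tasks
      .groupBy(t => YearMonth.from(t.completedAt.atZone(ZoneOffset.UTC)))
      .map { case (month, ts) => month -> ts.map(_.leadTimeDays).sum / ts.size }
}
```

Aggregating by year or day would only change the grouping key.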

I have packaged up both of those pieces into a gem called "asanban", which you can use.   The source code and instructions for installation and usage are here:

Pain Points

There are a couple of problems I've run into using Asana with a Kanban system.  The first is that there's no way to enforce WIP limits.  Users just have to be mindful of the limits shown in the priority headers.  I have been thinking about writing a nightly report that uses the data created by the bulk loader to find violated WIP limits and send out emails, but I haven't gotten to it yet.  (This tooling is essentially a hack day project at this point.)  There is also no functionality to facilitate different classes of service (SLAs and WIP break-downs), but maybe those could be supported using the same kind of nightly report.
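
The WIP check such a report would need could look roughly like the Scala sketch below.  It is not something we have built; it assumes the task names arrive in project order, that every task listed under a priority header belongs to that stage, and that headers follow the "{STEP NAME} ({WIP}):" convention.  All names are hypothetical.

```scala
object WipCheck {
  // Same header convention as above: "{STEP NAME} ({WIP}):"
  private val Header = """^(.+?)\s*\((\d+)\):$""".r

  // Hypothetical summary of one stage: its limit and how many tasks sit in it
  final case class StageLoad(name: String, wipLimit: Int, taskCount: Int)

  // Walk the project top to bottom, counting tasks under each header
  def stageLoads(taskNames: Seq[String]): Vector[StageLoad] =
    taskNames.foldLeft(Vector.empty[StageLoad]) { (stages, name) =>
      name.trim match {
        case Header(stage, limit) => stages :+ StageLoad(stage, limit.toInt, 0)
        case _ if stages.nonEmpty =>
          stages.init :+ stages.last.copy(taskCount = stages.last.taskCount + 1)
        case _ => stages // a task above the first header is ignored
      }
    }

  // Stages holding more tasks than their limit allows
  def violations(taskNames: Seq[String]): Vector[StageLoad] =
    stageLoads(taskNames).filter(s => s.taskCount > s.wipLimit)
}
```

The emailing part would sit on top of violations and is left out here.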

Another problem I've run into is that task sizes can be all over the place, which reduces the meaningfulness of the metrics.  Some Kanban practitioners use hierarchical work items to address this kind of variability in size.  Stories can be grouped into epics and/or broken down into "grains".  Asana does support sub-tasks, so I may recommend that we use those to break down large work items in the future, at which point the bulk loader would be modified to track metrics by sub-task (for tasks that have them, which would be assumed to be epics).

Next Steps

As we fine-tune our Kanban process, we'll use these lead time metrics to verify that our performance improves when we've made an adjustment (changing a WIP limit or adding a buffer, for example).  I'd like to have more metrics so that we can have even better insight into our system in the future.  For example, I'd like to see the average time tasks spend in particular steps, the average amount of total WIP, as well as WIP in each step (shown over time) and failure load.

The first order of business moving forward on this will probably be an improved charting interface on top of the existing metrics.  Also, it would be nice if the bulk loader used a scheduling library so that folks don't have to manually schedule it in cron.  It also could use some automated tests!!!  As I mentioned previously, I've just been working on this on hack days, so if there's something you'd like to see done soon, well... pull requests will be gladly accepted! :)

Monday, January 21, 2013

What Makes Spark Exciting

At Bizo, we’re currently evaluating/prototyping Spark as a replacement for Hive for our batch reports.

As a brief intro, Spark is an alternative to Hadoop. It provides a cluster computing framework for running distributed jobs. Similar to Hadoop, you provide Spark with jobs to run, and it handles splitting up the job into small tasks, assigning those tasks to machines (optionally with Hadoop-style data locality), issuing retries if tasks fail transiently, etc.

In our case, these jobs are processing a non-trivial amount of data (log files) on a regular basis, for which we currently use Hive.

Why Replace Hive?

Admittedly, Hive has served us well for quite a while now. (One of our engineers even built a custom “Hadoop on demand” framework for running periodic on-demand Hadoop/Hive jobs in EC2 several months before Amazon Elastic Map Reduce came out.)

Without Hive, it would have been hard for us to provide the same functionality, probably at all, let alone in the same time frame.

That said, it has gotten to the point where Hive is more frequently invoked in negative contexts (“damn it, Hive”) than positive.

Personally, I admit that I even try to avoid tasks that involve working with Hive. I find it to be frustrating and, well, just not a lot of fun. Why? Two primary reasons:

1. Hive jobs are hard to test

Bizo has a culture of excellence, and for engineering one of the things this means is testing. We really like tests. Especially unit tests, which are quick to run and enable a fast TDD cycle.

Unfortunately, Hive makes unit testing basically impossible. For several reasons:

  • Hive scripts must be run in a local Hadoop/Hive installation.

    Ironically, very few developers at Bizo have local Hadoop installations. We are admittedly spoiled by Elastic Map Reduce, such that most of us (myself anyway) wouldn’t even know how to set up Hadoop off the top of our heads. We just fire up an EMR cluster.

  • Hive scripts have production locations embedded in them.

    Both our log files and report output are stored in S3, so our Hive scripts end up with lots of “s3://” paths scattered throughout them.

    While we do run dev versions of reports with “-dev” S3 buckets, still relying on S3 and raw log files (that are usually in a compressed/binary-ish format) is not conducive to setting up lots of really small, simplified scenarios to unit test each boundary case.

  • Hive scripts do not provide any abstraction; they are just one big HiveQL file. This means it’s hard to break up a large report into small, individually testable steps.

Despite these limitations, about a year ago we had a developer dedicate some effort to prototyping an approach that would run Hive scripts within our CI workflow. In the end, while his prototype worked, the workflow was wonky enough that we never adopted it for production projects.

The result? Our Hive reports are basically untested. This sucks.

2. Hive is hard to extend

Extending Hive via custom functions (UDFs and UDAFs) is possible, and we do it all the time–but it’s a pain in the ass.

Perhaps this is not Hive’s fault, and it’s some Hadoop internals leaking into Hive, but the various ObjectInspector hoops, to me, always seemed annoying to deal with.

Given these shortcomings, Bizo has been looking for a Hive successor for a while, even going so far as to prototype revolute, a Scala DSL on top of Cascading, but had not yet found something we were really excited about.

Enter Spark!

We had heard about Spark, but did not start trying it until we saw the Spark presentation at AWS re:Invent (the talk received the highest rating of all non-keynote sessions), which impressed us enough that we wanted to learn more.

One of Spark’s touted strengths is being able to load and keep data in memory, so your queries aren’t always I/O bound.

That is great, but the exciting aspect for us at Bizo is how Spark, either intentionally or serendipitously, addresses both of Hive’s primary shortcomings, and turns them into huge strengths. Specifically:

1. Spark jobs are amazingly easy to test

Writing a test in Spark is as easy as:

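// import spark.SparkContext (org.apache.spark.SparkContext from Spark 0.8 on)
class SparkTest {
  def test() {
    // this is real code...
    val sc = new SparkContext("local", "MyUnitTest")
    // and now some pseudo code...
    val output = runYourCodeThatUsesSpark(sc)
    // ...assert on output here, then shut the context down
    sc.stop()
  }
}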

(I will go into more detail about runYourCodeThatUsesSpark in a future post.)

This one liner starts up a new SparkContext, which is all your program needs to execute Spark jobs. There is no local installation required (just have the Spark jar on your classpath, e.g. via Maven or Ivy), no local server to start/stop. It just works.

As a technical aside, this “local” mode starts up an in-process Spark instance, backed by a thread-pool, and actually opens up a few ports and temp directories, because it’s a real, live Spark instance.

Granted, this is usually more work than you want to be done in a unit test (which ideally would not hit any file or network I/O), but the redeeming quality is that it’s fast. Tests run in ~2 seconds.

Okay, yes, this is slow compared to pure, traditional unit tests, but is such a huge revolution compared to Hive that we’ll gladly take it.

2. Spark is easy to extend

Spark’s primary API is a Scala DSL, oriented around what they call an RDD, or Resilient Distributed Dataset, which is basically a collection that only supports bulk/aggregate transforms (so methods like map, filter, and groupBy, which can be seen as transforming the entire collection, but no methods like get which assume in-memory/random access).

Some really short, made up example code is:

// RDD[String] is like a collection of lines
val in: RDD[String] = sc.textFile("s3://bucket/path/")
// perform some operation on each line
val suffixed = in.map { line => line + "some suffix" }
// now save the new lines back out (the output path is a placeholder)
suffixed.saveAsTextFile("s3://bucket/output/")

Spark’s job is to package up your map closure, and run it against that extra large text file across your cluster. And it does so by, after shuffling the code and data around, actually calling your closure (i.e. there is no LINQ-like introspection of the closure’s AST).

This may seem minor, but it’s huge, because it means there is no framework code or APIs standing between your running closure and any custom functions you’d want to run. Let’s say you want to use SomeUtilityClass (or the venerable StringUtils), just do:

val in: RDD[String] = sc.textFile("s3://bucket/path/")
val processed = in.map { line =>
  // just call it, it's a normal method call
  SomeUtilityClass.process(line) // "process" is a made-up method name
}

Notice how SomeUtilityClass doesn’t have to know it’s running within a Spark RDD in the cluster. It just takes a String. Done.
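
One consequence is that such a utility can be exercised with no Spark at all. The Scala sketch below uses a made-up SomeUtilityClass with an assumed process method (neither comes from a real library) and runs it over a plain List, using the same function you would hand to an RDD's map.

```scala
// Hypothetical stand-in for SomeUtilityClass; the method name `process` is an assumption
object SomeUtilityClass {
  // Knows nothing about Spark: it just takes a String and returns a String
  def process(line: String): String = line.trim.toUpperCase
}

object LocalCheck {
  // Same function shape you would pass to RDD.map, applied to a plain List
  def run(lines: List[String]): List[String] =
    lines.map(SomeUtilityClass.process)
}
```

Nothing in the utility changes between this local run and a run on the cluster; only the collection it is mapped over does.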

Similarly, Spark doesn’t need to know anything about the code you use within the closure; it just needs to be available on the classpath of each machine in the cluster (which is easy to do as part of your cluster/job setup, you just copy some jars around).

This seamless hop between the RDD and custom Java/Scala code is very nice, and means your Spark jobs end up reading just like regular, normal Scala code (which to us is a good thing!).

Is Spark Perfect?

As full disclosure, we’re still in the early stages of testing Spark, so we can’t yet say whether Spark will be a wholesale replacement for Hive within Bizo. We haven’t gotten to any serious performance comparisons or written large, complex reports to see if Spark can take whatever we throw at it.

Personally, I am also admittedly somewhat infatuated with Spark at this point, so that could be clouding my judgement about the pros/cons and the tradeoffs with Hive.

One Spark con so far is that Spark is pre-1.0, and it can show. I’ve seen some stack traces that shouldn’t happen, and some usability warts, that hopefully will be cleared up by 1.0. (That said, even as a newbie I find the codebase small and very easy to read, such that I’ve had several small pull requests accepted already–which is a nice consolation compared to the daunting codebases of Hadoop and Hive.)

We have also seen that, for our first Spark job, moving from “Spark job written” to “Spark job running in production” is taking longer than expected. But given that Spark is a new tool to us, we expect this to be a one-time cost.

More to Come

I have a few more posts coming up which explain our approach to Spark in more detail, for example:

  • Testing best practices
  • Running Spark in EMR
  • Accessing partitioned S3 logs

To see those when they come out, make sure to subscribe to the blog, or, better yet, come work at Bizo and help us out!