Monday, January 30, 2012

work at Bizo (looking for some good engineers)

We’re a small, disciplined team that gets a lot done. Our platform processes billions of page views monthly and 100s of terabytes of data so we have lots of fun problems to tackle. We believe in teamwork and communication: comments, design reviews, code reviews for every change, weekly tech talks. We believe in giving developers ownership over projects. We believe Engineering is more than coding. We have fun and keep the beer fridge well stocked.

We have customers, are well funded, and were recently named the fourth-fastest-growing private company in the San Francisco Bay Area.

We are looking for motivated problem solvers with an entrepreneurial / hacker spirit.

If you're a reader of this blog, you already know our technology stack. Some highlights: Scala, Java, JavaScript, Ruby, AWS (pretty much every service), Hadoop/Hive, GWT, MongoDB, Solr, etc.

If you're interested, please apply on stackoverflow.

Wednesday, January 18, 2012

Using GenericUDFs to return multiple values in Apache Hive

A basic user defined function (UDF) in Hive is very easy to write: you simply subclass org.apache.hadoop.hive.ql.exec.UDF and implement an evaluate method.  We've previously written about this strategy, and it works well for most simple cases.

The first case where this breaks down is when you want to return multiple values from your UDF.  For me, this often arises when we have serialized data stored in a single Hive field and want to extract multiple pieces of information from it.

For example, suppose we have a simple Person object (leaving out all of the error checking code):

case class Person(firstName: String, lastName: String)

object Person {
  def serialize(p: Person): String = {
    p.firstName + "|" + p.lastName
  }

  def deserialize(s: String): Person = {
    // split on the literal '|' character; split("|") would treat "|" as a regex
    val parts = s.split('|')
    Person(parts(0), parts(1))
  }
}

We want to convert a data table containing these serialized objects into one containing firstName and lastName columns.

create table input(serializedPerson string) ;
load data local inpath ... ;

create table output(firstName string, lastName string) ;

So, what should our UDF and query look like?

Using the previous strategy, we could create two separate UDFs:

insert overwrite table output
select firstName(serializedPerson), lastName(serializedPerson)
from input ;

Unfortunately, the two invocations will have to separately deserialize their inputs, which could be expensive in less trivial examples.  It also requires writing two separate implementation classes whose only difference is which field to pull out of your model object.

An alternative is to use a GenericUDF and return a struct instead of a simple string.  This requires using object inspectors to specify the input and output types, just like in a UDTF:

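import java.util.Arrays
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory, PrimitiveObjectInspector, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory.STRING
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

class DeserializePerson extends GenericUDF {
  private var inputInspector: PrimitiveObjectInspector = _

  def initialize(inputs: Array[ObjectInspector]): StructObjectInspector = {
    this.inputInspector = inputs(0).asInstanceOf[PrimitiveObjectInspector]

    val stringOI = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(STRING)

    // the output is a struct with two string fields
    val outputFieldNames = Arrays.asList("firstName", "lastName")
    val outputInspectors = Arrays.asList[ObjectInspector](stringOI, stringOI)
    ObjectInspectorFactory.getStandardStructObjectInspector(outputFieldNames, outputInspectors)
  }

  def getDisplayString(children: Array[String]): String = {
    "deserialize(" + children.mkString(",") + ")"
  }

  def evaluate(args: Array[DeferredObject]): Object = {
    val input = inputInspector.getPrimitiveJavaObject(args(0).get)
    val person = Person.deserialize(input.asInstanceOf[String])
    // one array element per struct field, in the order declared in initialize
    Array(person.firstName, person.lastName)
  }
}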

Here, we're specifying that we expect a single primitive object inspector as an input (error handling code omitted) and returning a struct containing two fields, both of which are strings.  We can now use the following query:

create temporary function deserializePerson as 'com.bizo.udf.DeserializePerson' ;

insert overwrite table output
select person.firstName, person.lastName
from (
  select deserializePerson(serializedPerson) as person
  from input
) parsed ;

This query deserializes the person only once but gives you access to both of the values returned by the UDF.

Note that this method does not allow you to return multiple rows -- for that, you still need to use a UDTF.

Friday, January 13, 2012

Clustering of sparse data using python with scikit-learn

Coming from a Matlab background, I found sparse matrices to be easy to use and well integrated into the language. However, when transitioning to Python’s scientific computing ecosystem, I had a harder time using sparse matrices. This post is intended to help Matlab refugees and those interested in using sparse matrices in Python (in particular, for clustering).

Requirements:
scikit-learn (0.10+)
numpy (refer to scikit-learn version requirements)
scipy (refer to scikit-learn version requirements)

Sparse Matrix Types:
There are seven types of sparse matrices implemented under scipy:
  1. bsr_matrix -- block sparse row matrix
  2. coo_matrix -- sparse matrix in coordinate format
  3. csc_matrix -- compressed sparse column matrix
  4. csr_matrix -- compressed sparse row matrix
  5. dia_matrix -- sparse matrix with diagonal storage
  6. dok_matrix -- dictionary of keys based sparse matrix
  7. lil_matrix -- row-based linked list sparse matrix
For more info, see the scipy.sparse documentation.

When to use which matrix:
The following are scenarios in which you would want to choose one sparse matrix type over another:
  • Fast Arithmetic Operation: csc_matrix, csr_matrix
  • Fast Column Slicing (e.g., A[:, 1:2]): csc_matrix
  • Fast Row Slicing (e.g., A[1:2, :]): csr_matrix
  • Fast Matrix vector products: csr_matrix, bsr_matrix, csc_matrix
  • Fast Changing of sparsity (e.g., adding entries to matrix): lil_matrix, dok_matrix
  • Fast conversion to other sparse formats: coo_matrix
  • Constructing Large Sparse Matrices: coo_matrix
Clustering with scikit-learn:
With the release of scikit-learn 0.10, one of the useful new features is the support for sparse matrices with the k-means algorithm. The following is how you would use sparse matrices with k-means:

Example-1: Full Matrix to Sparse Matrix

from numpy.random import random
from scipy.sparse import coo_matrix
from sklearn.cluster import KMeans

# create a 30x1000 dense random matrix.
D = random((30,1000))
# keep entries with value < 0.10 (about 10% of entries in matrix will be non-zero)
# X is a "full" matrix that is intrinsically sparse.
X = D*(D<0.10) # note: element wise mult

# convert X into a sparse matrix (type coo_matrix)
# note: we can initialize any type of sparse matrix.
# There is no particular motivation behind using
# coo_matrix for this example.
S = coo_matrix(X)

# note: newer scikit-learn releases name this parameter n_clusters
labeler = KMeans(k=3)
# convert coo to csr format
# note: Kmeans currently only works with CSR type sparse matrix
labeler.fit(S.tocsr())

# print cluster assignments for each row
for (row, label) in enumerate(labeler.labels_):
    print("row %d has label %d" % (row, label))

One of the issues with Example-1 is that we are constructing a sparse matrix from a full matrix. It will often be the case that we will not be able to fit a full (although intrinsically sparse) matrix in memory. For example, if the matrix X was a 100000x1000000000 full matrix, there could be some issues. One solution to this is to somehow extract out the non-zero entries of X and to use a smarter constructor for the sparse matrix.

Example-2: Sparse Matrix Construction

In Example-2, we will assume that we have X's data stored on some file on disk. In particular, we will assume that X is stored in a csv file and that we are able to extract out the non-zero data efficiently.

import csv
from scipy.sparse import lil_matrix
from sklearn.cluster import KMeans

def extract_nonzero(fname):
    """
    extracts nonzero entries from a csv file
    input: fname (str) -- path to csv file
    output: generator<(int, int, float)> -- generator
            producing 3-tuple containing (row-index, column-index, data)
    """
    for (rindex,row) in enumerate(csv.reader(open(fname))):
        for (cindex, data) in enumerate(row):
            if data!="0":
                yield (rindex, cindex, float(data))

def get_dimensions(fname):
    """
    determines the dimension of a csv file
    input: fname (str) -- path to csv file
    output: (nrows, ncols) -- tuple containing row x col data
    """
    rowgen = (row for row in csv.reader(open(fname)))
    # compute col size from the first row
    colsize = len(next(rowgen))
    # compute row size (the first row was already consumed above)
    rowsize = 1 + sum(1 for row in rowgen)
    return (rowsize, colsize)

# obtain dimensions of data
(rdim, cdim) = get_dimensions("X.csv")

# allocate a lil_matrix of size (rdim by cdim)
# note: lil_matrix is used since we will be modifying
# the matrix a lot.
S = lil_matrix((rdim, cdim))

# add data to S
for (i,j,d) in extract_nonzero("X.csv"):
    S[i,j] = d

# perform clustering
# note: newer scikit-learn releases name this parameter n_clusters
labeler = KMeans(k=3)
# convert lil to csr format
# note: Kmeans currently only works with CSR type sparse matrix
labeler.fit(S.tocsr())

# print cluster assignments for each row
for (row, label) in enumerate(labeler.labels_):
    print("row %d has label %d" % (row, label))
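
Since coo_matrix is listed above as the format for constructing large sparse matrices, an alternative to filling a lil_matrix entry by entry is to collect the triplets and hand them to the coo_matrix constructor in one call. The following is only a sketch: build_sparse is a hypothetical helper, and the hard-coded triplets stand in for the output of a generator like extract_nonzero.

```python
from scipy.sparse import coo_matrix

def build_sparse(triplets, shape):
    """Build a CSR matrix from an iterable of (row, col, value) triplets."""
    rows, cols, vals = [], [], []
    for (i, j, d) in triplets:
        rows.append(i)
        cols.append(j)
        vals.append(d)
    # coo_matrix accepts the values and both coordinate lists at once
    return coo_matrix((vals, (rows, cols)), shape=shape).tocsr()

# hypothetical stand-in for the triplets read from a csv file
triplets = [(0, 0, 1.5), (1, 2, 2.0), (2, 1, 0.5)]
S = build_sparse(triplets, (3, 3))
```

The result is already in CSR format, so it could be passed to the k-means fit call without a further conversion.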

What to do when Sparse Matrices aren't supported:
When sparse matrices aren't supported, one solution is to convert the matrix to a full matrix. To do this, simply invoke the todense() method.
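
A minimal sketch of the conversion, using a tiny made-up matrix. This is only practical when the full matrix fits in memory. Note that for csr_matrix, todense() returns a numpy matrix, while the related toarray() method returns a plain ndarray.

```python
import numpy as np
from scipy.sparse import csr_matrix

# a tiny sparse matrix, for illustration only
S = csr_matrix(np.array([[0.0, 1.0], [2.0, 0.0]]))

D = S.todense()  # dense numpy matrix
A = S.toarray()  # dense numpy ndarray with the same contents
```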

Hudson/Jenkins With RVM and PhantomJS

Setting up Hudson/Jenkins to work with RVM (Ruby Version Manager) and PhantomJS (for headless JavaScript testing) can be painful. This post will show you how to easily set them up on your own server.

At Bizo we have several projects that depend on different versions of Ruby, mostly because some projects rely on older gems that are incompatible with Ruby 1.9. Installing RVM on a dev machine is almost always a cinch, but getting it to play nicely with your CI build server isn't quite so straightforward.

We run our Hudson server off of an Amazon EC2 instance. Our EC2 instances are started up with custom software, but it really boils down to executing a bash startup script. Assuming the Hudson user's $HOME is set to /var/lib/hudson, you can copy/paste the code below to install RVM. Otherwise, just replace /var/lib/hudson below with the $HOME dir of your Hudson (or Jenkins) user.

bash -s stable < <(curl -s
echo "[ -s \"/var/lib/hudson/.rvm/scripts/rvm\" ] && source \"/var/lib/hudson/.rvm/scripts/rvm\" # loads RVM" > .bashrc
# ensure RVM is loaded
source ~/.bashrc

echo "Installing Ruby 1.9.2"
rvm install 1.9.2 
rvm use 1.9.2

echo "Installing gems for Ruby: 1.9.2"
gem install bundler --no-rdoc --no-ri
# add additional ruby versions here
su - hudson -c "$COMMANDS"

Then, in Hudson, go to your project configuration and under "execute shell" you can invoke rvm and run your project like normal. Note: our version of Hudson doesn't automatically load .bashrc, so you might need to source it first to ensure RVM loads, e.g.:

source ~/.bashrc
# Pick our ruby version
rvm use 1.9.2

# Run your project... ex bundle install && rake test:units for a Rails project 

PhantomJS is our execution environment of choice for running JavaScript unit tests and setting it up to run on Hudson is actually quite easy.

Here is the necessary bash snippet to make it available for use in Hudson. 

INSTALL_PATH= # wherever you want
wget ${INSTALL_PATH}/phantomjs.tar.gz
# OR For 64bit machines wget

mkdir ${INSTALL_PATH}/phantomjs
tar -zxvf ${INSTALL_PATH}/phantomjs.tar.gz -C ${INSTALL_PATH}

ln -s ${INSTALL_PATH}/phantomjs/bin/phantomjs /usr/local/bin/phantomjs

After running the script above you can invoke PhantomJS as "phantomjs" in the "execute shell" box inside your project configuration. You'll probably want your tests to exit with a non-zero status on failure so that the Hudson build fails. If you use the Jasmine testing framework, you can use our phantom-jasmine test runner on GitHub.

Interactive Hive sessions, Elastic MapReduce, and GNU screen

One extremely annoying quality of using Hive interactively on EMR (or any other remote system) is that your sessions will die if you lose your connection to the server.  Once this happens, your ssh session will end, terminating both your Hive session and any queries that may currently be running.

In most cases, this happens when I'm waiting for a query to execute and I need to move from one place to another, whether from my desk to a conference room or from the office to home.  When I can predict (or know) that I'm going to lose my connection and just want to be able to reconnect to Hive later, the best option I've found is to run Hive inside of GNU screen.

I'm definitely a screen newbie, but there are really only three things you need to know:

1. As soon as you log in for the first time, install screen and start it up:

sudo apt-get install screen
screen

2. When you're (temporarily) done interacting with Hive and want to stick it in the background, tell screen to detach Hive from your current session by pressing "Ctrl-a" then "d".  You may now log out from your EMR node.

3. When you're ready to resume your Hive session, simply log back on and tell screen to reconnect to the most recent session:

screen -r

Screen does a whole lot of other stuff, but simply allowing graceful reconnection to Hive sessions is definitely worth the price of entry.

For comparison, some other ways to work around this problem are using nohup; suspending the job, putting it in the background, and using disown; or using an even more advanced tool like tmux.

Monday, January 9, 2012

How to Measure Latency Distribution using Amazon CloudWatch

By default, web services hosted in AWS behind an Elastic Load Balancer have their response times automatically tracked in CloudWatch.  This lets you easily monitor the minimum, maximum, and average latency of your services.

Unfortunately, none of these metrics are very useful.  Minimum and maximum latencies really measure outliers, and averages can easily obscure what's really going on in your services.

To get a better picture of the performance of our backend services, we explicitly have client services track the latency distribution of servers using CloudWatch custom metrics.  Instead of having a single metric that measures the number of milliseconds taken by each request, we instead count the number of requests that take a particular number of milliseconds.

More specifically, for each service, we create eleven custom metrics representing the buckets 0-10ms, 11-20ms, 21-30ms, ... , 91-100ms, and 101+ms.  (We actually have another set of buckets covering the 100-1000ms range at 100ms intervals, but we've found these to be less useful.)  For each request, we then increment the counter for the bucket corresponding to the request's latency.  Periodically, an automated process pulls the aggregated counts from CloudWatch into a Google Spreadsheet and graphs the results in a stacked bar chart.

A stacked bar chart of response times for one of our services.  Each layer represents a different response time bucket.

This gives us a nice visual representation of both the overall traffic levels and how many requests had response times below each threshold.
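
The bucketing step itself is small. Here is a minimal sketch of the local aggregation, assuming latencies are measured in milliseconds and partial milliseconds are rounded up. The function name, the bucket labels, and the sample latencies are hypothetical, and the push to CloudWatch is left out.

```python
import math
from collections import Counter

def bucket_for(latency_ms):
    """Return the name of the latency bucket for one request."""
    ms = int(math.ceil(latency_ms))  # round partial milliseconds up
    if ms <= 10:
        return "0-10ms"
    if ms > 100:
        return "101+ms"
    # 11..20 -> 11, 21..30 -> 21, ..., 91..100 -> 91
    lower = ((ms - 1) // 10) * 10 + 1
    return "%d-%dms" % (lower, lower + 9)

# hypothetical latencies (in ms) observed since the last push
observed = [3, 10, 11, 47, 100, 250]

# one counter per bucket; these counts are what would be sent as custom metrics
counts = Counter(bucket_for(ms) for ms in observed)
```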

Our automated process also converts the totals into percentiles.

A stacked bar chart of response times as percentiles.

Google's chart tools support displaying only a vertical slice of the data, so we can easily show 90th, 95th, and 97.5th percentiles of our response times.  This lets us easily see whether we're fulfilling performance requirements like "95% of all requests with response times below N ms."

Our slowest responses.
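
The conversion from bucket totals to percentiles is a running sum. The sketch below assumes the counts arrive as (upper bound in ms, count) pairs ordered by bound, with the open-ended bucket represented by infinity. The helper names and the sample counts are made up for illustration.

```python
def cumulative_percentages(bucket_counts):
    """Turn ordered (upper_bound_ms, count) pairs into
    (upper_bound_ms, percent of requests at or below that bound)."""
    total = sum(count for _, count in bucket_counts)
    running = 0
    result = []
    for upper, count in bucket_counts:
        running += count
        result.append((upper, 100.0 * running / total))
    return result

def meets_target(bucket_counts, threshold_ms, target_pct):
    """Check a requirement like '95% of requests below N ms',
    where N must be the upper bound of one of the buckets."""
    for upper, pct in cumulative_percentages(bucket_counts):
        if upper == threshold_ms:
            return pct >= target_pct
    raise ValueError("no bucket ends at %s ms" % threshold_ms)

# hypothetical totals for one reporting period
counts = [(10, 50), (20, 30), (30, 15), (float("inf"), 5)]
percentages = cumulative_percentages(counts)
ok_at_30 = meets_target(counts, 30, 95)
ok_at_20 = meets_target(counts, 20, 95)
```

Because the buckets are fixed in advance, a target can only be checked at a bucket boundary; a finer answer would need narrower buckets.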

The combination of latency bucketing, Amazon CloudWatch, and Google Spreadsheets gives us a very lightweight way of tracking our server performance.  The only additional overhead on our servers is a bit of logic to do local aggregation and push data into CloudWatch, and the only other moving part is a simple cron task that connects together the CloudWatch and Google Data APIs.