Tuesday, October 20, 2009

Clearing Linux Filesystem Cache

I was doing some performance tuning of our mysql db and was having some trouble consistently reproducing query performance due to IO caching that was occuring in Linux. In case you're wondering, you can clear this cache by executing the following command as root:

echo 1 > /proc/sys/vm/drop_caches

Friday, October 16, 2009

bash, errors, and pipes

Our typical pattern for writing bash scripts has been to start off each script with:

#!/bin/bash -e

The -e option will cause the script to exit immediate if a command has exited with a non-zero status. This way your script will fail as early as possible, and you never get into a case where on the surface, it looks like the script completed, but you're left with an empty file, or missing lines, etc.

Of course, this is only for "simple" commands, so in practice, you can think of it terminating immediately if the entire line fails. So a script like:

#!/usr/bin/bash -e
/usr/bin/false || true
echo "i am still running"
will still print "i am still running," and the script will exit with a zero exit status.

Of course, if you wrote it that way, that's probably what you're expecting. And, it's easy enough to change (just change "||" to "&&").

The thing that was slightly surprising to me was how a script would behave using pipes.

#!/bin/bash -e
/usr/bin/false | sort > sorted.txt
echo "i am still running"
If your script is piping its output to another command, it turns out that the return status of a pipeline is the exit status of its last command. So, the script above will also print "i am still running" and exit with a 0 exit status.

Bash provides a PIPESTATUS variable, which is an array containing a list of the exit status values from the pipeline. So, if we checked ${PIPESTATUS[0]} it would contain 1 (the exit value of /usr/bin/false), and ${PIPESTATUS[1]} would contain 0 (exit value of sort). Of course, PIPESTATUS is volatile, so, you must check it immediately. Any other command you run will affect its value.

This is great, but not exactly what I wanted. Luckily, there's another bash option -o pipefail, which will change the way the pipeline exit code is derived. Instead of being the last command, it will become the last command with a non-zero exit status. So

#!/bin/bash -e -o pipefail
/usr/bin/false | sort > sorted.txt
echo "this line will never execute"
So, thanks to pipefail, the above script will work as we expect. Since /usr/bin/false returns a non-zero exit status, the entire pipeline will return a non-zero exit status, the script will die immediately because of -e, and the echo will never execute.

Of course, all of this information is contained in the bash man page, but I had never really ran into it / looked into it before, and I thought it was interesting enough to write up.

Monday, October 12, 2009

s3fsr 1.4 released

s3fsr is a tool we built at Bizo to help quickly get files into/out of S3. It's had a few 1.x releases, but by 1.4 we figured it was worth getting around to posting about.

Overview

While there a lot of great S3 tools out there, s3fsr's niche is that it's a FUSE/Ruby user land file system.

For a command line user, this is handy, because it means you can do:
# mount yourbucket in ~/s3
s3fsr yourbucketname ~/s3

# see the directories/files
ls ~/s3/

# upload
mv ~/local.txt ~/s3/remotecopy.txt

# download
cp ~/s3/remote.txt ~/localcopy.txt
Behind the scenes, s3fsr is talking to the Amazon S3 REST API and getting/putting directory and file content. It will cache directory listings (not file content), so ls/tab completion will be quick after the initial short delay.

S3 And Directory Conventions

A unique aspect of s3fsr, and a specific annoyance it was written to fulfill, is that it understands several different directory conventions used by various S3 tools.

This directory convention problem stems from Amazon's decision to forgo any explicit notion of directories in the API, and instead force everyone to realize that S3 is not a file system but a giant hash table of string key -> huge byte array.

Let's take an example--you want to store two files, "/dir1/foo.txt" and "/dir1/bar.txt" in S3. In a traditional file system, you'd have 3 file system entries: "/dir1", "/dir1/foo.txt", and "/dir1/bar.txt". Note that "/dir1" gets its own entry.

In S3, without tool-specific conventions, storing "/dir1/foo.txt" and "/dir1/bar.txt" really means only 2 entries. "/dir1" does not exist of its own accord. The S3 API, when reading and writing, never parses keys apart by "/", it just treats the whole path as one big key to get/set in its hash table.

For Amazon, this "no /dir1" approach makes sense due to the scale of their system. If they let you have a "/dir1" entry, pretty soon API users would want the equivalent of a "rm -fr /dir1", which, for Amazon, means instead of a relatively simple "remove the key from the hash table" operation, they have to start walking a hierarchical structure and deleting child files/directories as they go.

When the keys are strewn across a distributed hash table like Dynamo, this increases the complexity and makes the runtime nondeterministic.

Which Amazon, being a bit OCD about their SLAs and 99th percentiles, doesn't care for.

So, no S3 native directories.

There is one caveat--the S3 API lets you progressively infer the existence of directories by probing the hash table keys with prefixes and delimiters.

In our example, if you probe with "prefix=/" and "delimiter=/", S3 will then, and only then, split & group the "/dir1/foo.txt" and "/dir1/bar.txt" keys on "/" and return you just "dir1/" as what the S3 API calls a "common prefix".

Which is kind of like a directory. Except that you have to create the children first, and then the directory pops into existence. Delete the children, and the directory pops out of existence.

This brings us to the authors of tools like s3sync and S3 Organizer--their users want the familiar "make a new directory, double click it, make a new file in it" idiom, not a backwards "make the children files first" idiom. It is, understandably, different from what users expect.

So, the tool authors got creative and basically added their own "/dir1" marker entries to S3 when users' perform a "new directory" operation to get back to the "directory first" idiom.

Note this is a hack, because issuing a "REMOVE /dir1" to S3 will not recursively delete the child files, because to S3 "/dir1" is just a meaningless key with no relation to any other key in the hash table). So now the burden is on the tool to do its own recursive iteration/deletion of the directories.

Which is cool, and actually works pretty well, except that the two primary tools implemented marker entries differently:
  • s3sync created marker entries (e.g. a "/dir1" entry) with a hard-coded content that etags (hashes) to a specific value. This known hash is nice because it makes it easy to distinguish directory entries from file entries when listing S3 entries and, S3 knowing nothing about directories, the tool having to infer on its own which keys represent files and which represent directories.
  • S3 Organizer created marker entries as well, but instead of a known etag/hash, they suffixed the directory name, so the key of "/dir1" is actually "/dir1_$folder$". It's then the job of the tool is recognize the suffix as a marker directory entry, strip off the suffix before showing the name to the user, and use a directory icon instead of a file icon.
So, if you use a S3 tool that does not understand these 3rd party conventions, browsing a well-used bucket will likely end up looking odd with obscure/duplicate entries:
/dir1 # s3sync marker entry file
/dir1 # common prefix directory
/dir1/foo.txt # actual file entry
/dir2_$folder$ # s3 organizer maker entry file
/dir2 # common prefix directory
/dir2/foo.txt # actual file entry
This quickly becomes annoying.

And so s3fsr understands all three conventions, s3sync, S3 Organizer, and common prefixes, and just generally tries to do the right thing.

FUSE Rocks

One final note is that the FUSE project is awesome. Implementing mountable file systems that users can "ls" around in usually involves messy, error-prone kernel integration that is hard to write and, if the file system code misbehaves, can screw up your machine.

FUSE takes a different approach and does the messy kernel code just once, in the FUSE project itself, and then it acts as a proxy out to your user-land, process-isolated, won't-blow-up-the-box process to handle the file system calls.

This proxy/user land indirection does degrade performance, so you wouldn't use it for your main file system, but for scenarios like s3fsr, it works quite well.

And FUSE language bindings like fusefs for Ruby make it a cinch to develop too--s3fsr is all of 280 LOC.

Wrapping up

Let us know if you find s3fsr useful--hop over to the github site, install the gem, kick the tires, and submit any feedback you might have.


Want to be challenged at work?

We've got a few challenges and are looking to grow our (kick ass) engineering team. Check out the opportunities below and reach out if you think you've got what it takes...

Thursday, October 8, 2009

Efficiently selecting random sub-collections.

Here's a handy algorithm for randomly choosing k elements from a collection of n elements (assume k < n)


public static <T> List<T> pickRandomSubset(Collection<T> source, int k, Random r) {
  List<T> toReturn = new ArrayList<T>(k);
  double remaining = source.size();
  for (T item : source) {
    double nextChance = (k - toReturn.size()) / remaining;
    if (r.nextDouble() < nextChance) {
      toReturn.add(item);
      if (toReturn.size() == k) {
        break;
      }
    }
    --remaining;
  }
  return toReturn;
}

The basic idea is to iterate through the source collection only once. For each element, we can compute the probability that it should be selected, which simply equals the number of items left to pick divided by the total number of items left.

Another nice thing about this algorithm is that it also works efficiently if the source is too large to fit in memory, provided you know (or can count) how many elements are in the source.

This isn't exactly anything groundbreaking, but it's far better than my first inclination to use library functions to randomly sort my list before taking a leading sublist.

Wednesday, October 7, 2009

hive map reduce in java

In my last post, I went through an example of writing custom reduce scripts in hive.

Writing a streaming reducer requires a lot of the same work to check for when keys change. Additionally, in java, there's a decent amount of boilerplate to go through just to read the columns from stdin.

To help with this, I put together a really simple little framework that more closely resembles the hadoop Mapper and Reducer interfaces.

To use it, you just need to write a really simple reduce method:
  void reduce(String key, Iterator<String[]> records, Output output);

The helper code will handle all IO, as well as the grouping together of records that have the same key. The 'records' Iterator will run you through all rows that have the key specified in key. It is assumed that the first column is the key. Each element in the String[] record represents a column. These rows aren't buffered in memory or anything, so it can handle any arbitrary number of rows.

Here's the complete example from the my reduce example, in java (even shorter than perl).
public class Condenser {
  public static void main(final String[] args) {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      void reduce(String key, Iterator records, Output output) throws Exception {
        final StringBuilder vals = new StringBuilder();
        while (records.hasNext()) {
          // note we use col[1] -- the key is provided again as col[0]
          vals.append(records.next()[1]);
          if (records.hasNext()) { vals.append(","); }
        }
        output.collect(new String[] { key, vals.toString() });
      }
    });
  }
}

Here's a wordcount reduce example:

public class WordCountReduce {
  public static void main(final String[] args) {
    new GenericMR().reduce(System.in, System.out, new Reducer() {
      public void reduce(String key, Iterator<String[]> records, Output output) throws Exception {
        int count = 0;
        
        while (records.hasNext()) {
          count += Integer.parseInt(records.next()[1]);
        }
        
        output.collect(new String[] { key, String.valueOf(count) });
      }
    });
  }
}


Although the real value is in making it easy to write reducers, there's also support for helping with mappers. Here's my key value split mapper from a previous example:

  public class KeyValueSplit {
    public static void main(final String[] args) {
      new GenericMR().map(System.in, System.out, new Mapper() {
      public void map(String[] record, Output output) throws Exception {
        for (final String kvs : record[0].split(",")) {
          final String[] kv = kvs.split("=");
          output.collect(new String[] { kv[0], kv[1] });
        }
      }
    }
  }

The full source code is available here. Or you can download a prebuilt jar here.

The only dependency is apache commons-lang.

I'd love to hear any feedback you may have.

Tuesday, October 6, 2009

Simple DB Firefox Plugin -- New Release

I finally got around to updating our open-sourced Simple DB Firefox Plugin creatively named SDB Tool.

obligatory screen shot

The major highlights include:
  • Runs in Firefox 3.5!
  • Support for "Select" Queries (e.g. Version 2009-04-15 of the API)
  • Lots of UI Tweaks and Refactoring...
Please report any issues here.

Click here to install.

reduce scripts in hive

In a previous post, I discussed writing custom map scripts in hive. Now, let's talk about reduce tasks.

The basics

As before, you are not writing an org.apache.hadoop.mapred.Reducer class. Your reducer is just a simple script that reads from stdin (columns separated by \t) and should write rows to stdout (again, columns separated by \t).

Another thing to mention is that you can't run a reduce without first doing a map.

The rows to your reduce script will be sorted by key (you specify which column this is), so that all rows with the same key will be consecutive. One thing that's kind of a pain with hive reducers, is that you need to keep track of when keys change yourself. Unlike a hadoop reducer where you get a (K key, Iterator<V> values), here you just get row after row of columns.

An example

We'll use a similar example to the map script.

We will attempt to condense a table (kv_input) that looks like:
k1 v1
k2 v1
k4 v1
k2 v3
k3 v1
k1 v2
k4 v2
k2 v2
...

into one (kv_condensed) that looks like:

k1 v1,v2
k2 v1,v2,v3
...

The reduce script

#!/usr/bin/perl                                                                                       

undef $currentKey;
@vals=();

while (<STDIN>) {
  chomp();
  processRow(split(/\t/));
}

output();

sub output() {
  print $currentKey . "\t" . join(",", sort @vals) . "\n";
}

sub processRow() {
  my ($k, $v) = @_;

  if (! defined($currentKey)) {
    $currentKey = $k;
    push(@vals, $v);
    return;
  }

  if ($currentKey ne $k) {
    output();
    $currentKey = $k;
    @vals=($v);
    return;
  }

  push(@vals, $v);
}

Please forgive my perl. It's been a long time (I usually write these in java, but thought perl would make for an easier blog example).

As you can see, a lot of the work goes in to just keeping track of when the keys change.

The nice thing about these simple reduce scripts is that it's very easy to test locally, without going through hadoop and hive. Just call your script and pass in some example text separated by tabs. If you do this, you need to remember to sort the input by key before passing into your script (this is usually done by hadoop/hive).

Reducing from Hive

Okay, now that we have our reduce script working, let's run it from Hive.

First, we need to add our map and reduce scripts:

add file identity.pl;
add file condense.pl;

Now for the real work:






01
02
03
04
05
06
07
08
09
10
11


from (
  from kv_input
  MAP k, v
  USING './identity.pl'
  as k, v
 cluster by k) map_output
insert overwrite table kv_condensed
reduce k, v
  using './condense.pl'
  as k, v
;


This is fairly dense, so I will attempt to give a line by line breakdown:

On line 3 we are specifying the columns to pass to our reduce script from the input table (specified on line 2).

As I mentioned, You must specify a map script in order to reduce. For this example, we're just using a simple identity perl script. On line 5 we name the columns the map script will output.

Line 6 specifies the column which is the key. This is how the rows will be sorted when passed to your reduce script.

Line 8 specifies the columns to pass into our reducer (from the map output columns on line 5).

Finally, line 10 names the output columns from our reducer.

(Here's my full hive session for this example, and an example input file).

I hope this was helpful. Next time, I'll talk about some java code I put together to simplify the process of writing reduce scripts.

Monday, October 5, 2009

Developing on the Scala console with JavaRebel

If you're the type of developer who likes to mess around interactively with your code, you should definitely be using the Scala console. Even if you're not actually using any Scala in your code, you can still instantiate your Java classes, call their methods, and play around with the results. Here's a handy script that I stick in the top-level of my Eclipse projects that will start an interactive console with my compiled code on the classpath:

#!/bin/bash

tempfile=`mktemp /tmp/tfile.XXXXXXXXXX`

/usr/bin/java -jar /mnt/bizo/ivy-script/ivy.jar -settings /mnt/bizo/ivy-script/ivyconf.xml -cachepath ${tempfile} > /dev/null

classpath=`cat ${tempfile} | tr -d "\n\r"`

rm ${tempfile}

exec /usr/bin/java -classpath /opt/local/share/scala/lib:target/classes:${classpath} -noverify -javaagent:/opt/javarebel/javarebel.jar scala.tools.nsc.MainGenericRunner

(Since we already use Ivy for dependency management, this script also pulls in the appropriate jar files from the Ivy cache. See this post for more details.)

The javaagent I'm using here is JavaRebel, a really awesome tool that provides automatic code reloading at runtime. Using the Scala console and JavaRebel, I can instantiate an object on the console and test a method. If I get an unexpected result, I can switch back to Eclipse, fix a bug or add some additional logging, and rerun the exact same method back on the console. JavaRebel will automagically detect that the class file was changed and reload it into the console, and the changes will even be reflected in the objects I created beforehand.

The icing on this cake is that Zero Turnaround (the makers of JavaRebel) is giving away free licenses to Scala developers. How awesome is that?