Monday, April 22, 2013

Scala Command-Line Hacks

Do you like command-line scripting and one-liners with Perl, Ruby and the like?   

For instance, here's a Ruby one-liner that uppercases the input:

% echo matz | ruby -p -e '$_.tr! "a-z", "A-Z"'

You like that kind of stuff?  Yes?  Excellent!  Then I offer you a hacking idea for Scala.

As you may know, Scala offers similar capability with the -e command-line option but it's fairly limited in its basic form because of the necessary boilerplate code to set up iteration over the standard input... it just begs for a simple HACK!

Using a simple bash wrapper,

# Usage: scala-map MAP_CODE
code=$(cat <<END
scala.io.Source.stdin.getLines() map { $@ } foreach println
END
)
scala -e "$code"

then we can express similar one-liners using Scala code and the standard library:

% ls | scala-map _.toUpperCase

% echo "foo bar baz" | scala-map '_.split(" ").mkString("-")'

Nifty, right?  Here's another script template to fold over the standard input,

# Usage: scala-fold INITIAL_VALUE FOLD_CODE
# where the following val's can be used in FOLD_CODE:
#        `acc`  is bound to the accumulator value
#        `line` is bound to the current line
code=$(cat <<END
println(scala.io.Source.stdin.getLines().foldLeft($1) { case (acc, line) => $2 })
END
)
scala -e "$code"

Now if you wanted to calculate the sum of the second column of space-separated input, you'd write:

% cat | scala-fold 0 'acc + (line.split(" ")(1).toInt)'
foo 1
bar 2
baz 3 
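
For reference, here is roughly the Scala that scala-fold would generate for the invocation above, written as a standalone sketch. It assumes the three sample lines are the whole input and feeds them from an in-memory iterator rather than standard input; FoldExample and sumSecondColumn are names made up for this illustration.

```scala
object FoldExample {
  // Same fold the wrapper builds: initial value 0, then the FOLD_CODE body.
  def sumSecondColumn(lines: Iterator[String]): Int =
    lines.foldLeft(0) { case (acc, line) => acc + line.split(" ")(1).toInt }

  def main(args: Array[String]): Unit = {
    // The sample input, including the trailing space on the last line.
    val sample = Iterator("foo 1", "bar 2", "baz 3 ")
    println(sumSecondColumn(sample)) // prints 6
  }
}
```

The trailing space on the last line is harmless here because split drops trailing empty fields.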

You get the idea ...  hopefully this inspires you to try a few things with Scala scripting templates!

Disclaimer:  I am not advocating these hacks as a replacement for learning other Unix power tools like grep, sed, awk, ... I am simply illustrating that Scala can be turned into an effective command-line scripting tool as part of the vast array of Unix tools.   Use what works best for you.

Friday, April 19, 2013

Efficiency & Scalability

Software engineers know that distributed systems are often hard to scale and many can intuitively point to reasons why this is the case by bringing up points of contention, bottlenecks and latency-inducing operations.  Indeed, there exists a plethora of reasons and explanations as to why most distributed systems are inherently hard to scale, from the CAP theorem to scarcity of certain resources, e.g., RAM, network bandwidth, ...

It's said that good engineers know how to identify resources that may not appear to be relevant to scaling initially but will become more significant as particular kinds of demand grow. If that’s the case, then great engineers know that system architecture is often the determining factor in system scalability, and that a system’s own architecture may be its worst enemy, so they define and structure systems in order to avoid fundamental flaws.

In this post, I want to explore the relationship between system efficiency and scalability in distributed systems; they are to some extent two sides of the same coin.  We’ll consider specifically two common system architecture traits:  replication and routing.  Some of this may seem obvious to some of you but it’s always good to back intuition with some additional reasoning.

Before we go any further, it’s helpful to formulate a definition of efficiency applicable to our context:

efficiency is the extent to which useful work is performed relative to the total work and/or cost incurred.

We’ll also use the following definition of scalability,

scalability is the ability of a system to accommodate an increased workload by repeatedly applying a cost-effective strategy for extending a system’s capacity.

So, scalability and efficiency are both determined by cost-effectiveness, with the distinction that scalability is a measure of marginal gain.  Stated differently, if efficiency decreases significantly as a system grows, then the system is said to be non-scalable.

Enough rambling, let’s get our thinking caps on!  Since we’re talking about distributed systems, it’s practically inevitable to compare against traditional single-computer systems, so we’ll start with a narrow definition of system efficiency:

Efficiency = (average work for processing a request on a single computer) / (average work for processing a request in the distributed system)

This definition is a useful starting point for our exploration because it abstracts out the nature of the processing that’s happening within the system; it’s overly simple but it allows us to focus our attention on the big picture.

More succinctly, we’ll write:

(1)  Efficiency = Wsingle / Wcluster

Replication Cost

Many distributed systems replicate some or all of the data they process across different processing nodes (to increase reliability, availability or read performance) so we can model:

(2) Wcluster = Wsingle + (r x Wreplication)

where r is the number of replicas in the system and Wreplication is the work required to replicate the data to other nodes. Wreplication is typically lower than Wsingle, though realistically they have different cost models (e.g., Wsingle may be CPU-intensive whereas Wreplication may be I/O-intensive).   If n is the number of nodes in the system, then r may be as large as (n-1), meaning replicating to all other nodes, though most systems will only replicate to 2 or 3 other nodes (for good reason, as we’ll discover later).

We’ll now define the replication coefficient, which expresses the relative cost of replication compared to the cost of processing the request on a single node:

(3) Qreplication = Wreplication / Wsingle

Solving for Wreplication, we get:

(4) Wreplication = Qreplication x Wsingle

If we substitute Wreplication in (2) by the equation formulated in (4), we obtain:

(5) Wcluster = Wsingle + ( r x Qreplication x Wsingle )

We now factor out Wsingle on the right side:

(6) Wcluster = Wsingle x [ 1 + r * Qreplication ]

Taking the efficiency equation (1) and substituting Wcluster from (6), the equation becomes:

(7) Efficiency = Wsingle / [ Wsingle x ( 1 + r * Qreplication ) ]

We then cancel Wsingle from the numerator and denominator to obtain the final efficiency for a replicating distributed system:

(8) Efficiency (replication) = 1 / [ 1 + (r x Qreplication) ]

As expected, both r and Qreplication are critical factors determining efficiency.

Interpreting this last equation and assuming Qreplication is a constant inherent to the system’s processing, our two takeaways are:

  1. If the system replicates to all other nodes (i.e., r = n - 1) it becomes clear that the efficiency of the system will degrade as more nodes are added and will approach zero as n becomes sufficiently large.

    To illustrate this, let's assume Qreplication is 10%,

    Efficiency (r = 1, n = 2) = 91%
    Efficiency (r = 2, n = 3) = 83%
    Efficiency (r = 3, n = 4) = 77%
    Efficiency (r = 4, n = 5) = 71%
    Efficiency (r = 5, n = 6) = 67%

    In other words, fully-replicated distributed systems don't scale.

  2. For a system to scale, the replication factor should be a (small) constant.

    Let's illustrate this with Qreplication fixed at 10% and using a replication factor of 3,

    Efficiency (r = 3, n = 4) = 77%
    Efficiency (r = 3, n = 5) = 77%
    Efficiency (r = 3, n = 6) = 77%
    Efficiency (r = 3, n = 7) = 77%
    Efficiency (r = 3, n = 8) = 77%

    As we can see, fixed-replication-factor distributed systems scale although, as you might expect, they do not exhibit the same efficiency as a single-node system. At worst, when replication costs as much as the processing itself (Qreplication = 1), the efficiency is 1 / (1 + r), which follows directly from equation (8).
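
The two cases above can be checked with a few lines of Scala. This is only a sketch of equation (8); the object and method names are invented for the illustration, and the 10% replication coefficient is the same assumed value used in the text.

```scala
object ReplicationEfficiency {
  // Equation (8): Efficiency = 1 / (1 + r x Qreplication)
  def efficiency(r: Int, qReplication: Double): Double =
    1.0 / (1.0 + r * qReplication)

  // Whole-number percentage, rounded to the nearest integer.
  def percent(e: Double): Long = math.round(e * 100)

  def main(args: Array[String]): Unit = {
    val q = 0.10 // assumed replication coefficient
    // Full replication: r = n - 1, so r grows with the cluster.
    for (n <- 2 to 6)
      println(s"full  r=${n - 1} n=$n -> ${percent(efficiency(n - 1, q))}%")
    // Fixed replication factor: r stays at 3 while n grows.
    for (n <- 4 to 8)
      println(s"fixed r=3 n=$n -> ${percent(efficiency(3, q))}%")
  }
}
```

Note that n never appears in the formula itself; it only matters when r is tied to it, which is exactly why the fixed-factor case stays flat.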

Routing Cost

When a distributed system routes requests to nodes holding the relevant information (e.g., a partially replicated system, r < n) its working model may be defined as,

(9) Wcluster = (r / n) * Wsingle  +  (n-r)/n * (Wrouting + Wsingle)

The above equation represents the fact that r out of n requests are processed locally whereas the remainder of the requests are routed and processed on a different node.

Let’s define the routing coefficient to be,

(10) Qrouting  =  Wrouting / Wsingle

Solving (10) for Wrouting gives,

(11) Wrouting = Qrouting * Wsingle

We then substitute Wrouting in (9) by (11) to obtain,

(12) Wcluster = (r/n) * Wsingle  +  (n-r)/n * [ (Qrouting * Wsingle) + Wsingle ]

and taking the efficiency equation (1), substituting Wcluster from (12), the simplified equation becomes:

(13) Efficiency (routing) = n / [ n + (n - r) * Qrouting ]

Looking at this last equation, we can infer that:

  1. As the system grows and n goes towards infinity, the efficiency of the system approaches 1 / (1 + Qrouting).   In the limit, the efficiency does not depend on the actual number of nodes within the system, therefore routing-based systems generally scale. (But you knew that already.)
  2. If the number of nodes is large compared to the replication factor (n >> r) and Qrouting is significant (1.0, same cost as Wsingle), then the efficiency is ½, or 50%.   This matches the intuition that the system is routing practically all requests and therefore spending half of its efforts on routing.   The system is scaling linearly but it’s costing twice as much to operate (for every node) compared to a single-node system.
  3. If the cost of routing is insignificant (Qrouting = 0), the efficiency is 100%.  That’s right, if it doesn’t cost anything to route the request to a node that can process it, the efficiency is the same as a single-node system.

Let’s consider a practical distributed system with 10 nodes (n = 10), a replication factor of 3 (r = 3), and a relative routing cost of 10% (Qrouting = 0.10).  This system would have an efficiency of 10 / [ 10 + (7 * 10%) ] = 93.46%.   As you can see, routing-based distributed systems can be pretty efficient if Qrouting is relatively small.
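
The same arithmetic as a small Scala sketch of equation (13), for anyone who wants to plug in their own numbers. The object and method names are made up for the illustration; the inputs are the ones from the example above.

```scala
object RoutingEfficiency {
  // Equation (13): Efficiency = n / [ n + (n - r) * Qrouting ]
  def efficiency(n: Int, r: Int, qRouting: Double): Double =
    n / (n + (n - r) * qRouting)

  // Limit for n >> r: 1 / (1 + Qrouting)
  def limit(qRouting: Double): Double = 1.0 / (1.0 + qRouting)

  def main(args: Array[String]): Unit = {
    val e = efficiency(n = 10, r = 3, qRouting = 0.10)
    println(f"n=10 r=3 Q=0.10 -> ${e * 100}%.2f%%")
    println(f"limit for Q=0.10 -> ${limit(0.10) * 100}%.2f%%")
  }
}
```

Comparing the 10-node value with the limit gives a feel for how little efficiency is left to lose as the cluster keeps growing.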

Where To Now?

Well, this was a fun exploration of system scalability in the abstract.  We came up with interesting equations to describe the scalability of both data-replicating and request-routing architectures.  With some tinkering, these can serve as a good basis for reasoning about some of your distributed systems.

In real life, however, there are many other aspects to consider when scaling systems.  In fact, it often feels like a whack-a-mole hunt; you never know where the next performance non-linearity is going to rear its ugly head.  But if you use either (or both) of the data-replicating and request-routing architecture styles with reasonable replication factors and you manage to keep your replication/routing costs well below your single-node processing costs, you may find some comfort in knowing that at least you haven’t introduced a fundamental scaling limitation into your system.

PS: With apologies for the formatting of the formulas ... Blogger wasn't exactly friendly with my equations imported from Google Docs so I had to go down the ASCII route. Thanks for reading and making it through!

Monday, April 15, 2013

Sensible Defaults for Apache HttpClient

Defaults for HttpClient

Before coming to Bizo, I wrote a web service client that retrieved daily XML reports over HTTP using the Apache DefaultHttpClient. Everything went fine until one day the connection simply hung forever. We found this odd because we had set the connection timeout. It turned out we also needed to set the socket timeout (HttpConnectionParams.SO_TIMEOUT). The default for both connection timeout (max time to wait for a connection) and socket timeout (max time to wait between consecutive data packets) is infinity. The server was accepting the connection but then not sending any data so our client hung forever without even reporting any errors. Rookie mistake, but everyone is a rookie at least once. Even if you are an expert with HttpClient, chances are there will be someone maintaining your code in the future who is not.

Another problem with defaults using HttpClient is with PoolingClientConnectionManager. PoolingClientConnectionManager has two attributes: MaxTotal and MaxPerRoute. MaxTotal is the maximum total number of connections in the pool. MaxPerRoute is the maximum number of connections to a particular host. If the client attempts to make a request and either of these maximums has been reached, then by default the client will block until a connection is free. Unfortunately the default for MaxTotal is 20 and the default MaxPerRoute is only 2. In a SOA, it is common to have many connections from a client to a particular host. The limit of 2 (or even 1) connections per host makes sense for a polite web crawler, but in a SOA, you are likely going to need a lot more. Even the 20 maximum total connections in the pool is likely much lower than desired.

If the client does reach the MaxPerRoute or the MaxTotal connections, it will block until the connection manager timeout (ClientPNames.CONN_MANAGER_TIMEOUT) is reached. This timeout controls how long the client will wait for a connection from the connection manager. Fortunately, if this timeout is not set directly, it will default to the connection timeout if that is set, which will prevent the client from queuing up requests indefinitely.
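
As a rough sketch, this is how those settings can be made explicit from Scala, assuming HttpClient 4.2.x is on the classpath. The object name ExplicitHttpClient and every numeric value here are illustrative assumptions, not recommendations; pick values that fit your own service.

```scala
import org.apache.http.client.params.ClientPNames
import org.apache.http.impl.client.DefaultHttpClient
import org.apache.http.impl.conn.PoolingClientConnectionManager
import org.apache.http.params.HttpConnectionParams

object ExplicitHttpClient {
  // Builds a client with every timeout and pool limit set explicitly.
  // All numbers are placeholders chosen for the example.
  def build(): DefaultHttpClient = {
    val pool = new PoolingClientConnectionManager()
    pool.setMaxTotal(200)           // max connections in the whole pool
    pool.setDefaultMaxPerRoute(100) // max connections per host (route)

    val client = new DefaultHttpClient(pool)
    val params = client.getParams()
    // Max time (ms) to wait for the connection to be established.
    HttpConnectionParams.setConnectionTimeout(params, 1000)
    // Max time (ms) to wait between consecutive data packets.
    HttpConnectionParams.setSoTimeout(params, 1000)
    // Max time (ms) to wait for a free connection from the pool.
    params.setLongParameter(ClientPNames.CONN_MANAGER_TIMEOUT, 1000L)
    client
  }
}
```

Setting the connection manager timeout explicitly, rather than relying on the fallback to the connection timeout, keeps all three waits visible in one place.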

What would a better set of defaults be?

A good default is something that is "safe". A safe default for a connection timeout is long enough to not give up waiting when things are working normally, but short enough to not cause system instability when the remote service is down. Unfortunately, safe is context dependent. Safe for a daily data sync process and safe for an in-thread service request handler are very different. Safe for a request that is critical to the correct functioning of the program is different from safe for some ancillary logging that is OK to miss 1% of the time. A default for timeouts that is safe in all cases is not really possible.

Safe defaults for PoolingClientConnectionManager's MaxTotal and MaxPerRoute should be big enough that they won’t be hit unless there is a bug. New to version 4.2 is the fluent-hc API for making HTTP requests. This uses a PoolingClientConnectionManager with defaults of 200 MaxTotal and 100 MaxPerRoute. We are using these same defaults for all our configurations.

Note that the fluent-hc API is very nice, but requires setting the connection timeouts on each request. This is perfect if you need to tune the settings for each request but does not provide a safety check against accidentally leaving the timeout infinite.

How can you help out a new dev implementing a new HTTP client?

If you can't have a safe default and the existing defaults are decidedly not safe, then it is best to require a configuration. We created a wrapper for PoolingClientConnectionManager that requires the developer to choose a configuration instead of letting the defaults silently take effect. One way to require a configuration is to force passing in the timeout values. However, it can be hard to know the right values, especially when stepping into a new environment. To help a developer implementing a new client at Bizo, we created some canonical configurations in the wrapper based on our experience working in our production environment on AWS. The configurations are:

Connection timeout    Socket timeout
125 ms                125 ms
1 second              1 second
10 seconds            10 seconds
1 minute              5 minutes

Clients with critical latency requirements can use the SameRegion configuration and need to make sure they are connecting to a service in the same AWS region. Back end processes that can tolerate latency can use the MaxTimeout configuration. Now when a developer is implementing a new client, the timeouts used by other services are readily available without having to hunt through other code bases. The developer can compare these with the current use case and choose an appropriate configuration. Additionally, if we learn that some of these configurations need to be tweaked, then we can easily modify all affected code.
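
To make the "require a configuration" idea concrete, here is a minimal, library-free Scala sketch. It is not the actual Bizo wrapper: TimeoutConfig, ConfiguredClient and the preset names are all hypothetical, and since the post does not say which timeout values belong to SameRegion or MaxTimeout, the presets are named after their values instead.

```scala
import scala.concurrent.duration._

// Hypothetical value type: a pair of finite, positive timeouts.
final case class TimeoutConfig(connection: FiniteDuration, socket: FiniteDuration) {
  require(connection > Duration.Zero && socket > Duration.Zero,
    "timeouts must be positive")
}

object TimeoutConfig {
  // Placeholder names; the values mirror the four configurations listed above.
  val Preset125Ms: TimeoutConfig = TimeoutConfig(125.millis, 125.millis)
  val Preset1s: TimeoutConfig    = TimeoutConfig(1.second, 1.second)
  val Preset10s: TimeoutConfig   = TimeoutConfig(10.seconds, 10.seconds)
  val PresetLong: TimeoutConfig  = TimeoutConfig(1.minute, 5.minutes)
}

// There is no no-argument constructor, so a caller cannot create a client
// without consciously choosing a configuration.
final class ConfiguredClient(val config: TimeoutConfig) {
  // Millisecond values as Ints, the form HttpClient 4.x timeout setters take.
  def connectionTimeoutMs: Int = config.connection.toMillis.toInt
  def socketTimeoutMs: Int     = config.socket.toMillis.toInt
}
```

A caller would write something like new ConfiguredClient(TimeoutConfig.Preset1s), and a one-off TimeoutConfig can still be built when a service needs a longer socket timeout.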

Commonly the socket timeout will need to be adjusted for a specific service. After a connection is established, a service will not typically start sending its response until it has finished whatever calculation was requested. This can vary greatly even for different parameters on the same service endpoint. The socket timeout will need to be set based on the expected response times of the service.

It is easy to miss a particular setting even if you know it is there. At Bizo, we are always looking for ways to solve a problem in one place. We are hopeful that this will eliminate any issues we have had with bad defaults in our HttpClients.