Wednesday, September 26, 2012

Grouping pageviews into visits: a Scala code kata

The basic units of any website traffic analysis are pageviews, visits, and unique visitors.  Tracking pageviews is simply a matter of counting requests to the server.  Calculating unique visitors usually relies on cookies and unique identifiers.  Visits, however, require a bit more work.  For our purposes, a single visit is defined as a sequence of pageviews where the interval between pageviews is less than a fixed length like 15 minutes.

I thought that the problem of grouping pageviews into visits would make an interesting code kata.  Here’s the statement of the problem that I worked from:

Given a non-empty sequence of timestamps (as milliseconds since the epoch), write a function that would return a sequence of visits, where each visit is itself a sequence of timestamps where each pair of consecutive timestamps is no more than N milliseconds apart.
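To make that concrete, here’s a made-up example with N set to 15 minutes: three pageviews a minute apart, followed by one an hour later, should yield two visits.  (All of the code below assumes N is defined in scope like this.)

// N is the visit timeout: 15 minutes, in milliseconds.
val N = 15 * 60 * 1000L

// Three pageviews a minute apart, then one an hour after the first.
val pageviews = Seq(0L, 60000L, 120000L, 3720000L)

// Expected grouping:
//   Seq(Seq(0, 60000, 120000), Seq(3720000))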

As a starting point, I decided to take a straightforward procedural approach:

import scala.collection.mutable.ListBuffer

def doingItIteratively(pageviews: Seq[Long]): Seq[Seq[Long]] = {
  val iterator = pageviews.sorted.iterator
  val visits = ListBuffer[ListBuffer[Long]]()

  // The problem guarantees a non-empty sequence, so next() is safe here.
  var previousPV: Long = iterator.next()
  var currentVisit: ListBuffer[Long] = ListBuffer(previousPV)

  for (currentPV <- iterator) {
    if (currentPV - previousPV > N) {
      // The gap is too long: close out the current visit, start a new one.
      visits += currentVisit
      currentVisit = ListBuffer[Long]()
    }

    currentVisit += currentPV
    previousPV = currentPV
  }
  visits += currentVisit

  visits.map(_.toSeq).toSeq
}

So, we simply iterate through the (sorted) pageviews, tracking both the current visit and the previous pageview.  If the current pageview starts a new visit, we push the visit in progress onto the list of all visits and start a fresh one.  Either way, we then push the current pageview onto the (potentially new) current visit.
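A quick check with the made-up timestamps from above (note that the input doesn’t need to be pre-sorted, since the function sorts it first):

doingItIteratively(Seq(3720000L, 0L, 120000L, 60000L))
// => List(List(0, 60000, 120000), List(3720000))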

It actually felt a bit odd to write procedural code like this and ignore the functional parts of Scala.  Using a fold cleans the code up a bit and gets rid of the mutable state.

def doingItByFolds(pageviews: Seq[Long]): Seq[Seq[Long]] = {
  val sortedPVs = pageviews.sorted

  (Seq[Seq[Long]]() /: sortedPVs) { (visits, pv) =>
    val isNewVisit = visits.lastOption flatMap (_.lastOption) map {
      prevPV => pv - prevPV > N
    } getOrElse true

    if (isNewVisit) {
      visits :+ Seq(pv)
    } else {
      visits.init :+ (visits.last :+ pv)
    }
  }
}

Here, we start with an empty list of visits and fold over the sorted pageviews.  At each pageview, we decide whether it starts a new visit.  If so, we append a new visit containing just that pageview to the accumulated visits.  If not, we pop off the last visit, append the pageview to it, and put it back on the tail of the accumulated visits.
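(If the “/:” operator is unfamiliar: “(z /: xs)(f)” is just an alias for “xs.foldLeft(z)(f)”, as this little example shows.)

// These two expressions are equivalent:
(0 /: Seq(1, 2, 3)) { (acc, x) => acc + x }       // => 6
Seq(1, 2, 3).foldLeft(0) { (acc, x) => acc + x }  // => 6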

One part that’s still a bit messy is comparing the current timestamp to the previous one.  We can improve that by iterating through the intervals between pageviews instead of the actual pageviews.

def slidingThroughIt(pageviews: Seq[Long]): Seq[Seq[Long]] = {
  val intervals = (0L +: pageviews.sorted).sliding(2)

  (Seq[Seq[Long]]() /: intervals) { (visits, interval) =>
    if (interval(1) - interval(0) > N) {
      visits :+ Seq(interval(1))
    } else {
      visits.init :+ (visits.last :+ interval(1))
    }
  }
}

Here, we’re prepending a “0L” timestamp (and assuming that none of the pageviews happened in the early 70s) and using the “sliding” method to pair each timestamp with the previous one.
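For example, here’s what the sliding windows look like on a few small (made-up) timestamps:

(0L +: Seq(5L, 9L, 12L)).sliding(2).toList
// => List(List(0, 5), List(5, 9), List(9, 12))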

So far, we’ve been using a sequence of pageviews as a visit.  What happens if we add an explicit Visit type?  This lets us convert all pageviews into Visits at the start, then focus on merging overlapping Visits.  One nice benefit is that this is a map-reduce algorithm that can be easily parallelized instead of one that must sequentially iterate over the pageviews (either explicitly or with a fold).

import scala.math.{min, max}

case class Visit(start: Long, end: Long, pageviews: Seq[Long]) {
  def +(other: Visit): Visit = {
    Visit(min(start, other.start), max(end, other.end),
          (pageviews ++ other.pageviews).sorted)
  }
}

def doingItMapReduceStyle(pageviews: Seq[Long]): Seq[Visit] = {
  pageviews.par map { pv =>
    // Map: each pageview becomes a single-pageview Visit whose
    // window extends N milliseconds past its only timestamp.
    Seq(Visit(pv, pv + N, Seq(pv)))
  } reduce { (visits1, visits2) =>
    // Reduce: merge two lists of Visits, combining any that overlap.
    val sortedVisits = (visits1 ++ visits2) sortBy (_.start)

    (Seq[Visit]() /: sortedVisits) { (visits, next) =>
      if (visits.lastOption map (_.end >= next.start) getOrElse false) {
        visits.init :+ (visits.last + next)
      } else {
        visits :+ next
      }
    }
  }
}

The map-reduce solution is fun, but in a production system, I’d probably stick with the sliding variation and add a bit more flexibility to track actual pageview objects instead of just timestamps.
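As a final sanity check (just a quick sketch, using the made-up sample from the top of the post), all of the variants should agree once the Visits are unwrapped back into plain sequences of timestamps:

val sample = Seq(0L, 60000L, 120000L, 3720000L)
val expected = slidingThroughIt(sample)

assert(doingItIteratively(sample) == expected)
assert(doingItByFolds(sample) == expected)
assert(doingItMapReduceStyle(sample).map(_.pageviews) == expected)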

Wednesday, September 19, 2012

Using GROUP BYs or multiple INSERTs with complex data types in Hive.

In any sort of ad hoc data analysis, the first step is often to extract a specific subset of log lines from our files.  For example, when looking at a single partner’s web traffic, I often use an initial query to copy that partner’s data into a new table.  In addition to segregating out only the data relevant to my analysis, this copies the data from S3 into HDFS, which makes later queries more efficient.  (We store each log line as a map of fields, which is how we support dynamic columns.)

create external table if not exists
original_logs(fields map<string,string>) location '...' ;

create table if not exists
extracted_logs(fields map<string,string>) ;

insert overwrite table extracted_logs
select * from original_logs where fields['partnerId'] = 123 ;

If I’m doing this for multiple partners, it’s tempting to use a multiple-insert statement so Hadoop only needs to make one pass over the original data.

create external table if not exists
original_logs(fields map<string,string>) location '...' ;

create table if not exists
extracted_logs(fields map<string,string>)
partitioned by (partnerId int);

from original_logs
insert overwrite table extracted_logs partition (partnerId = 123)
select * where fields['partnerId'] = 123
insert overwrite table extracted_logs partition (partnerId = 234)
select * where fields['partnerId'] = 234 ;

Unfortunately, in Hive 0.7.x, this query fails with the error message “Hash code on complex types not supported yet.”  A multiple-insert statement uses an implicit group by, and Hive 0.7.x does not support grouping by complex types.  This bug was partially addressed in 0.8, which added support for arrays and maps, but structs and unions are still not supported.

At first glance, adding this support looks straightforward.  This could be a good candidate for our next open source day.