I thought that the problem of grouping pageviews into visits would make an interesting code kata. Here’s the statement of the problem that I worked from:
Given a non-empty sequence of timestamps (as milliseconds since the epoch), write a function that returns a sequence of visits, where each visit is itself a sequence of timestamps in which every pair of consecutive timestamps is no more than N milliseconds apart.
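To make the statement concrete, here is a minimal sketch of a checker for a candidate answer. The name isValidGrouping and the explicit n parameter are assumptions made for illustration and are not part of the kata. The requirement that neighbouring visits be more than n milliseconds apart is one reading of the statement, chosen because the implementations in this post only split on gaps larger than N.

```scala
// Hypothetical helper: checks a candidate grouping against the problem statement.
def isValidGrouping(n: Long, input: Seq[Long], visits: Seq[Seq[Long]]): Boolean = {
  // No visit may be empty.
  val allNonEmpty = visits.forall(_.nonEmpty)
  // Every input timestamp appears exactly once, in sorted order.
  val samePageviews = visits.flatten == input.sorted
  // Inside a visit, consecutive timestamps are at most n apart.
  def tightInside = visits.forall { v =>
    v.zip(v.drop(1)).forall { case (a, b) => b - a <= n }
  }
  // Between neighbouring visits, the gap is larger than n.
  def looseBetween = visits.zip(visits.drop(1)).forall {
    case (a, b) => b.head - a.last > n
  }
  allNonEmpty && samePageviews && tightInside && looseBetween
}
```

With n = 1000, the input Seq(5000, 1000, 1500, 9000, 5200) should be grouped as Seq(Seq(1000, 1500), Seq(5000, 5200), Seq(9000)): the gaps of 500 and 200 stay inside a visit, while the gaps of 3500 and 3800 separate visits.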
As a starting point, I decided to take a straightforward procedural approach:
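import scala.collection.mutable.ListBuffer
// N, the maximum gap in milliseconds between pageviews in one visit,
// is assumed to be defined in the enclosing scope.
def doingItIteratively(pageviews: Seq[Long]): Seq[Seq[Long]] = {
  val iterator = pageviews.sorted.iterator
  val visits = ListBuffer[ListBuffer[Long]]()
  // Safe because the input is non-empty by the problem statement.
  var previousPV: Long = iterator.next()
  var currentVisit: ListBuffer[Long] = ListBuffer(previousPV)
  for (currentPV <- iterator) {
    if (currentPV - previousPV > N) {
      // Gap too large: close the current visit and start a new one.
      visits += currentVisit
      currentVisit = ListBuffer[Long]()
    }
    currentVisit += currentPV
    previousPV = currentPV
  }
  // The last visit is still open when the loop ends.
  visits += currentVisit
  visits.map(_.toSeq).toSeq
}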
So, we simply iterate through the sorted pageviews, tracking both the current visit and the previous pageview. If the current pageview is more than N milliseconds after the previous one, it starts a new visit: we push the current visit onto the list of all visits and start a fresh one. Either way, we then append the current pageview to the (potentially new) current visit.
It actually felt a bit odd to write procedural code like this and ignore the functional parts of Scala. Using a fold cleans the code up a bit and gets rid of the mutable state.
def doingItByFolds(pageviews: Seq[Long]): Seq[Seq[Long]] = {
  val sortedPVs = pageviews.sorted
  // foldLeft is the named form of the /: operator.
  sortedPVs.foldLeft(Seq[Seq[Long]]()) { (visits, pv) =>
    // The very first pageview has no predecessor, so it always starts a visit.
    val isNewVisit = visits.lastOption.flatMap(_.lastOption).map {
      prevPV => pv - prevPV > N
    }.getOrElse(true)
    if (isNewVisit) {
      visits :+ Seq(pv)
    } else {
      visits.init :+ (visits.last :+ pv)
    }
  }
}
Here, we’re starting with an empty list of visits and folding it over the sorted pageviews. At each pageview, we decide if we need to start a new visit. If so, we append a new visit containing the pageview to the accumulated visits. If not, we pop off the last visit, append the pageview, and put the last visit back on the tail of the accumulated visits.
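One cost of the fold above is worth noting: Seq() builds an immutable List by default, and on a List the calls to lastOption, init, and :+ each walk the whole list, so every step does work proportional to the number of visits accumulated so far. The following is a minimal sketch of a variant that accumulates everything newest-first and reverses once at the end. The name foldingWithPrepends and the explicit n parameter are assumptions for illustration, not part of the original code.

```scala
// Sketch: same grouping as the fold above, but visits and pageviews are
// accumulated newest-first so each step only touches list heads.
// `n` is passed explicitly (an assumption; the original relies on an outer N).
def foldingWithPrepends(pageviews: Seq[Long], n: Long): Seq[Seq[Long]] = {
  val reversed = pageviews.sorted.foldLeft(List.empty[List[Long]]) {
    // Close enough to the newest pageview of the newest visit: extend that visit.
    case ((prevPV :: restOfVisit) :: olderVisits, pv) if pv - prevPV <= n =>
      (pv :: prevPV :: restOfVisit) :: olderVisits
    // Otherwise (first pageview, or gap too large): start a new visit.
    case (visits, pv) =>
      List(pv) :: visits
  }
  // Undo the newest-first ordering at both levels.
  reversed.reverse.map(_.reverse)
}
```

The trade-off is readability: the pattern match is denser than the if/else, but each pageview is handled in constant time after the initial sort.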
One part that’s still a bit messy is comparing the current timestamp to the previous one. We can improve that by iterating through the intervals between pageviews instead of the actual pageviews.
def slidingThroughIt(pageviews: Seq[Long]): Seq[Seq[Long]] = {
  // The 0L sentinel gives the first pageview a "previous" timestamp.
  val intervals = (0L +: pageviews.sorted).sliding(2)
  intervals.foldLeft(Seq[Seq[Long]]()) {
    (visits, interval) =>
      // interval(0) is the previous timestamp, interval(1) the current one.
      if (interval(1) - interval(0) > N) {
        visits :+ Seq(interval(1))
      } else {
        visits.init :+ (visits.last :+ interval(1))
      }
  }
}
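Here, we’re prepending a “0L” timestamp and using the “sliding” method to pair each timestamp with the previous one. This assumes that no pageview falls within N milliseconds of the epoch (that is, none of them happened in the early 70s); otherwise the first interval would not start a visit and the call to init would fail on the empty list of visits.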
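If the assumption about the epoch is uncomfortable, the sentinel can be derived from the data instead. This is a minimal sketch under that idea, with a hypothetical name and an explicit n parameter that are not in the original: the sentinel sits n + 1 milliseconds before the earliest pageview, so the first interval always exceeds n.

```scala
// Sketch: sliding variant whose sentinel is relative to the first pageview.
// `n` is passed explicitly (an assumption; the original relies on an outer N).
def slidingWithoutEpochAssumption(pageviews: Seq[Long], n: Long): Seq[Seq[Long]] = {
  val sorted = pageviews.sorted
  // Placed n + 1 ms before the first pageview, so the first interval is
  // always larger than n. This could only overflow for timestamps close to
  // Long.MinValue, which real epoch timestamps are not.
  val sentinel = sorted.head - n - 1
  (sentinel +: sorted).sliding(2).foldLeft(Seq[Seq[Long]]()) { (visits, interval) =>
    if (interval(1) - interval(0) > n) visits :+ Seq(interval(1))
    else visits.init :+ (visits.last :+ interval(1))
  }
}
```

For example, with n = 1000 the input Seq(10, 20, 5000) yields Seq(Seq(10, 20), Seq(5000)), whereas a 0L sentinel would treat 10 as continuing a visit that does not exist yet.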
So far, we’ve been using a sequence of pageviews as a visit. What happens if we add an explicit Visit type? This lets us convert all pageviews into Visits at the start, then focus on merging overlapping Visits. One nice benefit is that this is a map-reduce algorithm that can be easily parallelized instead of one that must sequentially iterate over the pageviews (either explicitly or with a fold).
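import scala.math.{max, min}
// A visit spans [start, end], where end is N ms after its latest pageview.
case class Visit(start: Long, end: Long, pageviews: Seq[Long]) {
  // Merges two visits into one that covers both.
  def +(other: Visit): Visit = {
    Visit(min(start, other.start), max(end, other.end),
      (pageviews ++ other.pageviews).sorted)
  }
}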
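// On Scala 2.13 and later, .par needs the scala-parallel-collections module
// and import scala.collection.parallel.CollectionConverters._
def doingItMapReduceStyle(pageviews: Seq[Long]): Seq[Visit] = {
  pageviews.par.map { pv =>
    // Map: each pageview becomes a one-pageview visit that stays open for N ms.
    Seq(Visit(pv, pv + N, Seq(pv)))
  }.reduce { (v1, v2) =>
    // Reduce: combine two lists of visits, merging any that overlap.
    val sortedVisits = (v1 ++ v2).sortBy(_.start)
    sortedVisits.foldLeft(Seq[Visit]()) { (visits, next) =>
      if (visits.lastOption.exists(_.end >= next.start)) {
        visits.init :+ (visits.last + next)
      } else {
        visits :+ next
      }
    }
  }
}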
The map-reduce solution is fun, but in a production system, I’d probably stick with the sliding variation and add a bit more flexibility to track actual pageview objects instead of just timestamps.
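As a rough sketch of what tracking pageview objects might look like, the grouping can be made generic over the element type and take a function that extracts the timestamp. The Pageview class, the groupIntoVisits name, the timestampOf parameter, and the explicit n are all hypothetical, introduced only for illustration. Instead of a sentinel, which is awkward for arbitrary objects, this version pairs each element with its predecessor using zip, in the spirit of the sliding variation.

```scala
// Hypothetical pageview type, for illustration only.
case class Pageview(timestamp: Long, url: String)

// Sketch: groups any kind of event into visits, given a way to read its timestamp.
def groupIntoVisits[A](events: Seq[A], n: Long)(timestampOf: A => Long): Seq[Seq[A]] = {
  val sorted = events.sortBy(timestampOf)
  // The first event always opens the first visit (input assumed non-empty).
  val firstVisit = Vector(Vector(sorted.head))
  // Pair each event with its predecessor, much like sliding(2) does.
  sorted.zip(sorted.tail).foldLeft(firstVisit) { case (visits, (prev, curr)) =>
    if (timestampOf(curr) - timestampOf(prev) > n) visits :+ Vector(curr)
    else visits.init :+ (visits.last :+ curr)
  }
}
```

A call would look like groupIntoVisits(pageviews, 30 * 60 * 1000L)(_.timestamp), where the 30-minute gap is just an example value. Using Vector keeps last, init, and :+ cheap compared with the default List.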
