Reservoir sampling for bounded and unbounded data

Every time I come across something new, I note it somewhere and come back to it later. How much later depends on how often I meet that thing again: the more often I see it in books or at conferences, the sooner I dig into it. That's the story of this post about the reservoir sampling algorithm, which I met twice during the last month.

The post starts with an explanation of the idea behind reservoir sampling. I won't go deep into the mathematical details because plenty of other places explain them perfectly. The next section presents a sample implementation of the algorithm in Scala. The last two parts are dedicated to the places that mentioned reservoir sampling and made me write this post earlier than I thought.

Definition

Reservoir sampling is an example of a randomized algorithm. It solves the problem of efficiently sampling k elements from a dataset. Put like this, the problem seems simple, but in fact it's more complicated than that because of the dataset size. The dataset can be either huge or unbounded, so materializing it to pick the samples is impossible. Reservoir sampling was created to address this in a memory-efficient manner.

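The reservoir sampling algorithm is composed of the following steps: first, it fills the reservoir with the first k elements of the dataset; then, for every following element, it draws a random position among all the elements read so far and, if that position falls inside the reservoir, replaces the element stored there with the new one.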

The idea is twofold. First, the dataset is never materialized; an iterator is enough to read it element by element. Second, the sampled data must be uniformly distributed. That's guaranteed by the property that the i-th element read (for i > k) has a k/i chance of entering the reservoir, which leaves every element with the same probability of being in the final sample.
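To see why the k/i rule gives a uniform sample, here is a short derivation. The symbols are my own notation: n is the total number of elements read, k the reservoir size, i the position of an element (with i > k) and j the position of a later element. An element at position i enters the reservoir with probability k/i, and each later element j evicts it with probability (k/j) * (1/k) = 1/j:

```latex
\begin{align*}
P(\text{element } i \text{ is in the final reservoir})
  &= \frac{k}{i} \prod_{j=i+1}^{n} \left(1 - \frac{k}{j}\cdot\frac{1}{k}\right) \\
  &= \frac{k}{i} \prod_{j=i+1}^{n} \frac{j-1}{j}
   = \frac{k}{i}\cdot\frac{i}{n}
   = \frac{k}{n}
\end{align*}
```

The same telescoping product, started at j = k + 1, gives k/n for the first k elements too, since they enter the reservoir with probability 1.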

The nice thing about reservoir sampling is its complexity which, as you will see in the next section, is O(n) in time, with a single pass over the data, and only O(k) in memory.

Scala implementation

Following the algorithm described above, I implemented reservoir sampling as an exercise in the following code:

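import java.util.concurrent.ThreadLocalRandom

object ReservoirSampling {

  def sample(numberToSample: Int, source: Iterator[Int]): Seq[Int] = {
    val reservoir = new Array[Int](numberToSample)
    // number of elements read so far; also the zero-based index of the current one
    var currentIndex = 0
    while (source.hasNext) {
      val nextNumber = source.next()
      if (currentIndex < numberToSample) {
        // the first numberToSample elements fill the reservoir directly
        reservoir(currentIndex) = nextNumber
      } else {
        // the upper bound is exclusive, so currentIndex + 1 covers every element
        // read so far, the current one included; the element is kept with
        // probability numberToSample / (currentIndex + 1)
        val index = ThreadLocalRandom.current().nextInt(0, currentIndex + 1)
        if (index < numberToSample) {
          reservoir(index) = nextNumber
        }
      }
      currentIndex += 1
    }
    // drop the unused slots if the source had fewer than numberToSample elements
    reservoir.take(math.min(currentIndex, numberToSample)).toSeq
  }

}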

As you can see, the sampling is done in a single pass with a very small memory footprint thanks to the lazily evaluated iterator. A short test confirms the behavior before I move on to the next sections, where I describe the 2 use cases I found during my data exploration:

import org.scalatest.{FlatSpec, Matchers}

class ReservoirSamplingTest extends FlatSpec with Matchers {

  "reservoir sampling" should "return different samples in 2 runs" in {
    val sampled1 = ReservoirSampling.sample(10, (1 to 100000).iterator)
    val sampled2 = ReservoirSampling.sample(10, (1 to 100000).iterator)

    sampled1 should have size 10
    sampled2 should have size 10
    // compares the whole sequences; 2 identical random samples are extremely unlikely
    sampled1 should not equal (sampled2)
  }

}
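The test above only shows that two runs differ. To get a feeling for the uniformity, a frequency check can help. Below is a minimal sketch, assuming a seeded scala.util.Random passed as a parameter so that the runs are repeatable. The UniformityCheck object and its frequencies method are names I made up for this illustration:

```scala
import scala.util.Random

object UniformityCheck {

  // Same algorithm as in the post, but with an injectable random generator
  def sample(k: Int, source: Iterator[Int], random: Random): Seq[Int] = {
    val reservoir = new Array[Int](k)
    var seen = 0
    while (source.hasNext) {
      val element = source.next()
      if (seen < k) {
        reservoir(seen) = element
      } else {
        // draw a slot among the seen + 1 elements read so far, current one included
        val slot = random.nextInt(seen + 1)
        if (slot < k) reservoir(slot) = element
      }
      seen += 1
    }
    reservoir.take(math.min(seen, k)).toSeq
  }

  // Counts how many times each value of 1..10 lands in a sample of size 2
  def frequencies(runs: Int, seed: Long): Map[Int, Int] = {
    val random = new Random(seed)
    val counts = scala.collection.mutable.Map[Int, Int]().withDefaultValue(0)
    for (_ <- 1 to runs; value <- sample(2, (1 to 10).iterator, random)) {
      counts(value) += 1
    }
    counts.toMap
  }
}
```

With 100000 runs, every value from 1 to 10 should be counted roughly runs * k / n = 20000 times. The exact counts depend on the seed.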

Reservoir sampling in Apache Spark SQL

Apache Spark uses reservoir sampling to generate the boundary values for range partitioning. I wrote about it in the Range partitioning in Apache Spark SQL post. The algorithm is implemented in the reservoirSampleAndCount method of the org.apache.spark.util.random.SamplingUtils object. The implementation is very similar to the one I proposed above. The difference is that it also handles small inputs: when the iterator holds fewer elements than the requested sample size, only the elements actually read are returned.
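To illustrate the idea without depending on Spark, here is a minimal sketch of a generic variant that, as the name of Spark's method suggests, returns the number of elements read together with the sample. It is not Spark's code: the ReservoirWithCount object, the sampleAndCount method and its signature are my own assumptions.

```scala
import scala.reflect.ClassTag
import scala.util.Random

object ReservoirWithCount {

  // Hypothetical helper, not Spark's code: returns the sample and the number of elements read
  def sampleAndCount[T: ClassTag](input: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
    val reservoir = new Array[T](k)
    val random = new Random(seed)
    var count = 0L
    while (input.hasNext) {
      val item = input.next()
      if (count < k) {
        reservoir(count.toInt) = item
      } else {
        // pick a position among the count + 1 elements read so far
        val position = (random.nextDouble() * (count + 1)).toLong
        if (position < k) reservoir(position.toInt) = item
      }
      count += 1
    }
    // fewer elements than requested: return only the filled part of the reservoir
    val result = if (count < k) reservoir.take(count.toInt) else reservoir
    (result, count)
  }
}
```

The counter is a Long here, so the sketch keeps working past Int.MaxValue elements, and the returned count tells the caller how large the sampled dataset really was.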

Initially, I also thought that reservoir sampling was used for RDD's takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong). However, this part uses another function exposed by SamplingUtils, computeFractionForSampleSize(sampleSizeLowerBound: Int, total: Long, withReplacement: Boolean), which returns a sampling fraction passed later to one of the sampling algorithms (PoissonSampler or BernoulliSampler).

Reservoir sampling in streaming

I found the second mention of reservoir sampling when I was reading "Streaming Data" by Andrew G. Psaltis. The book gives an example of reservoir sampling in the context of streaming data processing.

In this context of unbounded data, reservoir sampling is implemented in the same way as for bounded data. First, it fills up the reservoir with the first k elements. Later, for every new incoming element, it determines whether that element should replace one of the existing reservoir elements. The author's example is a random sample taken from a stream in an advertising network in order to perform statistical analysis on a subset of the data.
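For a stream there is no iterator to exhaust, so the reservoir has to live as state that is updated on every event. The sketch below is only an illustration under my own assumptions: StreamingReservoir, offer, snapshot and seenCount are hypothetical names and do not come from the book or from any streaming framework.

```scala
import scala.reflect.ClassTag
import scala.util.Random

// Hypothetical class for illustration; keeps a uniform sample of at most `capacity` elements
class StreamingReservoir[T: ClassTag](capacity: Int, random: Random = new Random()) {
  private val reservoir = new Array[T](capacity)
  // a Long counter, since an unbounded stream can exceed Int.MaxValue events
  private var seen = 0L

  // Called once for every incoming element
  def offer(element: T): Unit = {
    if (seen < capacity) {
      reservoir(seen.toInt) = element
    } else {
      // position among the seen + 1 elements received so far, current one included
      val position = (random.nextDouble() * (seen + 1)).toLong
      if (position < capacity) reservoir(position.toInt) = element
    }
    seen += 1
  }

  // Copy of the current sample; can be taken at any moment of the stream's life
  def snapshot: Seq[T] = reservoir.take(math.min(seen, capacity.toLong).toInt).toSeq

  def seenCount: Long = seen
}
```

A consumer would call offer for each event and read snapshot whenever the analysis needs the current sample. The memory use stays bounded by the capacity no matter how long the stream runs.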

If you haven't used reservoir sampling yet, maybe now is a good moment to give it a try? As you can see in this post, the algorithm itself is quite straightforward, and implementing it in your pipeline, even for a simple experiment, shouldn't be a big deal.