Window-based transformations in Spark Streaming

Versions: Spark 2.0.0

Compared with batch-oriented processing in Spark, Spark Streaming introduces new transformation types that are based on time periods.

The first part briefly describes the basic parameters of window operations in Spark Streaming and how they work. The next 3 parts describe the 3 types of windows we can create in Spark Streaming: tumbling, larger and not overlapping. Each of them is described and tested in a separate section. The names are borrowed from Spark Streaming's tests of window-related features.

Introduction to window transformations

Window transformations are specific to Spark Streaming because they operate on a set of the X most recent RDDs. The X parameter tells how many items (RDDs) we want to work on. In practice X is expressed as a time: if we put 3 seconds, the data from the last 3 seconds will be taken into account. In Spark Streaming this parameter is called window duration.

But window duration is not the only parameter. The second one, let's call it Y, represents the frequency at which the windowed data (RDD) is created. Concretely, it tells how often the collected data should be transformed. If instead of Y we specify 2 seconds, it means that every 2 seconds something (for instance a reduce transformation) will happen to this data. In Spark Streaming this parameter is called slide duration.

When we work with window operations in Spark Streaming, we don't need to define both parameters. We can specify only the window duration. In this case the slide duration defaults to the batch interval specified in the streaming context. So de facto both parameters are set every time, even if we don't specify the slide duration explicitly. Why?

We can think about slide duration as a trigger of RDD creation. Every time a new windowed RDD is created, Spark checks which of the previously constructed RDDs fall within the window duration. For instance, when slide duration is 2 seconds and window duration is 4 seconds, at the 2nd second we'll get all RDDs created between -2 and 2 seconds, at the 4th second the ones created between 0 and 4, at the 6th second the ones created between 2 and 6, and so on.
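The interval arithmetic from this example can be sketched in plain Java, without Spark. The class and method names (WindowIntervals, intervals) are invented for this illustration and are not part of Spark's API. The sketch only computes which time range each window covers.

```java
import java.util.ArrayList;
import java.util.List;

class WindowIntervals {

  // Returns one {start, end} pair (in seconds) per window, for every
  // slide tick up to and including maxTimeSec. Hypothetical helper, not Spark code.
  static List<long[]> intervals(long windowSec, long slideSec, long maxTimeSec) {
    if (slideSec <= 0) {
      throw new IllegalArgumentException("slide duration must be positive");
    }
    List<long[]> result = new ArrayList<>();
    for (long end = slideSec; end <= maxTimeSec; end += slideSec) {
      // A window computed at 'end' looks back over the last windowSec seconds.
      result.add(new long[] {end - windowSec, end});
    }
    return result;
  }

  public static void main(String[] args) {
    // Window duration 4 seconds, slide duration 2 seconds, as in the text.
    for (long[] interval : intervals(4, 2, 6)) {
      System.out.println("at " + interval[1] + "s: RDDs created between "
        + interval[0] + " and " + interval[1] + " seconds");
    }
    // at 2s: RDDs created between -2 and 2 seconds
    // at 4s: RDDs created between 0 and 4 seconds
    // at 6s: RDDs created between 2 and 6 seconds
  }
}
```

The negative start of the first interval simply means that the window reaches back before the stream started, so it contains less data than the following ones.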

Keep in mind that to access previous RDDs, Spark must keep them in cache. This is visible in the DStream representing window operations, WindowedDStream:

 
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)

Please also note that Spark provides 3 methods to construct a windowed DStream: window(...), reduceByWindow(...) and countByWindow(...). The first one builds a windowed DStream on the specified intervals. reduceByWindow(...) builds the stream on the specified intervals but also applies a reduce function to it. The last method builds the stream and counts the number of elements in each generated RDD.

Each of these methods accepts a window duration and a slide duration. Window duration must be specified every time and its value must be an integral multiple of the batch interval. If slide duration is specified, it also has to be an integral multiple of the batch interval (so for a batch interval of 500 ms, valid durations are 500, 1000, 1500, 2000 ms and so on).
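The multiple-of-batch-interval rule is easy to check by hand. Below is a minimal sketch in plain Java. DurationValidator and isMultipleOfBatch are names made up for this example, and the code is not Spark's own validation logic.

```java
class DurationValidator {

  // True when durationMs is a positive integral multiple of batchIntervalMs.
  // Hypothetical helper written for this post, not taken from Spark.
  static boolean isMultipleOfBatch(long durationMs, long batchIntervalMs) {
    if (batchIntervalMs <= 0) {
      throw new IllegalArgumentException("batch interval must be positive");
    }
    return durationMs > 0 && durationMs % batchIntervalMs == 0;
  }

  public static void main(String[] args) {
    long batchIntervalMs = 500;
    for (long candidate : new long[] {500, 750, 1000, 1200, 1500, 2000}) {
      System.out.println(candidate + " ms valid? "
        + isMultipleOfBatch(candidate, batchIntervalMs));
    }
    // 500, 1000, 1500 and 2000 are valid, 750 and 1200 are not.
  }
}
```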

Tumbling window in Spark Streaming

A tumbling window is the "ideal" window because window duration and slide duration are the same. It means that windows neither overlap nor leave gaps, so every item belongs to exactly one window, as in the image below:

Every time the slide duration elapses, we collect the RDDs created since the previous window. Below you can find some tests illustrating the concept:

@Test
public void should_generate_tumbling_windows() throws IOException, InterruptedException {
  triggerDataCreation(0, 6, batchContext, testQueue);

  JavaInputDStream<Integer> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  JavaDStream<Integer> windowedDStream = queueDStream.window(Durations.seconds(2),
    Durations.seconds(2));

  List<List<Integer>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(5_000L);

  // It proves that a tumbling window is a 'perfect' window where
  // items don't overlap and aren't lost.
  assertThat(windows).hasSize(2);
  assertThat(windows).containsOnly(
    Arrays.asList(0, 1, 2, 3),
    Arrays.asList(4, 5)
  );
}

@Test
public void should_count_entries_in_tumbling_window() throws IOException, InterruptedException {
  streamingContext.checkpoint(CHECKPOINT_DIR);
  triggerDataCreation(0, 6, batchContext, testQueue);

  JavaInputDStream<Integer> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  JavaDStream<Long> windowedDStream = queueDStream.countByWindow(Durations.seconds(2),
    Durations.seconds(2));

  List<List<Long>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  assertThat(windows.get(0)).isEqualTo(singletonList(4L));
  assertThat(windows.get(1)).isEqualTo(singletonList(2L));
  assertThat(windows.get(2)).isEqualTo(singletonList(0L));
}

@Test
public void should_reduce_by_tumbling_window() throws IOException, InterruptedException {
  triggerDataCreation(0, 6, batchContext, testQueue);

  JavaInputDStream<Integer> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);

  JavaDStream<Integer> windowedDStream = queueDStream.reduceByWindow(
    (value1, value2) -> value1 +  value2,
    Durations.seconds(2),
    Durations.seconds(2)
  );

  List<List<Integer>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  assertThat(windows).hasSize(2);
  assertThat(windows).containsExactly(singletonList(6), singletonList(9));
}
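
The tumbling behavior can also be reproduced without Spark. The sketch below is a simplified model and not Spark code: it assumes that every batch holds exactly one item and that a window spans 4 batches, which would correspond to a 2-second window over a 500 ms batch interval. TumblingWindowSketch and its methods are names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class TumblingWindowSketch {

  // Builds 'count' batches holding one item each: [0], [1], [2], ...
  static List<List<Integer>> singleItemBatches(int count) {
    List<List<Integer>> batches = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      batches.add(Collections.singletonList(i));
    }
    return batches;
  }

  // Groups consecutive batches into windows of batchesPerWindow batches.
  // Window size and slide are equal, so each batch lands in exactly one window.
  static List<List<Integer>> tumble(List<List<Integer>> batches, int batchesPerWindow) {
    List<List<Integer>> windows = new ArrayList<>();
    for (int start = 0; start < batches.size(); start += batchesPerWindow) {
      int end = Math.min(start + batchesPerWindow, batches.size());
      List<Integer> window = new ArrayList<>();
      for (List<Integer> batch : batches.subList(start, end)) {
        window.addAll(batch);
      }
      windows.add(window);
    }
    return windows;
  }

  public static void main(String[] args) {
    System.out.println(tumble(singleItemBatches(6), 4));
    // prints [[0, 1, 2, 3], [4, 5]]
  }
}
```

Concatenating the windows gives back the original sequence, which is exactly the "no overlap, no loss" property checked by the first test.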

Larger window in Spark Streaming

In the case of a larger window, an RDD doesn't always belong to a single window, so windows overlap. It happens when window duration is bigger than slide duration. Take a look at the image and tests below to see this behavior:

@Test
public void should_generate_larger_window() throws InterruptedException, IOException {
  triggerDataCreation(0, 10, batchContext, testQueue);

  JavaInputDStream<Integer> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);
  JavaDStream<Integer> windowedDStream = queueDStream.window(Durations.seconds(4),
    Durations.seconds(1));
  List<List<Integer>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> {
    putIfDefined(rdd.collect(), windows);
  });

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  assertThat(windows).hasSize(8);
  assertThat(windows).containsExactly(
      // window computed at 1st second, so for the time interval ([-3;1])
      Arrays.asList(0, 1),
      // time interval ([-2;2])
      Arrays.asList(0, 1, 2, 3),
      // time interval  ([-1;3])
      Arrays.asList(0, 1, 2, 3, 4, 5),
      // time interval  ([0;4])
      Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7),
      // time interval ([1;5])
      Arrays.asList(2, 3, 4, 5, 6, 7, 8, 9),
      // time interval  ([2;6])
      Arrays.asList(4, 5, 6, 7, 8, 9),
      // time interval  ([3;7])
      Arrays.asList(6, 7, 8, 9),
      // time interval ([4;8])
      Arrays.asList(8, 9)
  );
}

@Test
public void should_count_entries_in_larger_window() throws IOException, InterruptedException {
  streamingContext.checkpoint(CHECKPOINT_DIR);
  triggerDataCreation(0, 10, batchContext, testQueue);

  JavaInputDStream<Integer> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);
  JavaDStream<Long> windowedDStream = queueDStream.countByWindow(Durations.seconds(4),
    Durations.seconds(1));
  List<List<Long>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(10_000L);

  assertThat(windows.get(0)).isEqualTo(singletonList(2L));
  assertThat(windows.get(1)).isEqualTo(singletonList(4L));
  assertThat(windows.get(2)).isEqualTo(singletonList(6L));
  assertThat(windows.get(3)).isEqualTo(singletonList(8L));
  assertThat(windows.get(4)).isEqualTo(singletonList(8L));
  assertThat(windows.get(5)).isEqualTo(singletonList(6L));
  assertThat(windows.get(6)).isEqualTo(singletonList(4L));
  assertThat(windows.get(7)).isEqualTo(singletonList(2L));
  assertThat(windows.get(8)).isEqualTo(singletonList(0L));
}

@Test
public void should_reduce_by_larger_window() throws IOException, InterruptedException {
  triggerDataCreation(0, 10, batchContext, testQueue);

  JavaInputDStream<Integer> queueDStream = streamingContext.queueStream(testQueue, ONE_AT_TIME);
  JavaDStream<Integer> windowedDStream = queueDStream.reduceByWindow(
    (value1, value2) -> value1 +  value2,
    Durations.seconds(4),
    Durations.seconds(1)
  );
  List<List<Integer>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));

  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(9_000L);

  assertThat(windows.get(0)).isEqualTo(singletonList(1));
  assertThat(windows.get(1)).isEqualTo(singletonList(6));
  assertThat(windows.get(2)).isEqualTo(singletonList(15));
  assertThat(windows.get(3)).isEqualTo(singletonList(28));
  assertThat(windows.get(4)).isEqualTo(singletonList(44));
  assertThat(windows.get(5)).isEqualTo(singletonList(39));
  assertThat(windows.get(6)).isEqualTo(singletonList(30));
  assertThat(windows.get(7)).isEqualTo(singletonList(17));
}
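
The sums asserted in the last test can be recomputed by hand with a small model of overlapping windows. This is only a sketch under assumptions: it supposes that two items arrive every second (as the expected windows above suggest), that each second forms one batch, and that a window covers the last 4 batches. OverlappingWindowSketch and windowSums are hypothetical names, not Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverlappingWindowSketch {

  // For every tick from 1 to lastTick, sums the items of the last
  // windowLength batches. The batch produced at tick t sits at index t - 1.
  static List<Integer> windowSums(List<List<Integer>> batches, int windowLength, int lastTick) {
    List<Integer> sums = new ArrayList<>();
    for (int tick = 1; tick <= lastTick; tick++) {
      int firstBatch = Math.max(1, tick - windowLength + 1);
      int lastBatch = Math.min(tick, batches.size());
      int sum = 0;
      for (int t = firstBatch; t <= lastBatch; t++) {
        for (int item : batches.get(t - 1)) {
          sum += item;
        }
      }
      sums.add(sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    // Assumed input: two items per one-second batch.
    List<List<Integer>> batches = Arrays.asList(
      Arrays.asList(0, 1), Arrays.asList(2, 3), Arrays.asList(4, 5),
      Arrays.asList(6, 7), Arrays.asList(8, 9));
    System.out.println(windowSums(batches, 4, 8));
    // prints [1, 6, 15, 28, 44, 39, 30, 17]
  }
}
```

Each batch contributes to up to 4 consecutive sums, which is the overlap described at the beginning of this section.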

Not overlapping window in Spark Streaming

The last type name taken from Spark's tests is the not overlapping window. However, we should add that this type may not consume all generated RDDs. It occurs when window duration is shorter than slide duration. The image and tests below show this situation:

@Test
public void should_generate_not_overlapping_window() throws InterruptedException, IOException {
  JavaReceiverInputDStream<Integer> receiverInputDStream =
    streamingContext.receiverStream(new AutoDataMakingReceiver(StorageLevel.MEMORY_ONLY(), 1_000L, 9));

  JavaDStream<Integer> windowedDStream = receiverInputDStream.window(Durations.milliseconds(1_000L),
          Durations.milliseconds(2_000L));
  List<List<Integer>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  assertThat(windows.get(0)).isEqualTo(singletonList(0));
  assertThat(windows.get(1)).isEqualTo(singletonList(2));
  assertThat(windows.get(2)).isEqualTo(singletonList(4));
  assertThat(windows.get(3)).isEqualTo(singletonList(6));
}

@Test
public void should_count_in_not_overlapping_window() throws InterruptedException, IOException {
  streamingContext.checkpoint(CHECKPOINT_DIR);
  JavaReceiverInputDStream<Integer> receiverInputDStream =
    streamingContext.receiverStream(new AutoDataMakingReceiver(StorageLevel.MEMORY_ONLY(), 1_000L, 9));

  JavaDStream<Long> countByWindowDStream = receiverInputDStream.countByWindow(Durations.milliseconds(1_000L),
          Durations.milliseconds(2_000L));
  List<List<Long>> windowsFromCountByWindow = new ArrayList<>();
  countByWindowDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windowsFromCountByWindow));
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(9_000L);

  // Sometimes it returns a sequence of [1, 3, 5, 7]
  assertThat(windowsFromCountByWindow.get(0)).isEqualTo(singletonList(1L));
  assertThat(windowsFromCountByWindow.get(1)).isEqualTo(singletonList(1L));
  assertThat(windowsFromCountByWindow.get(2)).isEqualTo(singletonList(1L));
  assertThat(windowsFromCountByWindow.get(3)).isEqualTo(singletonList(1L));
}

@Test
public void should_reduce_by_not_overlapping_window() throws IOException, InterruptedException {
  JavaReceiverInputDStream<Integer> receiverInputDStream =
    streamingContext.receiverStream(new AutoDataMakingReceiver(StorageLevel.MEMORY_ONLY(), 1_000L, 9));

  JavaDStream<Integer> windowedDStream = receiverInputDStream.reduceByWindow(
    (value1, value2) -> value1 +  value2,
    Durations.milliseconds(1_000L),
    Durations.milliseconds(2_000L)
  );

  List<List<Integer>> windows = new ArrayList<>();
  windowedDStream.foreachRDD(rdd -> putIfDefined(rdd.collect(), windows));
  streamingContext.start();
  streamingContext.awaitTerminationOrTimeout(8_000L);

  // Sometimes it returns a sequence of [1, 3, 5, 7]
  assertThat(windows.get(0)).isEqualTo(singletonList(0));
  assertThat(windows.get(1)).isEqualTo(singletonList(2));
  assertThat(windows.get(2)).isEqualTo(singletonList(4));
  assertThat(windows.get(3)).isEqualTo(singletonList(6));
}


private static class AutoDataMakingReceiver extends Receiver<Integer> {

  private long sleepingTime;
  private int maxItems;

  public AutoDataMakingReceiver(StorageLevel storageLevel, long sleepingTime, int maxItems) {
    super(storageLevel);
    this.sleepingTime = sleepingTime;
    this.maxItems = maxItems;
  }

  @Override
  public void onStart() {
    try {
      // Give some time to Spark to construct DStream
      Thread.sleep(500L);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      e.printStackTrace();
    }
    for (int i = 0; i < maxItems; i++) {
      store(i);
      try {
        Thread.sleep(sleepingTime);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }
    }
  }

  @Override
  public void onStop() {
    // Do nothing but you should clean data or close persistent connections here
  }
}
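
To see which data a not overlapping window leaves out, we can model batches by their index. The sketch below is a simplification and not Spark code: it assumes one item per batch, a window of 1 batch and a slide of 2 batches, with the first window ending on the first batch. SkippingWindowSketch, consumed and skipped are names invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

class SkippingWindowSketch {

  // Indices of the batches covered by windows of windowLength batches,
  // computed every 'slide' batches. The first window ends at index windowLength - 1.
  static List<Integer> consumed(int batchCount, int windowLength, int slide) {
    if (windowLength <= 0 || slide <= 0) {
      throw new IllegalArgumentException("window length and slide must be positive");
    }
    List<Integer> result = new ArrayList<>();
    for (int end = windowLength - 1; end < batchCount; end += slide) {
      for (int i = end - windowLength + 1; i <= end; i++) {
        result.add(i);
      }
    }
    return result;
  }

  // Indices of the batches that never appear in any window.
  static List<Integer> skipped(int batchCount, int windowLength, int slide) {
    List<Integer> seen = consumed(batchCount, windowLength, slide);
    List<Integer> result = new ArrayList<>();
    for (int i = 0; i < batchCount; i++) {
      if (!seen.contains(i)) {
        result.add(i);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(consumed(9, 1, 2)); // prints [0, 2, 4, 6, 8]
    System.out.println(skipped(9, 1, 2));  // prints [1, 3, 5, 7]
  }
}
```

In the real tests the alignment between the receiver and the batches depends on timing, which is probably why the reduce test sometimes sees 1, 3, 5, 7 instead of 0, 2, 4, 6: the two sequences are simply swapped between "consumed" and "skipped".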

This post described window-based transformations in Spark Streaming. The first part presented two important parameters in window construction: window duration and slide duration. We also discovered 3 methods to construct the DStream abstraction representing data windows (windowed DStream). The next parts presented the 3 types of windows: tumbling (the 'perfect' window), larger (overlapping, especially if window duration is at least twice the slide duration) and not overlapping (some RDDs may never be consumed).
