Windows in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

As mentioned in one of the first posts about Apache Beam, the concept of window is a key element in its data processing logic. Even for bounded data a default window called global is defined. For the unbounded one the variety of windows is much bigger.

This post introduces the idea of windows in Apache Beam. It's divided in 6 parts. The first section explains how the items are assigned to the windows. The subsequent ones make an insight on available window types: fixed-time, sliding, session, global and finally calendar-based. All sections are terminated by several learning tests that should help to understand the described window.

All tests share the same object used to show which items are assigned to given window:

class DataPerWindowHandler extends DoFn<String, String> {

  private WindowHandlers windowHandler;

  public DataPerWindowHandler(WindowHandlers windowHandler) {
    this.windowHandler = windowHandler;
  }

  @ProcessElement
  public void processElement(ProcessContext processContext, BoundedWindow window) throws InterruptedException {
    IntervalWindow intervalWindow = (IntervalWindow) window;
    windowHandler.addItem(processContext.element(), intervalWindow.start());
    processContext.output(processContext.element());
  }

}

enum WindowHandlers {

  FIXED_TIME, SLIDING_TIME_DUPLICATED, SLIDING_TIME_MISSING, GLOBAL;

  private Map<Instant, List<String>> itemsPerWindow = new HashMap<>();

  public void addItem(String item, Instant windowInterval) {
    List<String> windowItems = itemsPerWindow.getOrDefault(windowInterval, new ArrayList<>());
    windowItems.add(item);
    itemsPerWindow.put(windowInterval, windowItems);
  }

  public Map<Instant, List<String>> getItemsPerWindow() {
    return itemsPerWindow;
  }

}

Items assignement

The items from bounded and unbounded datasets are injected to appropriate window thanks to the property called event time. This attribute tells when given event was produced. For instance, it can be a time when the event was created on IoT device, just before sending it to the data processing pipeline.

How this event time is resolved ? The simplest manner consists on constructing timestamped objects called TimestampedValue. However, it's not always possible - especially for big datasets. The alternative for this simple solution is the explicit definition of event time property. And for some of major data sources we can retrieve this possibility:

Event time property doesn't change in subsequent transforms. However it can be explicitly overridden by org.apache.beam.sdk.transforms.DoFn.WindowedContext#outputWithTimestamp(OutputT, org.joda.time.Instant) method. Some of the following tests show the mentioned properties of event time:

private static final String FILE_1 = "/tmp/beam/1";
private static final String FILE_2 = "/tmp/beam/2";

@BeforeClass
public static void writeFiles() throws IOException {
  FileUtils.writeStringToFile(new File(FILE_1), "1\n2\n3\n4", "UTF-8");
  FileUtils.writeStringToFile(new File(FILE_2), "5\n6\n7\n8", "UTF-8");
}

@AfterClass
public static void deleteFiles() {
  FileUtils.deleteQuietly(new File(FILE_1));
  FileUtils.deleteQuietly(new File(FILE_2));
}

@Test
public void should_detect_infinity_event_time_for_bounded_collections() {
  Pipeline pipeline = BeamFunctions.createPipeline("Infinity event time for bounded collection");

  TextIO.Read reader = TextIO.read().from("/tmp/beam/*");
  PCollection<String> readNumbers = pipeline.apply(reader).apply(Window.into(FixedWindows.of(new Duration(10))))
    .apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        assertThat(processContext.timestamp()).isEqualTo(BoundedWindow.TIMESTAMP_MIN_VALUE);
        processContext.outputWithTimestamp(processContext.element(), processContext.timestamp());
      }
    }));

  pipeline.run().waitUntilFinish();
}

@Test
public void should_change_event_time_for_collection_elements() {
  Pipeline pipeline = BeamFunctions.createPipeline("Event time changed");
  Instant now = new Instant(0);
  Instant sec1Duration = now.plus(Duration.standardSeconds(1));
  Instant sec2Duration = now.plus(Duration.standardSeconds(2));
  PCollection<String> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("a", sec1Duration), TimestampedValue.of("a", sec1Duration),
    TimestampedValue.of("a", sec1Duration), TimestampedValue.of("b", sec2Duration),
    TimestampedValue.of("a", sec1Duration), TimestampedValue.of("a", sec1Duration),
    TimestampedValue.of("a", sec1Duration)
  )));
  Duration windowDuration = Duration.standardSeconds(1);
  Window<String> window = Window.into(FixedWindows.of(windowDuration));
  PCollection<String> countResult = timestampedLetters.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {
      processContext.outputWithTimestamp(processContext.element(), processContext.timestamp().plus(1));
    }
  }))
  .apply(window)
  .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
      .via((String letter) -> KV.of(letter, 1)))
  .apply(Count.perKey())
  .apply(MapElements.into(TypeDescriptors.strings()).via((KV<String, Long> pair) ->
    pair.getKey() + "=" + pair.getValue()));

  IntervalWindow window2 = new IntervalWindow(now.plus(windowDuration), now.plus(windowDuration).plus(windowDuration));
  PAssert.that(countResult).inFinalPane(window2).containsInAnyOrder("a=6");
  IntervalWindow window3 = new IntervalWindow(window2.end(), window2.end().plus(windowDuration));
  PAssert.that(countResult).inFinalPane(window3).containsInAnyOrder("b=1");
  pipeline.run().waitUntilFinish();
}

Fixed-time window

This kind of window is the easiest in streaming processing. It creates the windows of equal size, one by one. For instance if a window duration is 5 seconds, the fixed-time window will create the windows for time intervals: 0-5 sec, 5-10 sec, 10-15 sec and so on. As you can see, there is no data lost since the windows are created one after another. The following test shows that pretty well:

@Test
public void should_construct_fixed_time_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Fixed-time window");
  PCollection<String> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("a1", new Instant(1)), TimestampedValue.of("a3", new Instant(1)),
    TimestampedValue.of("a2", new Instant(1)), TimestampedValue.of("b1", new Instant(2)),
    TimestampedValue.of("a4", new Instant(1)), TimestampedValue.of("c1", new Instant(3)),
    TimestampedValue.of("a6", new Instant(1)), TimestampedValue.of("d1", new Instant(4)),
    TimestampedValue.of("d2", new Instant(4)), TimestampedValue.of("a5", new Instant(1))
  )));
  PCollection<String> windowedLetters = timestampedLetters.apply(Window.into(FixedWindows.of(new Duration(1))));
  windowedLetters.apply(ParDo.of(new DataPerWindowHandler(WindowHandlers.FIXED_TIME)));

  pipeline.run().waitUntilFinish();

  Map<Instant, List<String>> itemsPerWindow = WindowHandlers.FIXED_TIME.getItemsPerWindow();
  List<String> itemsInstant1 = itemsPerWindow.get(new Instant(1));
  assertThat(itemsInstant1).hasSize(6).containsOnly("a1", "a2", "a3", "a4", "a5", "a6");
  List<String> itemsInstant2 = itemsPerWindow.get(new Instant(2));
  assertThat(itemsInstant2).hasSize(1).containsOnly("b1");
  List<String> itemsInstant3 = itemsPerWindow.get(new Instant(3));
  assertThat(itemsInstant3).hasSize(1).containsOnly("c1");
  List<String> itemsInstant4 = itemsPerWindow.get(new Instant(4));
  assertThat(itemsInstant4).hasSize(2).containsOnly("d1", "d2");
}

Sliding window

The second category of windows is called sliding. It means that the window keeps fixed-time but it's triggered at time different than 0. It leads to 1 of 2 possible situations. The window created at time t+1 can contain some items from the window t (aka: overlapping window, larger window). This situation occurs when the window time is greater than the time triggering the window materialization. The second case concerns the case when the t+2 window misses some items injected to the system between t+1 and t+2. It's produced when the window time is lower than the time triggering the window materialization.

The windows mentioned above can be resumed in the images used to explain window-based transformations in Spark Streaming:

The following 2 tests show how sliding window can be translated to Apache Beam's code:

@Test
public void should_construct_sliding_time_window_with_duplicated_items() {
  Pipeline pipeline = BeamFunctions.createPipeline("Sliding-time window with duplicated items");
  PCollection<String> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
      TimestampedValue.of("a1", new Instant(1)), TimestampedValue.of("a2", new Instant(1)),
      TimestampedValue.of("a3", new Instant(1)), TimestampedValue.of("b1", new Instant(2)),
      TimestampedValue.of("c1", new Instant(3)), TimestampedValue.of("d1", new Instant(4)),
      TimestampedValue.of("d2", new Instant(4)), TimestampedValue.of("a4", new Instant(1))
  )));

  PCollection<String> windowedLetters = timestampedLetters
    .apply(Window.into(SlidingWindows.of(new Duration(2)).every(new Duration(1))));
  windowedLetters.apply(ParDo.of(new DataPerWindowHandler(WindowHandlers.SLIDING_TIME_DUPLICATED)));

  pipeline.run().waitUntilFinish();

  Map<Instant, List<String>> itemsPerWindow = WindowHandlers.SLIDING_TIME_DUPLICATED.getItemsPerWindow();
  List<String> itemsInstant0 = itemsPerWindow.get(new Instant(0));
  assertThat(itemsInstant0).hasSize(4).containsOnly("a1", "a2", "a3", "a4");
  List<String> itemsInstant1 = itemsPerWindow.get(new Instant(1));
  assertThat(itemsInstant1).hasSize(5).containsOnly("a1", "a2", "a3", "a4", "b1");
  List<String> itemsInstant2 = itemsPerWindow.get(new Instant(2));
  assertThat(itemsInstant2).hasSize(2).containsOnly("b1", "c1");
  List<String> itemsInstant3 = itemsPerWindow.get(new Instant(3));
  assertThat(itemsInstant3).hasSize(3).containsOnly("c1", "d1", "d2");
  List<String> itemsInstant4 = itemsPerWindow.get(new Instant(4));
  assertThat(itemsInstant4).hasSize(2).containsOnly("d1", "d2");
}

@Test
public void should_construct_sliding_time_window_with_missing_items() {
  Pipeline pipeline = BeamFunctions.createPipeline("Sliding-time window with missing items");
  PCollection<String> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("a1", new Instant(1)), TimestampedValue.of("a2", new Instant(1)),
    TimestampedValue.of("a3", new Instant(1)), TimestampedValue.of("b1", new Instant(2)),
    TimestampedValue.of("c1", new Instant(3)), TimestampedValue.of("d1", new Instant(4)),
    TimestampedValue.of("d2", new Instant(4)), TimestampedValue.of("a4", new Instant(1))
  )));

  PCollection windowedLetters = timestampedLetters
    .apply(Window.into(SlidingWindows.of(new Duration(1)).every(new Duration(2))));
  windowedLetters.apply(ParDo.of(new DataPerWindowHandler(WindowHandlers.SLIDING_TIME_MISSING)));

  pipeline.run().waitUntilFinish();

  Map<Instant, List<String>> itemsPerWindow = WindowHandlers.SLIDING_TIME_MISSING.getItemsPerWindow();
  List<String> itemsInstant2 = itemsPerWindow.get(new Instant(2));
  assertThat(itemsInstant2).hasSize(1).containsOnly("b1");
  List<String> itemsInstant4 = itemsPerWindow.get(new Instant(4));
  assertThat(itemsInstant4).hasSize(2).containsOnly("d1", "d2");
}

Session window

This category of windows is useful when the data is distributed irregularly over the time. For instance it fits pretty well when during some time we have a lot of events, later we have no data for it, and finally the events arrive once again after some time of absence. The data is accumulated in the session window before reaching some time threshold called gap duration. The values arriving after this time are put into the next session window and so on. It's important to emphasize that this type of window is key-based, i.e. each key will have its own session window:

@Test
public void should_construct_session_window_for_key_value_elements() {
  Pipeline pipeline = BeamFunctions.createPipeline("Session window example for key-value elements");
  PCollection<KV<String, String>> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(KV.of("a", "a1"), new Instant(1)), 
    TimestampedValue.of(KV.of("a", "a2"), new Instant(1)),
    TimestampedValue.of(KV.of("a", "a3"), new Instant(5)), 
    TimestampedValue.of(KV.of("b", "b1"), new Instant(2)),
    TimestampedValue.of(KV.of("c", "c1"), new Instant(3)), 
    TimestampedValue.of(KV.of("d", "d1"), new Instant(4)),
    TimestampedValue.of(KV.of("d", "d2"), new Instant(4)), 
    TimestampedValue.of(KV.of("a", "a4"), new Instant(5))
  )));

  PCollection<KV<String, String>> windowedLetters = timestampedLetters
    .apply(Window.into(Sessions.withGapDuration(new Duration(2))));

  IntervalWindow window1 = new IntervalWindow(new Instant(1), new Instant(3));
  PAssert.that(windowedLetters).inWindow(window1).containsInAnyOrder(KV.of("a", "a1"), KV.of("a", "a2"));
  IntervalWindow window2 = new IntervalWindow(new Instant(2), new Instant(4));
  PAssert.that(windowedLetters).inWindow(window2).containsInAnyOrder(KV.of("b", "b1"));
  IntervalWindow window3 = new IntervalWindow(new Instant(3), new Instant(5));
  PAssert.that(windowedLetters).inWindow(window3).containsInAnyOrder(KV.of("c", "c1"));
  IntervalWindow window4 = new IntervalWindow(new Instant(4), new Instant(6));
  PAssert.that(windowedLetters).inWindow(window4).containsInAnyOrder(KV.of("d", "d1"), KV.of("d", "d2"));
  // As you can see, the session is key-based. Normally the [a3, a4] fits into window4. However, as their
  // window is constructed accordingly to "a" session, i.e. 1-3, 4 (no data), 5 - 7, they go to the window 5 - 7
  // and not 4 - 6
  IntervalWindow window5 = new IntervalWindow(new Instant(5), new Instant(7));
  PAssert.that(windowedLetters).inWindow(window5).containsInAnyOrder(KV.of("a", "a3"), KV.of("a", "a4"));
  pipeline.run().waitUntilFinish();
}

Global window

Maybe the simplest one to explain. It's a single one window where all elements are assigned. Data processed in this window is similar to batch-oriented processing. But it requires several attention points. First, if an unbounded data is put to the global window, we'd carefully use aggregations (group by key, combine). By default they require all data to be available before processing which is not possible with unbounded dataset (can be fixed with non-default trigger). And since the amount of stored data can be big, we'd take care about the resources use. The global window in the code looks pretty simply:

@Test
public void should_construct_global_window_for_key_value_elements() {
  Pipeline pipeline = BeamFunctions.createPipeline("Global window example for key-value elements");
  PCollection<KV<String, String>> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(KV.of("a", "a1"), new Instant(1)), 
    TimestampedValue.of(KV.of("a", "a2"), new Instant(1)),
    TimestampedValue.of(KV.of("a", "a3"), new Instant(5)), 
    TimestampedValue.of(KV.of("b", "b1"), new Instant(2)),
    TimestampedValue.of(KV.of("c", "c1"), new Instant(3)), 
    TimestampedValue.of(KV.of("d", "d1"), new Instant(4)),
    TimestampedValue.of(KV.of("d", "d2"), new Instant(4)), 
    TimestampedValue.of(KV.of("a", "a4"), new Instant(5))
  )));

  PCollection<KV<String, String>> windowedLetters = timestampedLetters
    .apply(Window.into(new GlobalWindows()));

  PAssert.that(windowedLetters).inWindow(GlobalWindow.INSTANCE).containsInAnyOrder(KV.of("a", "a1"),
    KV.of("a", "a2"), KV.of("a", "a3"), KV.of("a", "a4"), KV.of("b", "b1"), KV.of("c", "c1"),
    KV.of("d", "d1"), KV.of("d", "d2"));
  pipeline.run().waitUntilFinish();
}

Calendar-based window

The last category of window is based on calendar. It helps to group the items by date, month or year. It's pretty straightforward to implement since it's the user who defines when the window is materialized. An example of calendar-based window can be a window o 6 months, starting the 1st day of month and beginning in January 2017. It could produce the windows for 01.01.2017 - 30.06.2017, 01.07.2017 - 31.12.2017, 01.01.2018 - 30.06.2018 and so on.

The following test should help to understand this type of window:

@Test
public void should_construct_calendar_based_window() throws InterruptedException {
  Pipeline pipeline = BeamFunctions.createPipeline("Calendar-based window");
  Instant day01012017 = DateTime.parse("2017-01-01T11:20").toInstant();
  Instant day01052017 = DateTime.parse("2017-05-01T11:20").toInstant();
  Instant day01062017 = DateTime.parse("2017-06-01T11:20").toInstant();
  Instant day02062017 = DateTime.parse("2017-06-02T11:20").toInstant();
  Instant day01072017 = DateTime.parse("2017-07-01T11:20").toInstant();
  Instant day02072017 = DateTime.parse("2017-07-02T11:20").toInstant();
  Instant day01122017 = DateTime.parse("2017-12-01T11:20").toInstant();
  Instant day31122017 = DateTime.parse("2017-12-31T11:20").toInstant();
  Instant day01012018 = DateTime.parse("2018-01-01T11:20").toInstant();
  PCollection<String> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("01012017", day01012017), TimestampedValue.of("01052017", day01052017),
    TimestampedValue.of("01062017", day01062017), TimestampedValue.of("01072017", day01072017),
    TimestampedValue.of("01122017", day01122017), TimestampedValue.of("31122017", day31122017),
    TimestampedValue.of("01012018", day01012018), TimestampedValue.of("02062017", day02062017),
    TimestampedValue.of("02072017", day02072017)
  )));

  CalendarWindows.MonthsWindows monthsWindow = CalendarWindows.months(6).beginningOnDay(1).withStartingMonth(2017, 1);
  PCollectionvString> windowedLetters = timestampedLetters.apply(Window.into(monthsWindow));
  windowedLetters.apply(ParDo.of(new DataPerWindowHandler(WindowHandlers.CALENDAR)));

  pipeline.run().waitUntilFinish();
  Map<Instant, List<String>> itemsPerWindow = WindowHandlers.CALENDAR.getItemsPerWindow();
  assertThat(itemsPerWindow).hasSize(3);
  List<String> itemsWindow1 = itemsPerWindow.get(DateTime.parse("2017-01-01T00:00:00.000Z").toInstant());
  assertThat(itemsWindow1).hasSize(4).containsOnly("01012017", "01052017", "01062017", "02062017");
  List<String> itemsWindow2 = itemsPerWindow.get(DateTime.parse("2017-07-01T00:00:00.000Z").toInstant());
  assertThat(itemsWindow2).hasSize(4).containsOnly("01122017", "01072017", "31122017", "02072017");
  List<String> itemsWindow3 = itemsPerWindow.get(DateTime.parse("2018-01-01T00:00:00.000Z"));
  assertThat(itemsWindow3).hasSize(1).containsOnly("01012018");
}

This post introduces the concept of windows in Apache Beam. Its first part explains how the particular items are assigned to created windows. As we could see, everything is based on a property called event time that most of the time represents the time at which given event occurred. The latter parts described available window types: fixed-time where the windows of the same size were created regularly, sliding window where windows could contain duplicated data or miss some of them. Later were described session and global windows, storing respectively similar items and all items. Finally calendar-based that creates day, month or yearly windows.