Late data in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

Data, especially in streaming applications, can very often arrive late to the processing pipeline. Despite that, Apache Beam handles this case pretty easily thanks to the watermark mechanism.

This post is another one devoted to window-related features in Apache Beam. After the presentation of window types in Apache Beam, it's a good moment to focus on the watermark. The first part defines this mechanism. The second section is devoted to the watermark configuration. The third and last one provides some examples through learning tests.

Watermark

In order to define the watermark, we first need to focus on late data. Data is considered late when it arrives in the system after the end of its window, i.e. once the watermark has passed the window's end. For instance, let's suppose we have a window storing items whose event time is included in the 2017-01-01 10:00 - 2017-01-01 10:20 interval. Any item with an event time in this interval that reaches the system after the window was computed is considered late. Thus lateness can be defined as the item's arrival after the processing time interval reserved for its window.
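The lateness rule above can be sketched in plain Java. This is a simplified model, not Beam code: the LatenessCheck class and its methods are hypothetical, windows are assumed to be fixed and aligned on the epoch (as FixedWindows are with their default zero offset), and the current watermark is passed in explicitly.

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessCheck {
    // Start of the fixed window containing eventTime, with windows aligned on the epoch.
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMs = size.toMillis();
        long ts = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }

    // An element is late when the watermark has already reached the end of its window.
    static boolean isLate(Instant eventTime, Instant watermark, Duration size) {
        Instant windowEnd = windowStart(eventTime, size).plus(size);
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(20);
        Instant event = Instant.parse("2017-01-01T10:05:00Z");
        System.out.println(windowStart(event, size));                                  // 2017-01-01T10:00:00Z
        System.out.println(isLate(event, Instant.parse("2017-01-01T10:15:00Z"), size)); // false
        System.out.println(isLate(event, Instant.parse("2017-01-01T10:25:00Z"), size)); // true
    }
}
```

The same item at 10:05 is on time while the watermark is at 10:15, and late once the watermark has moved to 10:25, past the 10:20 end of its window.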

But a late event doesn't mean a lost event. Apache Beam provides a mechanism to deal with late events: even if they arrive after the window computation, the framework can include them in the final result of the given window. This mechanism relies on the watermark, a kind of tracker of event-time progress for windows, summarized in the following image:

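Late data can be categorized in three groups: unobservably late data, which is delayed but still arrives before the watermark passes the end of its window and is therefore treated as on time; observably late data, which arrives after the end of the window but before the allowed lateness elapses; and droppably late data, which arrives after both the window and the allowed lateness and is discarded.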
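The grouping of late data can be expressed as a small classification function over three categories: not observably late (on time or unobservably late), observably late and droppably late, the same names used by the learning tests later in this post. The sketch is a plain-Java model, not a Beam API: LatenessCategory and classify are hypothetical names, windows are assumed fixed and epoch-aligned, and the expiration time is assumed to be the window's last millisecond plus the allowed lateness, an assumption consistent with the test results shown below.

```java
public class LatenessCategory {
    enum Category { NOT_OBSERVABLY_LATE, OBSERVABLY_LATE, DROPPABLY_LATE }

    // Classifies an element against the current watermark. All values are in milliseconds.
    // Assumption: the window's last instant is end - 1 ms and its state expires at
    // that instant plus the allowed lateness.
    static Category classify(long eventTime, long watermark, long size, long allowedLateness) {
        long windowStart = eventTime - Math.floorMod(eventTime, size);
        long maxTimestamp = windowStart + size - 1;
        long expirationTime = maxTimestamp + allowedLateness;
        if (watermark <= maxTimestamp) {
            return Category.NOT_OBSERVABLY_LATE; // watermark has not passed the end of the window
        }
        if (watermark <= expirationTime) {
            return Category.OBSERVABLY_LATE;     // behind the watermark, but within allowed lateness
        }
        return Category.DROPPABLY_LATE;          // window already expired: the element is dropped
    }

    public static void main(String[] args) {
        // Same setup as the tests: 5-second window, 5 seconds of allowed lateness, element at 1 s.
        System.out.println(classify(1_000, 0, 5_000, 5_000));      // NOT_OBSERVABLY_LATE
        System.out.println(classify(1_000, 6_000, 5_000, 5_000));  // OBSERVABLY_LATE
        System.out.println(classify(1_000, 15_000, 5_000, 5_000)); // DROPPABLY_LATE
    }
}
```

With zero allowed lateness, the default, the middle category disappears: the same element at watermark 6 s is classified as droppably late.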

Late data brings another concept: the pane. We can distinguish early, on-time and late panes, each containing a different kind of result. The early pane can store partial results and is used together with the trigger mechanism, which will be described in one of the next posts. The on-time pane contains all items that arrived on time, i.e. before the watermark passed the end of the window. The late pane contains the items that arrived late but before the allowed lateness elapsed. These 3 types are summarized in the image below:
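The timing of successive panes of one window can be illustrated with a simplified plain-Java model. PaneTimingSketch and its label method are hypothetical, and the model assumes that the first firing after the watermark passes the end of the window is the on-time one; Beam's real PaneInfo carries more information, for instance the pane index and an isLast flag.

```java
import java.util.ArrayList;
import java.util.List;

public class PaneTimingSketch {
    enum Timing { EARLY, ON_TIME, LATE }

    // Labels the firings of one window, given the watermark (ms) at each firing.
    // Simplified model: firings before the window end are EARLY, the first firing
    // at or after it is ON_TIME and every later firing is LATE.
    static List<Timing> label(long[] firingWatermarks, long windowEnd) {
        List<Timing> timings = new ArrayList<>();
        boolean onTimeEmitted = false;
        for (long watermark : firingWatermarks) {
            if (watermark < windowEnd) {
                timings.add(Timing.EARLY);
            } else if (!onTimeEmitted) {
                timings.add(Timing.ON_TIME);
                onTimeEmitted = true;
            } else {
                timings.add(Timing.LATE);
            }
        }
        return timings;
    }

    public static void main(String[] args) {
        // Window [0 s, 5 s): an early firing, the on-time firing, then a firing for a late element.
        System.out.println(label(new long[] {2_000, 5_000, 6_000}, 5_000)); // [EARLY, ON_TIME, LATE]
    }
}
```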

Watermark configuration

By default, late events are dropped from the window because the allowed lateness is zero. This behavior can be changed through the org.apache.beam.sdk.transforms.windowing.Window#withAllowedLateness(org.joda.time.Duration) method. It defines how long late data will be accepted and also how long the state of an old window will be kept alive. Once this delay expires, the state is removed and any new late data is rejected.
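The retention rule of withAllowedLateness can be modeled with a per-window state store. WindowStateSketch is a hypothetical plain-Java class, not Beam's state API; it assumes fixed epoch-aligned windows, keeps one counter per window, drops elements whose window has expired and evicts a window's state once the watermark passes the window's last millisecond plus the allowed lateness.

```java
import java.util.Map;
import java.util.TreeMap;

public class WindowStateSketch {
    private final long size;
    private final long allowedLateness;
    // Per-window counts keyed by window start (ms); stands in for Beam's per-window state.
    private final Map<Long, Long> counts = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    WindowStateSketch(long size, long allowedLateness) {
        this.size = size;
        this.allowedLateness = allowedLateness;
    }

    // Assumed expiration time: window max timestamp (end - 1 ms) plus allowed lateness.
    private long expirationTime(long windowStart) {
        return windowStart + size - 1 + allowedLateness;
    }

    // Adds an element; returns false when its window has expired and the element is dropped.
    boolean add(long eventTime) {
        long windowStart = eventTime - Math.floorMod(eventTime, size);
        if (expirationTime(windowStart) < watermark) {
            return false;
        }
        counts.merge(windowStart, 1L, Long::sum);
        return true;
    }

    // Advances the watermark and discards the state of every expired window.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        counts.keySet().removeIf(windowStart -> expirationTime(windowStart) < watermark);
    }

    Map<Long, Long> counts() {
        return counts;
    }

    public static void main(String[] args) {
        WindowStateSketch state = new WindowStateSketch(5_000, 5_000);
        state.add(1_000);
        state.advanceWatermark(6_000);
        System.out.println(state.add(1_000)); // true: late, but within allowed lateness
        System.out.println(state.counts());   // {0=2}
        state.advanceWatermark(11_000);
        System.out.println(state.counts());   // {}: window state removed
        System.out.println(state.add(1_000)); // false: the late element is rejected
    }
}
```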

Moreover, the firing of the final pane (i.e. the one including late data, if any) when the window is permanently closed can be controlled with the values of the Window.ClosingBehavior enum, passed to withAllowedLateness(Duration, Window.ClosingBehavior): FIRE_IF_NON_EMPTY (the default) or FIRE_ALWAYS. The first one fires the final pane only when new data has arrived since the previous firing. The second one ignores whether there is new data and always fires the final pane.
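The closing decision can be sketched as a single function. The ClosingBehavior enum below is a hypothetical stand-in that only mirrors the constant names of Beam's Window.ClosingBehavior, and the model assumes accumulating mode, so the final pane carries everything accumulated so far.

```java
import java.util.OptionalLong;

public class ClosingBehaviorSketch {
    // Mirrors the constant names of Beam's Window.ClosingBehavior (not the real enum).
    enum ClosingBehavior { FIRE_ALWAYS, FIRE_IF_NON_EMPTY }

    // Returns the content of the final pane emitted when the window closes, or empty if none fires.
    // accumulatedCount: result accumulated so far; newDataSinceLastFiring: any element since the last pane.
    static OptionalLong finalPane(ClosingBehavior behavior, long accumulatedCount,
                                  boolean newDataSinceLastFiring) {
        if (behavior == ClosingBehavior.FIRE_ALWAYS || newDataSinceLastFiring) {
            return OptionalLong.of(accumulatedCount);
        }
        return OptionalLong.empty(); // FIRE_IF_NON_EMPTY and nothing new: no final pane
    }

    public static void main(String[] args) {
        System.out.println(finalPane(ClosingBehavior.FIRE_ALWAYS, 3, false));       // OptionalLong[3]
        System.out.println(finalPane(ClosingBehavior.FIRE_IF_NON_EMPTY, 3, false)); // OptionalLong.empty
    }
}
```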

Watermark example

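The examples below show sample watermark configurations, the default one that drops late data and ones with allowed lateness and accumulating panes, covering unobservably, observably and droppably late data: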

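import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

// BeamFunctions.createPipeline(...) used below is a helper from the beam-learning repository
// linked above, not a Beam SDK class; it is assumed to return a Pipeline able to run TestStream.

private static final Instant NOW = new Instant(0);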
private static final Instant SEC_1_DURATION = NOW.plus(Duration.standardSeconds(1));
private static final Instant SEC_2_DURATION = NOW.plus(Duration.standardSeconds(2));


@Test
public void should_show_unobservably_late_data() {
  Pipeline pipeline = BeamFunctions.createPipeline("Unobservably late data");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> lettersStream = TestStream.create(utfCoder).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_2_DURATION)
  )
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .withAllowedLateness(Duration.standardSeconds(5), Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, lettersStream);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inOnTimePane(window1).containsInAnyOrder("a=3");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=3");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_discard_late_data_with_default_configuration() {
  Pipeline pipeline = BeamFunctions.createPipeline("Discarded late data");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> lettersStream = TestStream.create(utfCoder).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_1_DURATION)
  )
  // the window is now+5 sec, advance to 6'' and add one late event that should be discarded
  .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(6)))
  .addElements(TimestampedValue.of("a", SEC_1_DURATION))
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.into(FixedWindows.of(windowDuration));

  PCollection<String> results = applyCounter(pipeline, window, lettersStream);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=3");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_accept_late_data_with_allowed_lateness_and_accumulation_mode() {
  // It's an example of observably late data, i.e. late data arriving after the end of the window but before
  // the allowed lateness time elapses
  Pipeline pipeline = BeamFunctions.createPipeline("Accepted late data with accepted previous panes");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> lettersStream = TestStream.create(utfCoder).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_1_DURATION)
  )
  // the window is now+5 sec, advance to 6'' and add one late event that should be accepted
  .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(6)))
  .addElements(TimestampedValue.of("a", SEC_1_DURATION))
  .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(11)))
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    // the closing behavior defines the conditions under which the final pane is emitted;
    // FIRE_ALWAYS always emits it, even if there is no new data since the last firing
    .withAllowedLateness(Duration.standardSeconds(5), Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, lettersStream);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inOnTimePane(window1).containsInAnyOrder("a=3");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=4");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_accept_late_data_but_discard_the_data_after_watermark() {
  // This is an example of droppably late data, i.e. data arriving after the window and allowed lateness
  Pipeline pipeline = BeamFunctions.createPipeline("Accepted late data but only within watermark");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> lettersStream = TestStream.create(utfCoder).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_1_DURATION)
  )
  // the window is now+5 sec, advance to 15'' and add one late event that should be discarded
  .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(15)))
  .addElements(TimestampedValue.of("a", SEC_1_DURATION))
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .withAllowedLateness(Duration.standardSeconds(5), Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, lettersStream);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inOnTimePane(window1).containsInAnyOrder("a=3");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=3");
  pipeline.run().waitUntilFinish();
}

private static PCollection<String> applyCounter(Pipeline pipeline, Window<String> window,
                                              TestStream<String> inputCollection) {
  return pipeline.apply(inputCollection)
    .apply(window)
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
            .via((String letter) -> KV.of(letter, 1)))
    .apply(Count.perKey())
    .apply(MapElements.into(TypeDescriptors.strings()).via((KV<String, Long> pair) ->
      pair.getKey() + "=" + pair.getValue()));
}
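To double-check the expected assertions, the pane behavior of these tests can be replayed with a small plain-Java model. It is a hypothetical simplification, not Beam's implementation: the LatePaneSimulator class is invented for illustration, it tracks a single [0 s, 5 s) window counting "a" elements in accumulating mode, and it assumes the default trigger fires an on-time pane when the watermark passes the end of the window, then one late pane per accepted late element. When the window expires in the same watermark jump that fires the on-time pane, the model treats that on-time pane as the final one.

```java
import java.util.ArrayList;
import java.util.List;

public class LatePaneSimulator {
    enum Timing { ON_TIME, LATE }

    static final class Pane {
        final Timing timing;
        final long count;
        final boolean isLast;

        Pane(Timing timing, long count, boolean isLast) {
            this.timing = timing;
            this.count = count;
            this.isLast = isLast;
        }

        @Override
        public String toString() {
            return timing + " a=" + count + (isLast ? " (final)" : "");
        }
    }

    private static final long WINDOW_MAX_TS = 4_999; // last millisecond of the [0 s, 5 s) window

    private final long expirationTime;  // window max timestamp + allowed lateness
    private final boolean fireAlways;   // true = FIRE_ALWAYS, false = FIRE_IF_NON_EMPTY
    private final List<Pane> panes = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;
    private long count = 0;             // accumulating mode: never reset between panes
    private boolean onTimeFired = false;
    private boolean newData = false;
    private boolean closed = false;

    LatePaneSimulator(long allowedLatenessMs, boolean fireAlways) {
        this.expirationTime = WINDOW_MAX_TS + allowedLatenessMs;
        this.fireAlways = fireAlways;
    }

    // Adds one "a" whose event time falls inside the window.
    LatePaneSimulator addElement() {
        if (closed) {
            return this;                   // droppably late: the window state is gone
        }
        count++;
        newData = true;
        if (onTimeFired) {
            fire(Timing.LATE, false);      // modeled default trigger: each late element fires a pane
        }
        return this;
    }

    LatePaneSimulator advanceWatermarkTo(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        boolean expired = expirationTime < watermark;
        if (!onTimeFired && WINDOW_MAX_TS < watermark) {
            onTimeFired = true;
            fire(Timing.ON_TIME, expired); // if the window also expires now, this is the final pane
        }
        if (expired && !closed && (fireAlways || newData)) {
            fire(Timing.LATE, true);       // final pane, governed by the closing behavior
        }
        closed |= expired;
        return this;
    }

    private void fire(Timing timing, boolean isLast) {
        panes.add(new Pane(timing, count, isLast));
        newData = false;
        closed |= isLast;
    }

    long onTimeCount() {
        for (Pane pane : panes) if (pane.timing == Timing.ON_TIME) return pane.count;
        return -1;
    }

    long finalCount() {
        for (Pane pane : panes) if (pane.isLast) return pane.count;
        return -1;
    }

    public static void main(String[] args) {
        // should_discard_late_data_with_default_configuration: zero lateness, FIRE_IF_NON_EMPTY
        System.out.println(new LatePaneSimulator(0, false)
            .addElement().addElement().addElement()
            .advanceWatermarkTo(6_000).addElement().advanceWatermarkTo(Long.MAX_VALUE).panes);
        // prints [ON_TIME a=3 (final)]
        // should_accept_late_data_with_allowed_lateness_and_accumulation_mode
        System.out.println(new LatePaneSimulator(5_000, true)
            .addElement().addElement().addElement()
            .advanceWatermarkTo(6_000).addElement()
            .advanceWatermarkTo(11_000).advanceWatermarkTo(Long.MAX_VALUE).panes);
        // prints [ON_TIME a=3, LATE a=4, LATE a=4 (final)]
    }
}
```

Replaying the four scenarios with this model gives final pane counts of 3, 3, 4 and 3, the same values asserted with inFinalPane in the tests above.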

As shown in this post, the watermark, combined with allowed lateness, helps to improve the completeness of accumulated data. It's especially useful when we know that a lot of late data can't simply be discarded. As explained in the first section, this mechanism keeps the window open for some time after its initially defined end.