Triggers in Apache Beam

on waitingforcode.com

Another important aspect of windowing in Apache Beam is triggers. They let us freely control when the window results are computed.

In this post we'll discover the triggers mechanism. The first section explains what it consists of and which triggers are available. The second part introduces the Java API methods that help to parameterize the triggers. The last section is devoted to learning tests showing how to use triggers in the code.

Triggers definition

The window results can be emitted at different moments. If we accept partial results, they can be computed before the end of the window. These are called early results. Late events can also be included in the final pane thanks to the watermark mechanism and the allowed lateness (more about it in the post about late data in Apache Beam). In both cases, the moment when the pane is fired is controlled by a mechanism called triggers.
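To make the notions of on-time, late and dropped data more concrete, here is a minimal plain-Java sketch (not Beam code) classifying an element against a window end, the watermark and the allowed lateness. The LatenessSketch class and its Status values are hypothetical. The rule is modelled, to the best of our understanding, on Beam's behavior: a window [start, end) has a maximum timestamp of end - 1 ms, and data becomes droppable once the watermark goes beyond that maximum timestamp plus the allowed lateness.

```java
/** Hypothetical helper, not a Beam class: classifies data against the watermark. */
final class LatenessSketch {
  enum Status { ON_TIME, LATE_BUT_ACCEPTED, DROPPED }

  static Status classify(long windowEndMillis, long allowedLatenessMillis, long watermarkMillis) {
    // A window [start, end) is assumed to have a maximum timestamp of end - 1 ms
    long maxTimestamp = windowEndMillis - 1;
    if (watermarkMillis <= maxTimestamp) {
      // The watermark hasn't passed the end of the window yet: panes fired now are early
      return Status.ON_TIME;
    }
    if (maxTimestamp + allowedLatenessMillis < watermarkMillis) {
      // Beyond the allowed lateness: the window is expired and the data is discarded
      return Status.DROPPED;
    }
    // Late, but still within the allowed lateness, so it can land in a late pane
    return Status.LATE_BUT_ACCEPTED;
  }
}
```

With the values used by the tests of the last section (window [0s, 5s), 5 seconds of allowed lateness), a watermark at 6 seconds still accepts an element, while a watermark at 20 seconds drops it.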

Apache Beam comes with 4 categories of triggers:

  • event time - as the name indicates, these triggers are based on the event time of the elements. The typical trigger of this category is the watermark trigger (AfterWatermark). It fires when the watermark passes the end of the window. Late events arriving afterwards can still be accepted during the allowed lateness period (0 by default, which means that late events are discarded). The watermark trigger can be combined with other triggers to produce early and late results.
  • processing time - these triggers are based on processing time. They fire the pane a given amount of processing time after the first element of the pane was processed.
  • data-driven - here the trigger uses the number of arrived elements to decide when to fire. For instance, if the trigger is configured to fire after 2 elements, it fires once the pane contains at least 2 elements (and, if it's repeated, again for every next 2 elements).
  • composite - it combines different triggers with predicates such as: all of (the conditions of all sub-triggers must be met) or first (the first met condition fires the trigger).
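The four categories can be pictured as predicates evaluated against the state of a pane. The following plain-Java sketch is only a mental model: PaneState and FireCondition are hypothetical types, not part of the Beam API, and real triggers also keep internal state (for example, whether they have already fired).

```java
import java.util.Arrays;

/** Hypothetical snapshot of a pane at trigger-evaluation time; not a Beam type. */
final class PaneState {
  final int elementCount;                       // input of data-driven triggers
  final long processingMillisSinceFirstElement; // input of processing time triggers
  final boolean watermarkPassedWindowEnd;       // input of event time triggers

  PaneState(int elementCount, long processingMillisSinceFirstElement, boolean watermarkPassedWindowEnd) {
    this.elementCount = elementCount;
    this.processingMillisSinceFirstElement = processingMillisSinceFirstElement;
    this.watermarkPassedWindowEnd = watermarkPassedWindowEnd;
  }
}

/** Hypothetical firing condition with one factory per trigger category. */
interface FireCondition {
  boolean shouldFire(PaneState pane);

  /** Event time: fire once the watermark passes the end of the window. */
  static FireCondition watermarkPastEndOfWindow() {
    return pane -> pane.watermarkPassedWindowEnd;
  }

  /** Processing time: fire a given delay after the first element of the pane. */
  static FireCondition processingTimeDelay(long delayMillis) {
    return pane -> pane.processingMillisSinceFirstElement >= delayMillis;
  }

  /** Data-driven: fire once the pane holds enough elements. */
  static FireCondition elementCountAtLeast(int count) {
    return pane -> pane.elementCount >= count;
  }

  /** Composite "all of": every sub-condition must be met. */
  static FireCondition allOf(FireCondition... conditions) {
    return pane -> Arrays.stream(conditions).allMatch(c -> c.shouldFire(pane));
  }

  /** Composite "first": the first met sub-condition is enough. */
  static FireCondition firstOf(FireCondition... conditions) {
    return pane -> Arrays.stream(conditions).anyMatch(c -> c.shouldFire(pane));
  }
}
```

For a pane with 6 elements and 3 seconds of processing time since its first element, firstOf(elementCountAtLeast(20), processingTimeDelay(2000)) is met thanks to the delay, whereas allOf(elementCountAtLeast(12), processingTimeDelay(2000)) still waits for 12 elements.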

Together with triggers, the accumulation mode, represented by the WindowingStrategy.AccumulationMode enum, controls what happens with already fired panes. Depending on the chosen entry, the content of already fired panes is either discarded (DISCARDING_FIRED_PANES) or kept and combined with the next results (ACCUMULATING_FIRED_PANES).
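The difference between both modes can be shown with a few lines of plain Java. This is a hypothetical model, not Beam's implementation: AccumulationSketch only mirrors the two AccumulationMode entries locally and computes the count each successive pane would emit.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the two accumulation modes; not Beam's implementation. */
final class AccumulationSketch {
  /** Local mirror of the WindowingStrategy.AccumulationMode entries. */
  enum Mode { DISCARDING_FIRED_PANES, ACCUMULATING_FIRED_PANES }

  /**
   * @param mode                 the accumulation mode to simulate
   * @param newElementsPerFiring elements that arrived before each firing (since the previous one)
   * @return the count emitted by each successive pane
   */
  static List<Long> emittedCounts(Mode mode, int... newElementsPerFiring) {
    List<Long> emitted = new ArrayList<>();
    long accumulated = 0; // everything received by the window so far
    for (int newElements : newElementsPerFiring) {
      accumulated += newElements;
      // Discarding forgets what was already fired; accumulating re-emits it with the new data
      emitted.add(mode == Mode.ACCUMULATING_FIRED_PANES ? accumulated : newElements);
    }
    return emitted;
  }
}
```

With 3 on-time elements followed by 1 late element, the discarding mode emits 3 then 1, whereas the accumulating mode emits 3 then 4.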

Triggers API

The trigger applied in the pipeline is defined through the org.apache.beam.sdk.transforms.windowing.Window#triggering(Trigger trigger) method. The Trigger passed as its parameter is an abstract class. Among its implementations we can distinguish:

  • AfterAll - an example of composite trigger. It fires when all sub-triggers defined through the of(List<Trigger> triggers) method are ready.
  • AfterEach - the sub-triggers are defined in the inOrder(List<Trigger> triggers) method. They are executed in order, one by one.
  • AfterFirst - fires when at least one of the defined sub-triggers fires. Like AfterAll, AfterFirst defines its sub-triggers in the of(...) method.
  • AfterPane - an example of data-driven trigger. Its elementCountAtLeast(int countElems) method defines the minimal number of accumulated items before firing the trigger. It's important to know that even if this threshold is never reached, the pane can still be emitted with fewer elements, for instance when the window expires.
  • AfterProcessingTime - as the name indicates, an example of processing time-based trigger. It's created with pastFirstElementInPane() and defines 2 methods to control the firing. The first one, plusDelayOf(final Duration delay), delays the firing by the given amount of processing time after the first element of the pane, so that new elements are accumulated during this interval. The second one, alignedTo(final Duration period, final Instant offset), rounds the firing time up to the next boundary of the form offset + k * period. Thus if we define the offset at 2017-01-01 10:00 and a period of 4 minutes, a pane whose first element arrives at 10:01 fires at 10:04, i.e. it accepts the data arriving between 10:01 and 10:04.
  • AfterWatermark - its pastEndOfWindow() method creates a trigger firing the pane when the watermark passes the end of the window. It also gives more fine-grained control because it allows the definition of early results (withEarlyFirings(OnceTrigger earlyFirings), produced before the end of the window) and late results (withLateFirings(OnceTrigger lateFirings), produced after the end of the window and before the end of the allowed lateness).
  • DefaultTrigger - the trigger used by default. It's equivalent to Repeatedly.forever(AfterWatermark.pastEndOfWindow()).
  • NeverTrigger - returned by Never.ever(), this trigger never fires on its own. The pane is emitted only when the watermark passes the end of the window plus the allowed lateness.
  • OrFinallyTrigger - a special kind of trigger constructed through Trigger's orFinally(OnceTrigger until) method. The main trigger fires normally until the moment when the until trigger fires; after that the whole trigger is finished.
  • Repeatedly - executes the given trigger repeatedly. The sub-trigger is defined in the forever(Trigger repeated) method. Thus if we define AfterPane.elementCountAtLeast(2) as the repeated sub-trigger, it won't stop after the first 2 elements in the pane but will fire again for every new 2 items.
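The alignment arithmetic described for alignedTo(period, offset) can be checked with a short plain-Java sketch. It uses java.time instead of the Joda-Time classes of the tests to stay dependency-free, and AlignmentSketch.alignUp is a hypothetical helper reproducing the rounding rule described above, not Beam's own code (which may, for instance, handle instants before the offset differently).

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper reproducing the rounding described for alignedTo(period, offset). */
final class AlignmentSketch {
  /** Rounds point up to the first instant of the form offset + k * period that is not before it. */
  static Instant alignUp(Instant point, Duration period, Instant offset) {
    long periodMillis = period.toMillis();
    // floorMod keeps the remainder positive even when point is before offset
    long sinceBoundary = Math.floorMod(point.toEpochMilli() - offset.toEpochMilli(), periodMillis);
    return sinceBoundary == 0 ? point : point.plusMillis(periodMillis - sinceBoundary);
  }
}
```

With an offset of 2017-01-01 10:00 and a period of 4 minutes, a first element processed at 10:01 leads to a firing at 10:04, one processed at 10:05:30 to a firing at 10:08, and an instant already on a boundary, such as 10:04, stays unchanged.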

Triggers examples

The following tests show some specificities of each of the presented triggers:

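import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.AfterAll;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Never;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;

// BeamFunctions.createPipeline(...) is a helper of the author's test project, not part of Beam

private static final Instant NOW = new Instant(0);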
private static final Instant SEC_1_DURATION = NOW.plus(Duration.standardSeconds(1));

@Test
public void should_emit_early_results_after_receiving_6_events() {
  Pipeline pipeline = BeamFunctions.createPipeline("Early results after 6 events received");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION)
  )
  .addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
  .addElements(TimestampedValue.of("a", SEC_1_DURATION))
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(AfterPane.elementCountAtLeast(3))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  // doc: "Triggers allow Beam to emit early results, before all the data in a given window has arrived.
  // For example, emitting after a certain amount of time elapses, or after a certain number of elements arrives."
  // Here we want to emit the results after the window receives at least 3 items. In this case the result is
  // computed for the first 2 .addElements operations (2 + 4 items). Since the trigger isn't repeated, it fires
  // only once and the last .addElements is ignored.
  // See the next test to discover what happens if we add only 1 event per .addElements call
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=6");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=6");

  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_early_results_after_receiving_3_events() {
  Pipeline pipeline = BeamFunctions.createPipeline("Early results after 3 events received");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(AfterPane.elementCountAtLeast(3))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=3");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=3");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_early_results_after_processing_time_elapsed() {
  Pipeline pipeline = BeamFunctions.createPipeline("Early results after processing time elapsed");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder).addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION)
  )
  // the processing time trigger fires the pane after 2 seconds; advance processing time by 3 seconds,
  // then check whether the final pane contains the data added after this time
  .advanceProcessingTime(Duration.standardSeconds(3))
  .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
  .addElements(TimestampedValue.of("a", SEC_1_DURATION))
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(AfterProcessingTime.pastFirstElementInPane().alignedTo(Duration.standardSeconds(2)))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=2");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=2");

  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_the_results_after_processing_time_aligned_to_date() {
  Pipeline pipeline = BeamFunctions.createPipeline("Processing time trigger aligned to a date");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    // Advance processing time by 3 seconds - the additional elements are still included in the result
    // (even though the processing time trigger period is 2 seconds). The processing time trigger is
    // aligned to the date NOW + 1 second, so it behaves as if processing time advanced by only 2 sec
    .advanceProcessingTime(Duration.standardSeconds(3))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  AfterProcessingTime processingTimeTrigger = AfterProcessingTime.pastFirstElementInPane()
    .alignedTo(Duration.standardSeconds(2), NOW.plus(Duration.standardSeconds(1)));
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(processingTimeTrigger)
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=12");
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=12");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_results_after_watermark_passed_when_trigger_is_never() {
  Pipeline pipeline = BeamFunctions.createPipeline("Results triggered with Never trigger");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION)
    )
    // Advance processing time to show that it has no influence on the Never trigger + watermark
    .advanceProcessingTime(Duration.standardSeconds(3))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION),TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION),TimestampedValue.of("a", SEC_1_DURATION))
    // Now advance the watermark beyond the window end plus the allowed lateness to show that
    // the last "a" element is discarded
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(20)))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(Never.ever())
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  // Using the Never trigger means that the result is computed only when the watermark passes
  // window end + allowed lateness. In other words, the trigger itself never fires
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=6");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=6");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_discard_items_accumulated_in_previous_pane() {
  Pipeline pipeline = BeamFunctions.createPipeline("Discard already fired panes");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
            TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
            TimestampedValue.of("a", SEC_1_DURATION)
    )
    // advance to 6 sec to see late event included in the final result
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(6)))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  AfterWatermark.AfterWatermarkEarlyAndLate afterWatermark = AfterWatermark.pastEndOfWindow()
    .withLateFirings(AfterProcessingTime.pastFirstElementInPane())
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane());
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(afterWatermark)
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .discardingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inOnTimePane(window1).containsInAnyOrder("a=3");
  // a=1 since we discard already fired panes
  // See the next test to discover the difference between discarding and accumulating fired panes
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=1");

  pipeline.run().waitUntilFinish();
}

@Test
public void should_accumulate_items_accumulated_in_previous_pane() {
  Pipeline pipeline = BeamFunctions.createPipeline("Accumulate already fired panes");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
        TimestampedValue.of("a", SEC_1_DURATION)
    )
    // advance to 6 sec to see late event included in the final result
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(6)))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  AfterWatermark.AfterWatermarkEarlyAndLate afterWatermark = AfterWatermark.pastEndOfWindow()
    .withLateFirings(AfterProcessingTime.pastFirstElementInPane())
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane());
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(afterWatermark)
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inOnTimePane(window1).containsInAnyOrder("a=3");
  // a=4 since we accumulate already fired panes
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=4");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_the_results_with_early_and_late_sub_triggers_for_watermark() {
  Pipeline pipeline = BeamFunctions.createPipeline("Early and late fired panes with watermark");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION)
    )
    // Advance processing time by 5 secs - the early firing delay is 2 secs, so the early results should be emitted
    .advanceProcessingTime(Duration.standardSeconds(5))
    // Advance the watermark past the end of the window, then add 3x2 late elements to see whether 9 or only 2
    // elements are in the final pane
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(6)))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  AfterWatermark.AfterWatermarkEarlyAndLate afterWatermark = AfterWatermark.pastEndOfWindow()
    .withLateFirings(AfterPane.elementCountAtLeast(2))
    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)));
  Window<String> window = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
    .triggering(afterWatermark)
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(5)));
  PAssert.that(results).inOnTimePane(window1).containsInAnyOrder("a=3");
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=9");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_the_results_with_at_least_trigger_even_if_the_threshold_was_not_reached() {
  Pipeline pipeline = BeamFunctions.createPipeline("atLeast trigger threshold not reached");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION)
  )
  .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(AfterPane.elementCountAtLeast(20))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  // The AfterPane trigger is not strict: even if the number of items is not reached,
  // the pane is emitted when the window expires
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=6");
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=6");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_emit_the_results_after_the_first_executed_trigger() {
  Pipeline pipeline = BeamFunctions.createPipeline("The first launched trigger");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    // Advance processing time in order to execute the processingTimeTrigger instead of elementsCountTrigger
    // defined below
    .advanceProcessingTime(Duration.standardSeconds(3))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  AfterPane elementsCountTrigger = AfterPane.elementCountAtLeast(20);
  AfterProcessingTime processingTimeTrigger = AfterProcessingTime.pastFirstElementInPane()
    .plusDelayOf(Duration.standardSeconds(2));
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(AfterFirst.of(elementsCountTrigger, processingTimeTrigger))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  // Only 6 items are in the pane since the trigger fires either after accumulating 20 items or
  // 2 seconds after receiving the first element, whichever comes first
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=6");
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=6");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_fire_final_pane_after_all_triggers() {
  Pipeline pipeline = BeamFunctions.createPipeline("After all triggers pane firing");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    // Advance processing time - the processing time trigger should be fired
    .advanceProcessingTime(Duration.standardSeconds(20))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(AfterAll.of(AfterPane.elementCountAtLeast(12),
      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2))))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=12");
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=12");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_build_composite_trigger_with_stopping_condition() {
  Pipeline pipeline = BeamFunctions.createPipeline("Composite trigger with stopping condition");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    // advance the processing time by 5 of the 10 allowed seconds
    .advanceProcessingTime(Duration.standardSeconds(5))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION))
    // advance processing time by 7 more seconds (12 in total) - the items above should be discarded
    .advanceProcessingTime(Duration.standardSeconds(7))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION)
    )
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Trigger.OnceTrigger mainTrigger = AfterProcessingTime.pastFirstElementInPane()
    .plusDelayOf(Duration.standardSeconds(10));
  Trigger.OnceTrigger untilTrigger = AfterPane.elementCountAtLeast(2);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(mainTrigger.orFinally(untilTrigger))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  // Here we use the OrFinallyTrigger that stops the execution of the main trigger when the conditional
  // trigger (here untilTrigger) fires
  // See the next test to discover what happens without untilTrigger
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=2");
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=2");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_show_what_happens_for_orfinallytrigger_without_stopping_condition() {
  Pipeline pipeline = BeamFunctions.createPipeline("As composite trigger but without stopping condition");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    // advance the processing time by 5 of the 10 allowed seconds
    .advanceProcessingTime(Duration.standardSeconds(5))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION))
    // advance processing time by 7 more seconds (12 in total) - the main trigger fires, so the items added below are discarded
    .advanceProcessingTime(Duration.standardSeconds(7))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION)
    )
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Trigger.OnceTrigger processingTimeTrigger = AfterProcessingTime
    .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10));
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(processingTimeTrigger)
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  // Unlike the previous test, here we don't use the stopping condition. The main trigger fires
  // normally after 10 seconds of processing time and accumulates 9 'a' items
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=9");
  PAssert.that(results).inWindow(window1).containsInAnyOrder("a=9");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_repeatedly_execute_elements_count_based_trigger() {
  Pipeline pipeline = BeamFunctions.createPipeline("Elements count based trigger executed repeatedly");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
      TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
        TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration))
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .accumulatingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  PAssert.that(results).inFinalPane(window1).containsInAnyOrder("a=9");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_repeatedly_execute_elements_count_based_trigger_for_global_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Elements count based trigger executed repeatedly on global window");
  Coder<String> utfCoder = StringUtf8Coder.of();
  TestStream<String> onTimeLetters = TestStream.create(utfCoder)
    .addElements(
            TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
            TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION),
            TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(
            TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("a", SEC_1_DURATION))
    .addElements(TimestampedValue.of("a", SEC_1_DURATION))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(5);
  Window<String> window = Window.<String>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
    .withAllowedLateness(windowDuration, Window.ClosingBehavior.FIRE_ALWAYS)
    .discardingFiredPanes();

  PCollection<String> results = applyCounter(pipeline, window, onTimeLetters);

  GlobalWindow globalWindow = GlobalWindow.INSTANCE;
  PAssert.that(results).inFinalPane(globalWindow).containsInAnyOrder("a=1");
  pipeline.run().waitUntilFinish();
}

private static PCollection<String> applyCounter(Pipeline pipeline, Window<String> window,
                                              TestStream<String> inputCollection) {
  return pipeline.apply(inputCollection)
    .apply(window)
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
      .via((String letter) -> KV.of(letter, 1)))
    .apply(Count.perKey())
    .apply(MapElements.into(TypeDescriptors.strings()).via((KV<String, Long> pair) ->
      pair.getKey() + "=" + pair.getValue()));
}
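The count-based tests above can be replayed with a small plain-Java model. It assumes, based on the results asserted in the tests rather than on Beam's source code, that the trigger is evaluated once per TestStream bundle (one .addElements(...) call), that a non-repeated trigger closes the window after its single firing, and that whatever was not emitted yet goes to a final pane when the window expires. CountTriggerSketch and firedCounts are hypothetical names, and the model ignores subtleties such as empty panes fired with ClosingBehavior.FIRE_ALWAYS.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of AfterPane.elementCountAtLeast(n), optionally wrapped in Repeatedly.forever. */
final class CountTriggerSketch {
  /**
   * @param bundles      sizes of the successive bundles (one per .addElements call)
   * @param threshold    the elementCountAtLeast value
   * @param repeated     true to simulate Repeatedly.forever(...)
   * @param accumulating true for accumulatingFiredPanes(), false for discardingFiredPanes()
   * @return counts emitted by each pane, including the final one emitted at window expiration
   */
  static List<Long> firedCounts(int[] bundles, int threshold, boolean repeated, boolean accumulating) {
    List<Long> emitted = new ArrayList<>();
    long total = 0;         // all elements accepted by the window so far
    long sinceLastFire = 0; // elements received since the previous firing
    boolean finished = false;
    for (int bundle : bundles) {
      if (finished) {
        break; // a finished, non-repeated trigger closed the window: later data is dropped
      }
      total += bundle;
      sinceLastFire += bundle;
      // Assumption: the trigger is checked once the whole bundle has been processed
      if (sinceLastFire >= threshold) {
        emitted.add(accumulating ? total : sinceLastFire);
        sinceLastFire = 0;
        finished = !repeated;
      }
    }
    // At window expiration, the not yet emitted elements go to the final pane
    if (!finished && sinceLastFire > 0) {
      emitted.add(accumulating ? total : sinceLastFire);
    }
    return emitted;
  }
}
```

For the bundles 2, 4, 2 and 1 with Repeatedly.forever(AfterPane.elementCountAtLeast(2)), the model emits 2, 6, 8 and 9 in accumulating mode and 2, 4, 2 and 1 in discarding mode; the last values match the a=9 and a=1 final panes asserted in the repeated trigger tests.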

This is the last post of the series devoted to Apache Beam windows. It completes the picture of this mechanism, composed of different window types, the watermark for late data, and triggers that either emit early results or handle late data more efficiently. As shown in the first section, triggers can be divided into 4 categories: event time, processing time, data-driven and composite. Each of them can be used together with an accumulation mode that defines what to do with already fired panes. The second section listed the triggers available in the Java API, while the last one showed how they can be used through learning tests based on the TestStream feature.

