Dealing with state lifecycle in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

As we saw in the previous post, Apache Beam makes it possible to work with state. However, as we learned there, the state only lives for the duration of the window and is removed afterwards. Thanks to another Beam feature, called timers, we can act on the expiring state just before it is removed from the state store.

This post covers timers in 3 sections. The first one introduces the idea. The second describes the timer's Java API, while the last shows how it can be used through the usual learning tests.

Timer

Stateful processing in Apache Beam is based on state coupled to the key of the processed dataset. However, the state is also bound to the window and, once the window expires, it's difficult to do anything with the computed state. Fortunately, Beam provides the timer abstraction to help control the state lifecycle.

A timer can be thought of as a CRON task scheduled to execute at a specific moment. It helps to control the state lifecycle and can be based either on event time (e.g. when the window expires) or on processing time (e.g. every 1 second within a particular window).

In order to understand the timer's role, let's take a simple buffer that accumulates data during the window (let's say 10 seconds) and flushes it at a regular interval (e.g. every 5 seconds). We could try (inefficiently) to do it without a timer, as in the following code:

new DoFn<KV<String, Long>, Long>() {

  private static final String FLUSH_TIME_STATE = "flushTime";

  private static final String ACCUMULATOR_NAME = "accumulator";

  @StateId(FLUSH_TIME_STATE)
  private final StateSpec<ValueState<Long>> flushTimeStateSpec = StateSpecs.value(VarLongCoder.of());

  @StateId(ACCUMULATOR_NAME)
  private final StateSpec<BagState<String>> accumulatorStateSpec = StateSpecs.bag();

  @DoFn.ProcessElement
  public void processElement(ProcessContext processContext,
      @StateId(FLUSH_TIME_STATE) ValueState<Long> flushTime,
      @StateId(ACCUMULATOR_NAME) BagState<String> accumulator) {
    Long scheduledFlush = flushTime.read();
    long now = System.currentTimeMillis();
    if (scheduledFlush == null) {
      // First element: schedule the flush 5 seconds from now
      flushTime.write(now + 5000L);
    } else if (scheduledFlush <= now) {
      // The scheduled flush time has passed: flush & clear accumulator here
      // Reset flush time also - it'll be set again with new encountered item
    }
  }
};

But as you can see, it's tricky. You have to keep the flush time as a separate state and, in addition to that, the code doesn't guarantee that all data will be flushed. The flush time is checked only when a new element arrives, so the values processed in the 2nd sub-window interval (5-10 sec) won't be flushed unless some entry arrives at the end of that interval. A timer is a much cleaner solution because it guarantees the execution after the defined time or at the end of the window. Thus the requirement of having an element arrive at the interval boundary disappears.
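The gap described above can be reproduced with a small plain-Java simulation. It doesn't use Beam at all: the FlushSimulation class, its simulate method and the arrival times are hypothetical names and values chosen for illustration, and the final "window end" callback only stands in for what a timer would do.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushSimulation {

  /**
   * Replays element arrival times (in seconds) against a buffer flushed every
   * `interval` seconds. The flush deadline is checked only when an element arrives.
   * When flushAtWindowEnd is true, a final callback flushes whatever is left,
   * which is the part a timer would take care of.
   */
  public static List<List<Long>> simulate(long[] arrivals, long interval, boolean flushAtWindowEnd) {
    List<List<Long>> flushed = new ArrayList<>();
    List<Long> buffer = new ArrayList<>();
    Long flushTime = null;
    for (long arrival : arrivals) {
      if (flushTime == null) {
        // First element: schedule the first flush
        flushTime = arrival + interval;
      } else if (flushTime <= arrival) {
        // Deadline reached: emit the buffered values and schedule the next flush
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
        flushTime = arrival + interval;
      }
      buffer.add(arrival);
    }
    if (flushAtWindowEnd && !buffer.isEmpty()) {
      // Timer-like callback executed when the window ends
      flushed.add(new ArrayList<>(buffer));
    }
    return flushed;
  }

  public static void main(String[] args) {
    long[] arrivals = {1, 3, 6, 7};
    System.out.println(simulate(arrivals, 5, false)); // [[1, 3]]
    System.out.println(simulate(arrivals, 5, true));  // [[1, 3], [6, 7]]
  }
}
```

Without the final callback the values that arrived at seconds 6 and 7 stay in the buffer and are lost with the window, which is exactly the case a timer is meant to cover.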

Timer API

Using a timer is straightforward. First, the timer is declared with the @TimerId annotation and, similarly to the state, it's expressed as a spec of org.apache.beam.sdk.state.TimerSpec type. As already mentioned, 2 types of timers are available, both represented by the TimeDomain enum: EVENT_TIME and PROCESSING_TIME. The first one is related to the event time and advances together with the system's watermark. The second one concerns processing time and advances with the pipeline execution.
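The difference between the two time domains can be pictured with a toy model written in plain Java. It is only a sketch under simplifying assumptions: TimeDomainSketch and all its members are hypothetical names, time is a bare long, and nothing here reflects how a real runner stores or fires timers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class TimeDomainSketch {

  public enum Domain { EVENT_TIME, PROCESSING_TIME }

  private static final class PendingTimer {
    final String id;
    final Domain domain;
    final long target;

    PendingTimer(String id, Domain domain, long target) {
      this.id = id;
      this.domain = domain;
      this.target = target;
    }
  }

  private final List<PendingTimer> pending = new ArrayList<>();
  private final List<String> fired = new ArrayList<>();

  public void set(String id, Domain domain, long target) {
    pending.add(new PendingTimer(id, domain, target));
  }

  // Event time timers react only to the watermark.
  public void advanceWatermark(long watermark) {
    fire(Domain.EVENT_TIME, watermark);
  }

  // Processing time timers react only to the clock of the execution.
  public void advanceProcessingTime(long clock) {
    fire(Domain.PROCESSING_TIME, clock);
  }

  private void fire(Domain domain, long currentTime) {
    Iterator<PendingTimer> iterator = pending.iterator();
    while (iterator.hasNext()) {
      PendingTimer timer = iterator.next();
      if (timer.domain == domain && timer.target <= currentTime) {
        fired.add(timer.id);
        iterator.remove();
      }
    }
  }

  // Ids of the fired timers, in firing order.
  public List<String> fired() {
    return fired;
  }
}
```

In this model, advancing the processing time never fires an event time timer, however far the clock moves. The event time timer waits until the watermark reaches its target.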

Any particular timer is an implementation of the org.apache.beam.sdk.state.Timer interface. It exposes 2 methods that can be used to position the timer relative to the current event or processing time. The first one, offset(Duration offset), defines the next invocation of the timer (NOW + offset). The second one, align(Duration period), aligns the execution to the next boundary of the specified period (NOW + period - (NOW % period); e.g. 5 + 3 - (5 % 3) = 8 - 2 = 6). Both return a Timer and take effect only once setRelative() is called on the result. The 2 methods are available for processing and event time based timers. There is also another method called set(Instant absoluteTime) that applies only to event time timers. It defines the specific (absolute) time when the timer will be executed.
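The two formulas can be checked with a few lines of plain Java. This is only the arithmetic as written in the paragraph above, applied to bare long values. TimerArithmetic is a hypothetical helper, not a part of the Beam SDK, and the SDK may treat some edge cases differently (for instance a NOW that already sits on a period boundary).

```java
public class TimerArithmetic {

  // offset(Duration offset): NOW + offset
  public static long offset(long now, long offset) {
    return now + offset;
  }

  // align(Duration period): NOW + period - (NOW % period)
  public static long align(long now, long period) {
    return now + period - (now % period);
  }

  public static void main(String[] args) {
    System.out.println(offset(5, 3));        // 8
    System.out.println(align(5, 3));         // 6
    System.out.println(align(1_500, 1_000)); // 2000
  }
}
```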

One of the mentioned methods is called in the @ProcessElement annotated method, where the Timer object is passed as an argument annotated with @TimerId (similarly to the state object passed in an argument annotated with @StateId). However, the Timer object serves only to control the execution time. The physical execution of timers is provided by TimerInternals implementations. As stated in the Javadoc, they "Encapsulate interaction with time within the execution environment". The timer is added through the setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) method. As you can see, the method takes a state namespace that, together with the timer id, uniquely identifies the given timer.
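One consequence of identifying a timer by the (namespace, timer id) pair is that setting the same timer twice replaces the previous target instead of creating a second timer. The plain-Java sketch below illustrates only that idea. TimerRegistrySketch is a hypothetical class, namespaces are reduced to strings, and it is not how TimerInternals implementations are actually written.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

public class TimerRegistrySketch {

  // Key = (namespace, timerId), value = target time in milliseconds.
  private final Map<Map.Entry<String, String>, Long> timers = new HashMap<>();

  // Setting a timer with an already known (namespace, timerId) overwrites its target.
  public void setTimer(String namespace, String timerId, long targetMillis) {
    timers.put(new SimpleImmutableEntry<>(namespace, timerId), targetMillis);
  }

  // Returns the target of the timer or null when no such timer is set.
  public Long targetOf(String namespace, String timerId) {
    return timers.get(new SimpleImmutableEntry<>(namespace, timerId));
  }

  public int size() {
    return timers.size();
  }
}
```

Used with the same timer id in two different namespaces (for instance two windows), the registry keeps two independent timers, while a second call for the same pair only moves the existing one.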

The methods called when the timer's condition is met are marked with the @OnTimer annotation. Its value corresponds to the id of the timer triggering the method's execution. Such a method takes an instance of OnTimerContext as a parameter. It can also declare the state related to the triggered event.

Timer examples

Let's see now how we can use timely processing through some simple test cases:

private static final Instant NOW = new Instant(0);
private static final Instant SEC_1_DURATION = NOW.plus(Duration.standardSeconds(1));
private static final Instant SEC_3_DURATION = NOW.plus(Duration.standardSeconds(3));
private static final Instant SEC_5_DURATION = NOW.plus(Duration.standardSeconds(5));
private static final Instant SEC_12_DURATION = NOW.plus(Duration.standardSeconds(12));

@Test
public void should_save_accumulated_entries_after_window_expiration() {
  Pipeline pipeline = BeamFunctions.createPipeline("Timer expiration with window end");
  Coder<String> utf8Coder = StringUtf8Coder.of();
  Coder<Long> varLongCoder = VarLongCoder.of();
  KvCoder<String, Long> keyValueCoder = KvCoder.of(utf8Coder, varLongCoder);
  TestStream<KV<String, Long>> words = TestStream.create(keyValueCoder).addElements(
      TimestampedValue.of(KV.of("a", 1L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 2L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("b", 5L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 6L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("c", 2L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 7L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("a", 3L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 9L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("d", 2L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 1L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("a", 2L), SEC_1_DURATION)
    )
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(15);
  Window<KV<String, Long>> window = Window.into(FixedWindows.of(windowDuration));

  PCollection<Long> results = pipeline.apply(words).apply(window).apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
    private static final String DATA_HOLDER_NAME = "sumState";
    private static final String EXPIRY_STATE_NAME = "expiry";

    @StateId(DATA_HOLDER_NAME)
    private final StateSpec<ValueState<Long>> sumStateSpec = StateSpecs.value(VarLongCoder.of());

    @TimerId(EXPIRY_STATE_NAME)
    private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void processElement(ProcessContext processContext, IntervalWindow window,
          @StateId(DATA_HOLDER_NAME) ValueState<Long> sumState,
          @TimerId(EXPIRY_STATE_NAME) Timer expiryTimer) {
      long currentState = Optional.ofNullable(sumState.read()).orElse(0L);
      if (sumState.read() == null) {
          // timer will be triggered when the window ends
          expiryTimer.set(window.maxTimestamp());
      }
      long newState = currentState + processContext.element().getValue();
      sumState.write(newState);
    }

    @OnTimer(EXPIRY_STATE_NAME)
    public void flushOnExpiringState(
        OnTimerContext context,
        @StateId(DATA_HOLDER_NAME) ValueState<Long> sumState) {
      context.output(sumState.read());
    }

  }));
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  PAssert.that(results).inWindow(window1).containsInAnyOrder(31L, 5L, 2L, 2L);
  pipeline.run().waitUntilFinish();
}

@Test
public void should_fail_on_expiring_timer_after_the_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Timer expiration after the end of the window");
  Coder<String> utf8Coder = StringUtf8Coder.of();
  Coder<Long> varLongCoder = VarLongCoder.of();
  KvCoder<String, Long> keyValueCoder = KvCoder.of(utf8Coder, varLongCoder);
  TestStream<KV<String, Long>> words = TestStream.create(keyValueCoder).addElements(
      TimestampedValue.of(KV.of("a", 1L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 2L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("b", 5L), SEC_1_DURATION), TimestampedValue.of(KV.of("a", 6L), SEC_1_DURATION)
    )
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(15);
  Window<KV<String, Long>> window = Window.into(FixedWindows.of(windowDuration));

  assertThatThrownBy(() -> {
    pipeline.apply(words).apply(window).apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
      private static final String EXPIRY_STATE_NAME = "expiry";

      @TimerId(EXPIRY_STATE_NAME)
      private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void processElement(ProcessContext processContext, IntervalWindow window,
              @TimerId(EXPIRY_STATE_NAME) Timer expiryTimer) {
        expiryTimer.set(window.maxTimestamp().plus(Duration.standardSeconds(5)));
      }

      @OnTimer(EXPIRY_STATE_NAME)
      public void expiryState(OnTimerContext context) {
      }

    }));
    pipeline.run().waitUntilFinish();
  }).isInstanceOf(Pipeline.PipelineExecutionException.class)
          .hasMessage("java.lang.IllegalArgumentException: Attempted to set event time timer " +
                  "for 1970-01-01T00:00:19.999Z but that is after the " +
                  "expiration of window 1970-01-01T00:00:14.999Z");
}

@Test
public void should_save_accumulated_entries_after_processing_time_timer() {
  Pipeline pipeline = BeamFunctions.createPipeline("Processing time timer expiration");
  Coder<String> utf8Coder = StringUtf8Coder.of();
  Coder<Long> varLongCoder = VarLongCoder.of();
  KvCoder<String, Long> keyValueCoder = KvCoder.of(utf8Coder, varLongCoder);
  TestStream<KV<String, Long>> words = TestStream.create(keyValueCoder).addElements(
      TimestampedValue.of(KV.of("a1", 1L), SEC_1_DURATION), TimestampedValue.of(KV.of("a1", 2L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("b1", 5L), SEC_1_DURATION)
    )
    .advanceProcessingTime(Duration.standardSeconds(1))
    .addElements(
      TimestampedValue.of(KV.of("a2", 1L), SEC_1_DURATION), TimestampedValue.of(KV.of("a2", 2L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("b2", 5L), SEC_1_DURATION)
    )
    .advanceProcessingTime(Duration.standardSeconds(1))
    .addElements(
      TimestampedValue.of(KV.of("a3", 1L), SEC_5_DURATION), TimestampedValue.of(KV.of("a3", 2L), SEC_5_DURATION),
      TimestampedValue.of(KV.of("b3", 5L), SEC_5_DURATION)
    )
    // Need to advance 2 seconds to see the last entries flushed
    .advanceProcessingTime(Duration.standardSeconds(2))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(10);
  Window<KV<String, Long>> window = Window.into(FixedWindows.of(windowDuration));

  PCollection<Iterable<String>> results = pipeline.apply(words).apply(window).apply(ParDo.of(new DoFn<KV<String, Long>, Iterable<String>>() {
    private static final String FLUSH_STATE_NAME = "expiry";
    private static final String ACCUMULATOR_NAME = "accumulator";

    @StateId(ACCUMULATOR_NAME)
    private final StateSpec<BagState<String>> accumulatorStateSpec = StateSpecs.bag();

    @TimerId(FLUSH_STATE_NAME)
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(ProcessContext processContext, IntervalWindow window,
          @StateId(ACCUMULATOR_NAME) BagState<String> wordsAccumulator,
          @TimerId(FLUSH_STATE_NAME) Timer expiryTimer) {
      if (wordsAccumulator.isEmpty().read()) {
          expiryTimer.align(Duration.standardSeconds(1)).setRelative();
      }
      wordsAccumulator.add(processContext.element().getKey());
    }

    @OnTimer(FLUSH_STATE_NAME)
    public void flushAccumulatedResults(
        OnTimerContext context,
        @StateId(ACCUMULATOR_NAME) BagState<String> wordsAccumulator) {
      context.output(wordsAccumulator.read());
    }

  }));
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(10)));
  PAssert.that(results).inWindow(window1).containsInAnyOrder(Arrays.asList("a1", "a1"),
    Arrays.asList("b1"), Arrays.asList("b2"), Arrays.asList("b3"), Arrays.asList("a3", "a3"),
    Arrays.asList("a2", "a2")
  );
  pipeline.run().waitUntilFinish();
}

@Test
public void should_not_call_timer_when_it_planned_after_then_end_of_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Processing time timer triggers after the window ends");
  Coder<String> utf8Coder = StringUtf8Coder.of();
  Coder<Long> varLongCoder = VarLongCoder.of();
  KvCoder<String, Long> keyValueCoder = KvCoder.of(utf8Coder, varLongCoder);
  TestStream<KV<String, Long>> words = TestStream.create(keyValueCoder).addElements(
      TimestampedValue.of(KV.of("a1", 1L), SEC_1_DURATION), TimestampedValue.of(KV.of("a1", 2L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("b1", 5L), SEC_1_DURATION)
    )
    .advanceProcessingTime(Duration.standardSeconds(10))
    .addElements(
        TimestampedValue.of(KV.of("a12", 1L), SEC_12_DURATION), TimestampedValue.of(KV.of("a12", 2L), SEC_12_DURATION),
        TimestampedValue.of(KV.of("b12", 5L), SEC_12_DURATION)
    )
    .advanceProcessingTime(Duration.standardSeconds(10))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(10);
  Window<KV<String, Long>> window = Window.into(FixedWindows.of(windowDuration));

  PCollection<Iterable<String>> results = pipeline.apply(words).apply(window).apply(ParDo.of(new DoFn<KV<String, Long>, Iterable<String>>() {
    private static final String FLUSH_STATE_NAME = "expiry";
    private static final String ACCUMULATOR_NAME = "accumulator";

    @StateId(ACCUMULATOR_NAME)
    private final StateSpec<BagState<String>> accumulatorStateSpec = StateSpecs.bag();

    @TimerId(FLUSH_STATE_NAME)
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(ProcessContext processContext, IntervalWindow window,
            @StateId(ACCUMULATOR_NAME) BagState<String> wordsAccumulator,
            @TimerId(FLUSH_STATE_NAME) Timer expiryTimer) {
      if (wordsAccumulator.isEmpty().read()) {
        // trigger after 15 seconds while the window is 10 secs
        expiryTimer.align(windowDuration.plus(Duration.standardSeconds(5))).setRelative();
      }
      wordsAccumulator.add(processContext.element().getKey());
    }

    @OnTimer(FLUSH_STATE_NAME)
    public void flushAccumulatedResults(
        OnTimerContext context,
        @StateId(ACCUMULATOR_NAME) BagState<String> wordsAccumulator) {
      context.output(wordsAccumulator.read());
    }

  }));
  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(10)));
  // As we can see the result is empty because the processing time timer was set to fire
  // after the end of the window.
  // We might expect the data to be flushed in the 2nd window (the timer is planned for the
  // 15th second of processing). However, it doesn't happen because the state is bound to the window.
  PAssert.that(results).inWindow(window1).empty();
  pipeline.run().waitUntilFinish();
}

@Test
public void should_apply_timer_on_global_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Timely transform in global window");
  Coder<String> utf8Coder = StringUtf8Coder.of();
  KvCoder<String, String> keyValueCoder = KvCoder.of(utf8Coder, utf8Coder);
  TestStream<KV<String, String>> usersWithVisitedPageX = TestStream.create(keyValueCoder).addElements(
      TimestampedValue.of(KV.of("user1", "page1"), SEC_1_DURATION), TimestampedValue.of(KV.of("user2", "page2"), SEC_1_DURATION),
      TimestampedValue.of(KV.of("user1", "page5"), SEC_1_DURATION)
    )
    .advanceProcessingTime(Duration.standardSeconds(10))
    .addElements(
      TimestampedValue.of(KV.of("user2", "page11"), SEC_12_DURATION), TimestampedValue.of(KV.of("user3", "page21"), SEC_12_DURATION),
      TimestampedValue.of(KV.of("user3", "page8"), SEC_12_DURATION)
    )
    .advanceProcessingTime(Duration.standardSeconds(10))
    .advanceWatermarkToInfinity();

  PCollection<KV<String, String>> usersWithVisitedPage = pipeline.apply(Create.of(Arrays.asList(
    KV.of("user1", "page1"), KV.of("user2", "page2"), KV.of("user1", "page5"),
    KV.of("user2", "page11"), KV.of("user3", "page21"), KV.of("user3", "page8")
  )));
  Window<KV<String, String>> globalWindow = Window.<KV<String, String>>into(new GlobalWindows()).triggering(Repeatedly
      .forever(AfterProcessingTime
        .pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(5))
      )
    )
    .withAllowedLateness(Duration.ZERO).discardingFiredPanes();
  PCollection<Long> userWithVisitsNumber = pipeline.apply(usersWithVisitedPageX).apply(globalWindow).apply(
          ParDo.of(new DoFn<KV<String, String>, Long>() {
      private static final String DATA_HOLDER_NAME = "sumState";

      private static final String EXPIRY_STATE_NAME = "expiry";

      @StateId(DATA_HOLDER_NAME)
      private final StateSpec<ValueState<Long>> sumStateSpec = StateSpecs.value(VarLongCoder.of());

      @TimerId(EXPIRY_STATE_NAME)
      private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void processElement(ProcessContext processContext,
              @StateId(DATA_HOLDER_NAME) ValueState<Long> sumState,
              @TimerId(EXPIRY_STATE_NAME) Timer expiryTimer) {
        long currentCounter = Optional.ofNullable(sumState.read()).orElse(0L);
        if (currentCounter == 0L) {
          expiryTimer.align(Duration.standardSeconds(5)).setRelative();
        }
        long newCounterValue = currentCounter + 1;
        sumState.write(newCounterValue);
      }

      @OnTimer(EXPIRY_STATE_NAME)
      public void flushAccumulatedResults(
          OnTimerContext context,
          @StateId(DATA_HOLDER_NAME) ValueState<Long> sumState) {
        context.output(sumState.read());
      }
  }));

  // Here we deal with the global window, so we don't need to fear that the timer won't be
  // called because the window has expired.
  PAssert.that(userWithVisitsNumber).containsInAnyOrder(2L, 2L, 2L);
  pipeline.run().waitUntilFinish();
}

@Test
public void should_invoke_trigger_at_event_time() {
  Pipeline pipeline = BeamFunctions.createPipeline("Event time timer expiration");
  Coder<String> utf8Coder = StringUtf8Coder.of();
  Coder<Long> varLongCoder = VarLongCoder.of();
  KvCoder<String, Long> keyValueCoder = KvCoder.of(utf8Coder, varLongCoder);
  TestStream<KV<String, Long>> words = TestStream.create(keyValueCoder).addElements(
      TimestampedValue.of(KV.of("a1", 1L), SEC_1_DURATION), TimestampedValue.of(KV.of("a1", 2L), SEC_1_DURATION),
      TimestampedValue.of(KV.of("b1", 5L), SEC_1_DURATION)
    )
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(2)))
    .addElements(
      TimestampedValue.of(KV.of("a2", 1L), SEC_3_DURATION), TimestampedValue.of(KV.of("a2", 2L), SEC_3_DURATION),
      TimestampedValue.of(KV.of("b2", 5L), SEC_3_DURATION)
    )
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(4)))
    .addElements(
      TimestampedValue.of(KV.of("a3", 1L), SEC_5_DURATION), TimestampedValue.of(KV.of("a3", 2L), SEC_5_DURATION),
      TimestampedValue.of(KV.of("b3", 5L), SEC_5_DURATION)
    )
    .advanceWatermarkTo(NOW.plus(Duration.standardSeconds(6)))
    .advanceWatermarkToInfinity();
  Duration windowDuration = Duration.standardSeconds(2);
  Window<KV<String, Long>> window = Window.into(FixedWindows.of(windowDuration));

  PCollection<Iterable<String>> results = pipeline.apply(words).apply(window).apply(
          ParDo.of(new DoFn<KV<String, Long>, Iterable<String>>() {
    private static final String FLUSH_STATE_NAME = "expiry";
    private static final String ACCUMULATOR_NAME = "accumulator";

    @StateId(ACCUMULATOR_NAME)
    private final StateSpec<BagState<String>> accumulatorStateSpec = StateSpecs.bag();

    @TimerId(FLUSH_STATE_NAME)
    private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void processElement(ProcessContext processContext, IntervalWindow window,
            @StateId(ACCUMULATOR_NAME) BagState<String> wordsAccumulator,
            @TimerId(FLUSH_STATE_NAME) Timer expiryTimer) {
      if (wordsAccumulator.isEmpty().read()) {
        expiryTimer.set(NOW.plus(Duration.standardSeconds(1)));
      }
      wordsAccumulator.add(processContext.element().getKey());
    }

    @OnTimer(FLUSH_STATE_NAME)
    public void flushAccumulatedResults(
        OnTimerContext context,
        @StateId(ACCUMULATOR_NAME) BagState<String> wordsAccumulator) {
      context.output(wordsAccumulator.read());
    }
  }));

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(Duration.standardSeconds(2)));
  PAssert.that(results).inWindow(window1).containsInAnyOrder(Arrays.asList("a1", "a1"),
      Arrays.asList("b1"));
  IntervalWindow window2 = new IntervalWindow(NOW.plus(Duration.standardSeconds(2)),
      NOW.plus(Duration.standardSeconds(4)));
  PAssert.that(results).inWindow(window2).containsInAnyOrder(Arrays.asList("a2", "a2"),
      Arrays.asList("b2"));
  IntervalWindow window3 = new IntervalWindow(NOW.plus(Duration.standardSeconds(4)),
      NOW.plus(Duration.standardSeconds(6)));
  PAssert.that(results).inWindow(window3).containsInAnyOrder(Arrays.asList("a3", "a3"),
      Arrays.asList("b3"));
  pipeline.run().waitUntilFinish();
}

As shown in this post, timely processing complements the basic stateful processing in Apache Beam. Timers behave like CRON tasks scheduled to execute at some point in the future. As we could see, they can fire according to event or processing time and they're bound to windows (i.e. a timer is always executed for the current window; it can't be executed for the state of the previous window).
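The window-bound rule for event time timers, seen in the test expecting an IllegalArgumentException, can be summarized with a minimal plain-Java check. It is a sketch under stated assumptions: WindowBoundTimerSketch is a hypothetical class, it uses java.time.Instant instead of the Joda-Time Instant used by Beam, and it assumes no allowed lateness, so the window expires at its max timestamp.

```java
import java.time.Instant;

public class WindowBoundTimerSketch {

  // Last timestamp that still belongs to a window ending (exclusively) at windowEnd.
  public static Instant maxTimestamp(Instant windowEnd) {
    return windowEnd.minusMillis(1);
  }

  // Rejects event time targets placed after the window expiration.
  // Assumption: no allowed lateness, so the window expires at its max timestamp.
  public static void checkEventTimeTarget(Instant target, Instant windowEnd) {
    Instant expiration = maxTimestamp(windowEnd);
    if (target.isAfter(expiration)) {
      throw new IllegalArgumentException("Attempted to set event time timer for " + target
          + " but that is after the expiration of window " + expiration);
    }
  }
}
```

For a 15-second window starting at the epoch, a target equal to the max timestamp (00:00:14.999) passes the check, while the same target moved 5 seconds later is rejected, as in the failing test above.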