Side input in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

Very often dealing with a single PCollection in the pipeline is sufficient. However, in some cases, for instance when one dataset complements another, several different distributed collections must be joined in order to produce meaningful results. Apache Spark deals with this through broadcast variables. Apache Beam has a similar mechanism called side input.

This post focuses on this Apache Beam feature. The first part explains it conceptually. The second one describes the Java API used to define side inputs. Finally, the last section shows some simple use cases in learning tests.

Side input explained

A side input is nothing more, nothing less than a PCollection that can be used as an additional input to a ParDo transform. However, unlike a normal (processed) PCollection, the side input is a global and immutable view of the underlying PCollection, which means it can't change after computation. It can be used every time we need to join additional datasets to the processed one or to broadcast some common values (e.g. a dictionary) to the processing functions.

Any object can be used as a side input: a singleton value, a tuple, or a collection.

Since the side input is a kind of frozen PCollection, it benefits from all PCollection features, such as windowing. Nothing special happens when the side input's windowing fits the windowing of the processed PCollection. In the contrary situation some constraints apply. When the side input's window is smaller than the processing dataset's window, an error telling that an empty side input was encountered is produced. When the side input's window is larger, the runner tries to select the most appropriate items from this larger window. But even in this case an error can occur, especially when we're supposed to deal with a single value (singleton) and the window produces several entries.

Naturally, the side input introduces a precedence rule: since it's an immutable view, it must be computed before its use in the processed PCollection.

Certain forms of side input are cached in the memory of each worker reading them, so they must be small enough to fit into the available memory. The caching occurs in every case except when the side input is represented as an iterable.
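The difference is visible at the view construction time. A minimal, hedged sketch (the pipeline and the dataset are illustrative):

PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
// A list view is cached in the worker's memory, so it must stay small
PCollectionView<List<Integer>> cachedView = numbers.apply(View.asList());
// An iterable view is not cached, which makes it usable for datasets
// that wouldn't fit into the available memory
PCollectionView<Iterable<Integer>> notCachedView = numbers.apply(View.asIterable());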

Side input caching

The side input cache is an interesting feature, especially in the Dataflow runner for batch processing. The runner is able to look up side input values without loading the whole dataset into memory. This feature was added in the Dataflow SDK 1.5.0 release for list- and map-based side inputs and is called indexed side inputs.

With indexed side inputs the runner won't load all of the side input's values into its memory. Instead, it'll only look up the values corresponding to the index/key requested in the processing, and only these values will be cached.

The cache size of Dataflow workers can be modified through the --workerCacheMb property.
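As an illustration, a hedged sketch of this configuration, assuming the Dataflow runner dependency is on the classpath and that DataflowWorkerHarnessOptions declares the workerCacheMb property (100 MB by default at the time of writing):

// Equivalent to passing --workerCacheMb=200 on the command line
DataflowWorkerHarnessOptions options = PipelineOptionsFactory.create()
    .as(DataflowWorkerHarnessOptions.class);
options.setWorkerCacheMb(200);
Pipeline pipeline = Pipeline.create(options);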

Side input Java API

Internally the side inputs are represented as views. Unsurprisingly the object is called PCollectionView and it's a wrapper of a materialized PCollection. It's constructed with the help of the org.apache.beam.sdk.transforms.View transforms. Each transform enables the construction of a different type of view:

- View.asSingleton() - the view of a PCollection expected to contain exactly one value per window (Combine.globally(...).asSingletonView() produces the same kind of view)
- View.asIterable() - the view of a PCollection as an Iterable; as mentioned earlier, it's not cached in the worker's memory
- View.asList() - the view of a PCollection as a List
- View.asMap() - the view of a keyed PCollection as a Map; the keys must be unique
- View.asMultimap() - the view of a keyed PCollection as a Map of key to Iterable of values, accepting duplicated keys
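As a quick illustration, a minimal and hedged sketch constructing some of these views (the pipeline and the dataset are illustrative):

PCollection<KV<String, Integer>> scores = pipeline.apply(
    Create.of(KV.of("player1", 10), KV.of("player1", 20), KV.of("player2", 30)));

// Singleton view - the combine guarantees a single value per window
PCollectionView<Integer> bestScoreView = scores.apply(Values.create())
    .apply(Max.integersGlobally().asSingletonView());
// Multimap view - used here instead of View.asMap() because player1
// appears twice and a map view requires unique keys
PCollectionView<Map<String, Iterable<Integer>>> scoresPerPlayerView =
    scores.apply(View.asMultimap());
// Iterable view - the only one not cached in the worker's memory
PCollectionView<Iterable<KV<String, Integer>>> allScoresView = scores.apply(View.asIterable());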

The side inputs can be used in a ParDo transform with the help of the withSideInputs(PCollectionView<?>... sideInputs) method (a variant taking an Iterable as parameter exists too). Later, in the processing code, a specific side input can be accessed through ProcessContext's sideInput(PCollectionView<T> view) method. The access is done with the same reference that represents the side input in the code.
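For instance, a map-based view could be read as follows (a minimal sketch with illustrative names; the learning tests in the next section show complete singleton-based examples):

PCollection<KV<String, Double>> rates = pipeline.apply(
    Create.of(KV.of("EUR", 1.0), KV.of("USD", 1.18)));
PCollectionView<Map<String, Double>> ratesView = rates.apply(View.asMap());

PCollection<String> orders = pipeline.apply(Create.of(Arrays.asList("order#1", "order#2")));
PCollection<String> ordersWithRate = orders.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        // The same reference passed to withSideInputs(...) is used for the lookup
        Map<String, Double> ratesByCurrency = processContext.sideInput(ratesView);
        processContext.output(processContext.element() + " EUR rate=" + ratesByCurrency.get("EUR"));
      }
    }).withSideInputs(ratesView));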

Side input example

Even if discovering the benefits of side inputs is most valuable in a really distributed environment, it's not a bad idea to check some of the properties described above in a local runtime context:

private static final Instant NOW = new Instant(0);
private static final Instant SEC_1_DURATION = NOW.plus(Duration.standardSeconds(1));
private static final Instant SEC_2_DURATION = NOW.plus(Duration.standardSeconds(2));
private static final Instant SEC_3_DURATION = NOW.plus(Duration.standardSeconds(3));

@Test
public void should_show_side_input_in_global_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side input in global window");
  PCollection<Integer> numbersForMin = pipeline.apply(Create.of(Arrays.asList(2, 50, 20)));
  // The side input shared by all process functions must be computed before its use
  PCollectionView<Integer> minNumber = numbersForMin.apply(Min.integersGlobally().asSingletonView());
  PCollection<Integer> processedNumbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6)));

  PCollection<Integer> filteredNumbers = processedNumbers.apply(
    ParDo.of(new DoFn<Integer, Integer>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        int minNumberToFilter = processContext.sideInput(minNumber);
        int processedElement = processContext.element();
        if (processedElement > minNumberToFilter) {
          processContext.output(processedElement);
        }
      }
    }).withSideInputs(minNumber));

  PAssert.that(filteredNumbers).containsInAnyOrder(3, 4, 5, 6);
  pipeline.run().waitUntilFinish();
}

@Test
public void should_compute_different_side_inputs_with_streaming_processing() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side inputs with windows for unbounded collection");
  PCollection<String> orders = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("order#1", SEC_1_DURATION), TimestampedValue.of("order#2", SEC_1_DURATION),
    TimestampedValue.of("order#3", SEC_2_DURATION), TimestampedValue.of("order#4", SEC_2_DURATION)
  )));
  PCollection<Boolean> paymentSystemErrors = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(false, SEC_1_DURATION), TimestampedValue.of(true, SEC_2_DURATION)
  )));
  // Defining a side input window smaller than the processing window is illegal;
  // it produces an "empty PCollectionView" error (shown in a dedicated test below)
  Window<Boolean> paymentSystemErrorsWindow = Window.into(FixedWindows.of(Duration.standardSeconds(2)));
  Window<String> processingWindow = Window.into(FixedWindows.of(Duration.standardSeconds(1)));

  PCollectionView<Boolean> paymentErrorsPerSecond = paymentSystemErrors
    .apply(paymentSystemErrorsWindow).apply(View.asSingleton());

  // The filtering must be done in a ParDo because Filter.by(...) doesn't accept side inputs
  PCollection<String> ordersWithoutPaymentProblem = orders.apply(processingWindow).apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        boolean wasPaymentError = processContext.sideInput(paymentErrorsPerSecond);
        if (!wasPaymentError) {
          processContext.output(processContext.element());
        }
      }
    }).withSideInputs(paymentErrorsPerSecond));

  IntervalWindow window1 = new IntervalWindow(SEC_1_DURATION, SEC_2_DURATION);
  PAssert.that(ordersWithoutPaymentProblem).inWindow(window1).containsInAnyOrder("order#1", "order#2");
  IntervalWindow window2 = new IntervalWindow(SEC_2_DURATION, SEC_3_DURATION);
  PAssert.that(ordersWithoutPaymentProblem).inWindow(window2).empty();
  pipeline.run().waitUntilFinish();
}

@Test
public void should_fail_when_side_input_window_is_bigger_than_processing_window_and_more_than_1_value_is_found() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side inputs with too big window");
  PCollection<String> orders = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("order#1", SEC_1_DURATION), TimestampedValue.of("order#2", SEC_1_DURATION),
    TimestampedValue.of("order#3", SEC_2_DURATION), TimestampedValue.of("order#4", SEC_2_DURATION)
  )));
  PCollection<Boolean> paymentSystemErrors = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(false, SEC_1_DURATION), TimestampedValue.of(true, SEC_2_DURATION)
  )));
  Window<Boolean> paymentSystemErrorsWindow = Window.into(FixedWindows.of(Duration.standardSeconds(20)));
  Window<String> processingWindow = Window.into(FixedWindows.of(Duration.standardSeconds(1)));
  assertThatThrownBy(() -> {
    PCollectionView<Boolean> paymentErrorsPerSecond = paymentSystemErrors
      .apply(paymentSystemErrorsWindow).apply(View.asSingleton());
    orders.apply(processingWindow).apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        boolean wasPaymentError = processContext.sideInput(paymentErrorsPerSecond);
        if (!wasPaymentError) {
          processContext.output(processContext.element());
        }
      }
    }).withSideInputs(paymentErrorsPerSecond));
    pipeline.run().waitUntilFinish();
  })
  .isInstanceOf(Pipeline.PipelineExecutionException.class)
  .hasMessageContaining("PCollection with more than one element accessed as a singleton view. " +
          "Consider using Combine.globally().asSingleton() to combine the PCollection into a single value");

}

@Test
public void should_fail_when_the_window_of_side_input_is_smaller_than_the_processing_window() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side input window smaller than processing window");
  PCollection<String> orders = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("order#1", SEC_1_DURATION), TimestampedValue.of("order#2", SEC_1_DURATION),
    TimestampedValue.of("order#3", SEC_2_DURATION), TimestampedValue.of("order#4", SEC_2_DURATION)
  )));
  PCollection<Boolean> paymentSystemErrors = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(false, SEC_1_DURATION), TimestampedValue.of(true, SEC_2_DURATION)
  )));
  Window<Boolean> paymentSystemErrorsWindow = Window.into(FixedWindows.of(Duration.standardSeconds(1)));
  Window<String> processingWindow = Window.into(FixedWindows.of(Duration.standardSeconds(5)));

  assertThatThrownBy(() -> {
    PCollectionView<Boolean> paymentErrorsPerSecond = paymentSystemErrors
        .apply(paymentSystemErrorsWindow).apply(View.asSingleton());
    orders.apply(processingWindow).apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        boolean wasPaymentError = processContext.sideInput(paymentErrorsPerSecond);
        if (!wasPaymentError) {
          processContext.output(processContext.element());
        }
      }
    }).withSideInputs(paymentErrorsPerSecond));

    pipeline.run().waitUntilFinish();
  }).isInstanceOf(Pipeline.PipelineExecutionException.class)
    .hasMessageContaining("Empty PCollection accessed as a singleton view");
}

@Test
public void should_show_that_side_input_brings_precedence_execution_rule_when_it_s_overridden_after_pardo_using_it() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side inputs with different windows");
  PCollection<String> orders = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of("order#1", SEC_1_DURATION), TimestampedValue.of("order#2", SEC_1_DURATION),
    TimestampedValue.of("order#3", SEC_2_DURATION), TimestampedValue.of("order#4", SEC_2_DURATION)
  )));
  PCollection<Boolean> paymentSystemErrors = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(false, SEC_1_DURATION), TimestampedValue.of(true, SEC_2_DURATION)
  )));
  // The view is referenced by the ParDo before being defined; since the reference is
  // still null at pipeline construction time, a NullPointerException is expected
  Window<Boolean> paymentSystemErrorsWindow = Window.into(FixedWindows.of(Duration.standardSeconds(2)));
  Window<String> processingWindow = Window.into(FixedWindows.of(Duration.standardSeconds(1)));
  assertThatThrownBy(() -> {
    PCollectionView<Boolean> paymentErrorsPerSecond = null;
    orders.apply(processingWindow).apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {}
    }).withSideInputs(paymentErrorsPerSecond));

    paymentErrorsPerSecond = paymentSystemErrors.apply(paymentSystemErrorsWindow).apply(View.asSingleton());
    pipeline.run().waitUntilFinish();
  }).isInstanceOf(NullPointerException.class);
}

Side inputs are a very interesting feature of Apache Beam. As described in the first section, they represent a materialized view (map, iterable, list, singleton value) of a PCollection. This materialized view can be shared and used later by subsequent processing functions. As we saw, most side inputs must fit into the worker's memory because of caching; it's not the case for the iterable view, which is simply not cached. Moreover, the Dataflow runner brings an efficient cache mechanism that caches only the really read values from a list or map view.