Side output in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

The possibility to define several additional inputs for a ParDo transform is not the only feature of this kind in Apache Beam. The framework also provides the possibility to define one or more extra outputs, through structures called side outputs.

This post focuses on this other Beam feature. As in the post about side input in Apache Beam, it begins with a short introduction, followed by a description of the side output's Java API. The last section shows how to use side outputs in simple test cases.

Side output defined

Side output is a great way to branch processing. Let's take the example of an input data source that contains both valid and invalid values, where the valid values must be written to place #1 and the invalid ones to place #2. A naive solution suggests using a filter and writing 2 distinct processing pipelines. However, this approach has one main drawback - the input dataset is read twice. If instead we use side outputs, we can keep a single ParDo transform that internally dispatches valid and invalid values to the appropriate places (#1 or #2, depending on the value's validity).
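
For comparison, the naive filter-based version could look like the following sketch (the sample values and the emptiness check are invented for illustration, and a pipeline is assumed to be in scope); note how each filter traverses the whole input:

PCollection<String> input = pipeline.apply(Create.of(Arrays.asList("a", "", "b")));
// Each Filter transform traverses the whole input dataset
PCollection<String> validValues = input.apply("KeepValid",
  Filter.by((String value) -> !value.isEmpty()));
PCollection<String> invalidValues = input.apply("KeepInvalid",
  Filter.by((String value) -> value.isEmpty()));
// validValues would then be written to place #1 and invalidValues to place #2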

Side outputs can also be used in situations when we need to produce outputs of different types. For instance, we can have an input collection of JSON entries that will be transformed to Protobuf and Avro files in order to check later which of these formats is more efficient.

Side outputs are not reserved to user-defined transforms either. Beam also uses them internally in some of the provided transforms.

Side output Java API

Technically, the use of side outputs is based on the declaration of TupleTag<T>. Since the output generated by the processing function is not homogeneous, these tags help to distinguish the produced datasets and facilitate their use in subsequent transforms. They also enforce type safety of the processed data.

The tags are passed to ParDo's withOutputTags(TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags). The first argument of this method is the tag of the main produced PCollection. This main dataset is produced with the usual ProcessContext's output(OutputT output) method. The additional outputs are declared with the 2nd argument of withOutputTags(...) and are produced with the output(TupleTag<T> tag, T output) method.

All outputs, the main one as well as the additional ones, are bundled into a PCollectionTuple. Each dataset can later be retrieved with this object's get(TupleTag<T> tag) method.
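
As a reminder of the whole flow, a minimal sketch could look like the following (the even/odd dispatch and all names are invented for illustration, and a pipeline is assumed to be in scope):

// Tags declared as anonymous subclasses - see the coder rule below
TupleTag<Integer> evenNumbers = new TupleTag<Integer>(){};
TupleTag<Integer> oddNumbers = new TupleTag<Integer>(){};

PCollection<Integer> numbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4)));
PCollectionTuple dispatchedNumbers = numbers.apply(ParDo.of(new DoFn<Integer, Integer>() {
  @ProcessElement
  public void processElement(ProcessContext context) {
    if (context.element() % 2 == 0) {
      context.output(context.element()); // goes to the main output
    } else {
      context.output(oddNumbers, context.element()); // goes to the additional tagged output
    }
  }
}).withOutputTags(evenNumbers, TupleTagList.of(oddNumbers)));

PCollection<Integer> evens = dispatchedNumbers.get(evenNumbers);
PCollection<Integer> odds = dispatchedNumbers.get(oddNumbers);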

Using side outputs brings a specific rule regarding coders. The TupleTag must be declared as an anonymous subclass ({} appended to the constructor call). Otherwise the type parameter is erased and the coder inference fails.
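
The rule boils down to a single pair of braces (the last test below shows the error produced by the second form):

TupleTag<String> correctTag = new TupleTag<String>(){}; // anonymous subclass - the type is preserved
TupleTag<String> brokenTag = new TupleTag<>(); // the type is erased - coder inference fails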

Side output example

The following tests illustrate the use of side outputs:

private static final Instant NOW = new Instant(0);
private static final Instant SEC_1_DURATION = NOW.plus(Duration.standardSeconds(1));
private static final Instant SEC_2_DURATION = NOW.plus(Duration.standardSeconds(2));
private static final Instant SEC_3_DURATION = NOW.plus(Duration.standardSeconds(3));

@Test
public void should_build_collections_of_2_different_types() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side output with 2 different types");
  TupleTag<String> stringBooleans = new TupleTag<String>(){};
  TupleTag<Integer> integerBooleans = new TupleTag<Integer>(){};

  PCollection<Boolean> booleanFlags = pipeline.apply(Create.of(Arrays.asList(true, true, false, false, true)));

  PCollectionTuple outputDatasets = booleanFlags.apply(ParDo.of(new DoFn<Boolean, String>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {
      processContext.output(processContext.element().toString());
      processContext.output(integerBooleans, getNumberFromBoolean(processContext.element()));
    }
    private int getNumberFromBoolean(boolean flag) {
      return flag ? 1 : 0;
    }
  }).withOutputTags(stringBooleans, TupleTagList.of(integerBooleans)));

  PCollection<String> stringFlags = outputDatasets.get(stringBooleans);
  PCollection<Integer> integerFlags = outputDatasets.get(integerBooleans);
  PAssert.that(stringFlags).containsInAnyOrder("true", "true", "false", "false", "true");
  PAssert.that(integerFlags).containsInAnyOrder(1, 1, 0, 0, 1);
  pipeline.run().waitUntilFinish();
}

@Test
public void should_build_collections_of_the_same_type_but_for_different_processing_later() {
  Pipeline pipeline = BeamFunctions.createPipeline("Side output branching processing");
  TupleTag<Letter> validLetters = new TupleTag<Letter>(){};
  TupleTag<Letter> invalidLetters = new TupleTag<Letter>(){};
  PCollection<Letter> letters = pipeline.apply(Create.of(Arrays.asList(
    new Letter("a"), new Letter("b"), new Letter(""),
    new Letter("c"), new Letter("d")
  )));

  PCollectionTuple outputDatasets = letters.apply(ParDo.of(new DoFn<Letter, Letter>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {
      Letter processedLetter = processContext.element();
      if (processedLetter.isValid()) {
        processContext.output(processedLetter);
      } else {
        processContext.output(invalidLetters, processedLetter);
      }
    }
  }).withOutputTags(validLetters, TupleTagList.of(invalidLetters)));

  PCollection<Letter> validLettersPCollection = outputDatasets.get(validLetters);
  validLettersPCollection.apply(ParDo.of(new DoFn<Letter, Void>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {
      // Sample store taking valid letters
      LetterStore.VALID.addLetter(processContext.element());
    }
  }));
  PCollection<Letter> invalidLettersPCollection = outputDatasets.get(invalidLetters);
  invalidLettersPCollection.apply(ParDo.of(new DoFn<Letter, Void>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {
      // Sample store taking invalid letters
      LetterStore.INVALID.addLetter(processContext.element());
    }
  }));
  pipeline.run().waitUntilFinish();

  List<Letter> validLettersFromStore = LetterStore.VALID.getData();
  assertThat(validLettersFromStore).extracting("letter").containsOnly("a", "b", "c", "d");
  List<Letter> invalidLettersFromStore = LetterStore.INVALID.getData();
  assertThat(invalidLettersFromStore).extracting("letter").containsOnly("");
}

@Test
public void should_allow_to_build_empty_side_output() {
  Pipeline pipeline = BeamFunctions.createPipeline("Empty side output", 2);
  TestStream<String> letters = TestStream.create(StringUtf8Coder.of()).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("b", SEC_1_DURATION),
    TimestampedValue.of("c", SEC_1_DURATION), TimestampedValue.of("d", SEC_1_DURATION),
    TimestampedValue.of("e", SEC_1_DURATION), TimestampedValue.of("f", SEC_1_DURATION)
  )
  .advanceWatermarkToInfinity();
  TupleTag<String> notEmptyLetters = new TupleTag<String>(){};
  TupleTag<String> emptyLetters = new TupleTag<String>(){};
  PCollectionTuple outputDatasets = pipeline.apply(letters).apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {
      // intentionally produces nothing, so both output datasets remain empty
    }
  }).withOutputTags(notEmptyLetters, TupleTagList.of(emptyLetters)));

  PCollection<String> notEmptyLettersDataset = outputDatasets.get(notEmptyLetters);
  notEmptyLettersDataset.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext processContext) {} // no-op consumer of the (empty) dataset
  }));

  pipeline.run().waitUntilFinish();
}

@Test
public void should_show_that_declaring_side_output_without_braces_should_not_work() {
  Pipeline pipeline = BeamFunctions.createPipeline("TupleTag side output declaration as not anonymous class (no {})");
  TupleTag<String> stringBooleans = new TupleTag<>();
  TupleTag<Integer> integerBooleans = new TupleTag<>();

  PCollection<Boolean> booleanFlags = pipeline.apply(Create.of(Arrays.asList(true, true, false, false, true)));

  assertThatThrownBy(() -> {
    PCollectionTuple outputDatasets = booleanFlags.apply(ParDo.of(new DoFn<Boolean, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
      }
    }).withOutputTags(stringBooleans, TupleTagList.of(integerBooleans)));
    pipeline.run().waitUntilFinish();
  })
  .isInstanceOf(IllegalStateException.class)
  .hasMessageContaining("Unable to return a default Coder for ParMultiDo(Anonymous).out1 [PCollection]. " +
          "Correct one of the following root causes")
  .hasMessageContaining("Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.")
  .hasMessageContaining("No Coder has been manually specified;  you may do so using .setCoder().");
}

@Test
public void should_compute_different_side_outputs_in_different_windows() {
  Pipeline pipeline = BeamFunctions.createPipeline("Different side outputs in 2 different windows");
  TestStream<String> letters = TestStream.create(StringUtf8Coder.of()).addElements(
    TimestampedValue.of("a", SEC_1_DURATION), TimestampedValue.of("b", SEC_1_DURATION),
    TimestampedValue.of("f", SEC_1_DURATION), TimestampedValue.of("c", SEC_1_DURATION),
    TimestampedValue.of("d", SEC_2_DURATION), TimestampedValue.of("e", SEC_3_DURATION)
  )
  .advanceWatermarkToInfinity();

  TupleTag<String> lettersRepeatedOnce = new TupleTag<String>(){};
  TupleTag<String> lettersRepeatedTwice = new TupleTag<String>(){};
  Duration windowDuration = Duration.standardSeconds(2);
  Window<String> window = Window.<String>into(FixedWindows.of(windowDuration));
  PCollectionTuple outputDatasets = pipeline.apply(letters).apply(window)
    .apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext processContext) {
        String letter = processContext.element();
        processContext.output(letter);
        String repeatedTwiceLetter = letter+letter;
        processContext.output(lettersRepeatedTwice, repeatedTwiceLetter);
      }
    }).withOutputTags(lettersRepeatedOnce, TupleTagList.of(lettersRepeatedTwice)));

  IntervalWindow window1 = new IntervalWindow(NOW, NOW.plus(windowDuration));
  PAssert.that(outputDatasets.get(lettersRepeatedOnce)).inFinalPane(window1)
    .containsInAnyOrder("a", "b", "c", "f");
  PAssert.that(outputDatasets.get(lettersRepeatedTwice)).inFinalPane(window1)
    .containsInAnyOrder("aa", "bb", "cc", "ff");
  IntervalWindow window2 = new IntervalWindow(NOW.plus(windowDuration), NOW.plus(windowDuration).plus(windowDuration));
  PAssert.that(outputDatasets.get(lettersRepeatedOnce)).inFinalPane(window2)
    .containsInAnyOrder("d", "e");
  PAssert.that(outputDatasets.get(lettersRepeatedTwice)).inFinalPane(window2)
    .containsInAnyOrder("dd", "ee");
  pipeline.run().waitUntilFinish();
}


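// Simple in-memory store used to observe the pipeline's side effects; note that this
// works only because the test runs in a single JVM (e.g. with the direct runner)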
enum LetterStore {
  VALID, INVALID;

  private List<Letter> data = new ArrayList<>();

  public void addLetter(Letter letter) {
    data.add(letter);
  }

  public List<Letter> getData() {
    return data;
  }

}

class Letter implements Serializable {
  private final String letter;
  private final boolean valid;

  public Letter(String letter) {
    this.letter = letter;
    this.valid = !letter.isEmpty();
  }

  public String getLetter() {
    return letter;
  }

  public boolean isValid() {
    return valid;
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("letter", letter).add("valid", valid).toString();
  }
}

The following video shows how the side output behaves with an unbounded source. In this case we use Kafka 0.10.1 and we can see that the side output is computed with every processed element within a window - it doesn't wait until all elements of the window are processed:

This post clearly shows how beneficial side outputs can be. As introduced in the first section, side outputs are similar to side inputs, except that they concern produced rather than consumed data. Thus a side output helps to produce more than one dataset from a given ParDo transform. It's a serious alternative to the classical approach constructing 2 distinct PCollections with filters, since it traverses the input dataset only once.