TransformHierarchy in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

Apache Beam shares some similarities with Apache Spark. One of them is the definition of the processing pipeline as a Directed Acyclic Graph.

This post focuses on the Directed Acyclic Graph (DAG) abstraction in Apache Beam, managed by the Pipeline object. The first section introduces the transform types. The second part analyzes TransformHierarchy, the structure holding the pipeline's transforms, and its role during pipeline translation. The next section focuses on how the TransformHierarchy is built. The final section shows some Pipeline characteristics through learning tests.

Transform types

Beam's processing flow is built on transforms. They are in-out operations: some data is consumed and other data is produced. The whole structure is wrapped in the org.apache.beam.sdk.runners.TransformHierarchy object. The members of this hierarchy class are nodes composed of: an enclosing node (= the container of this node), a transform, inputs and outputs. Before going further it's important to stop and focus on the 3 different categories of nodes: the root node (the top-level node of the hierarchy, without any enclosing node), composite transforms (transforms whose expand method delegates the work to other transforms) and primitive transforms (leaf operations, such as a ParDo or a Read, executed directly by the runner):

De facto, the transforms provided natively with Beam are composite ones. It's dictated by the fact that all of them extend the PTransform class and thus need to implement its expand method. The following snippet shows the example of the Filter transform:

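// Excerpt of the Filter transform - the composite's expand(...) delegates the work to a ParDo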
private SerializableFunction<T, Boolean> predicate;

@Override
public PCollection<T> expand(PCollection<T> input) {
  return input.apply(ParDo.of(new DoFn<T, T>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      if (predicate.apply(c.element())) {
        c.output(c.element());
      }
    }
  }));
}

Transform hierarchy graph

When a new transform is added to the pipeline, it's not physically executed. The addition only creates a TransformHierarchy.Node instance containing all the information required to execute the transform once the pipeline runner is launched.

To better understand what the TransformHierarchy looks like, nothing is better than an example:

Pipeline pipeline = BeamFunctions.createPipeline("No file generated");
PCollection<Integer> inputNumbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
inputNumbers.apply("filter_1", Filter.greaterThanEq(2));

pipeline.run().waitUntilFinish();

As you can see, there is nothing complicated in the code. It's a simple filter operation. By investigating the interactions between the Pipeline and TransformHierarchy objects, we can observe which nodes are created and which properties they have; the last test of the final section (should_show_pipeline_with_simple_filter_transform) stringifies the graph produced for exactly this pipeline.

Transform hierarchy role

So, what is the purpose of this TransformHierarchy object? After learning some Spark concepts, we could think that the DAG in Beam helps to achieve fault tolerance. Even if it can be seen that way, the main responsibility of TransformHierarchy is different.

In fact, TransformHierarchy serves as an intermediate representation between the Beam program and its physical executor (= the runner). It's used in conjunction with the org.apache.beam.sdk.Pipeline.PipelineVisitor interface to traverse the graph of transforms and convert it into the objects expected by a specific runner.
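
To illustrate this traversal before looking at the runners, here is a minimal sketch. It assumes the pipeline variable built as in the previous example and the Pipeline.PipelineVisitor.Defaults base class (which provides no-op implementations of the visitor methods), and it simply counts the primitive transforms; a runner's translator performs the same kind of walk but emits runner-specific operations instead of incrementing a counter:

// Minimal sketch: walk the TransformHierarchy and count the primitive (leaf) transforms
// AtomicInteger is used only because the anonymous class can't reassign a local int
AtomicInteger primitiveTransforms = new AtomicInteger();
pipeline.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
    primitiveTransforms.incrementAndGet();
  }
});
System.out.println("Primitive transforms: " + primitiveTransforms.get());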

For instance, in the case of the Dataflow runner, a class called DataflowPipelineTranslator.Translator implements the visitor's interface. After preparing the Dataflow execution context, it traverses the transforms graph. For each visited transform the translator calls org.apache.beam.runners.dataflow.TransformTranslator#translate(TransformT transform, TranslationContext context), which converts the given operation into a Dataflow-understandable structure.

The concept of a translator applied to the TransformHierarchy is also used in the Spark runner. For instance, in org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator we can observe a map defining the relations between Beam transforms and Spark ones:

static {
  EVALUATORS.put(Read.Unbounded.class, readUnbounded());
  EVALUATORS.put(GroupByKey.class, groupByKey());
  EVALUATORS.put(Combine.GroupedValues.class, combineGrouped());
  EVALUATORS.put(ParDo.MultiOutput.class, parDo());
  EVALUATORS.put(ConsoleIO.Write.Unbound.class, print());
  EVALUATORS.put(CreateStream.class, createFromQueue());
  EVALUATORS.put(Window.Assign.class, window());
  EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
  EVALUATORS.put(Reshuffle.class, reshuffle());
}

Each of the defined methods ends up calling Spark's DStream API. The simplest of them, print(), shows that pretty well:

private static <T> TransformEvaluator<ConsoleIO.Write.Unbound<T>> print() {
  return new TransformEvaluator<ConsoleIO.Write.Unbound<T>>() {
    @Override
    public void evaluate(ConsoleIO.Write.Unbound<T> transform, EvaluationContext context) {
      @SuppressWarnings("unchecked")
      JavaDStream<WindowedValue<T>> dstream =
        ((UnboundedDataset<T>) (context).borrowDataset(transform)).getDStream();
      dstream.map(WindowingHelpers.<T>unwindowFunction()).print(transform.getNum());
    }

    @Override
    public String toNativeString() {
      return ".print(...)";
    }
  };
}

TransformHierarchy building

New nodes are added to the TransformHierarchy through the org.apache.beam.sdk.Pipeline#applyInternal(String name, InputT input, PTransform<? super InputT, OutputT> transform) method. Under the hood it invokes the TransformHierarchy lifecycle methods: pushNode, finishSpecifyingInput, setOutput and popNode. The first one, as its name indicates, adds a new node to the transform graph.

In finishSpecifyingInput, the TransformHierarchy looks for the nodes producing the inputs of the currently added transform and ensures that all the inputs needed by this transform are already fully specified. In its turn, the setOutput method handles the data generated by the transform. Among other things, it adds this data to the producers map that is later used by other transforms to retrieve their inputs. The last method, popNode, pops the added node off the top of the stack.
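
The sequencing of these lifecycle calls can be summarized with the following simplified sketch of applyInternal (unique name generation, validation and error handling are omitted here; the exact code lives in org.apache.beam.sdk.Pipeline):

// Simplified sketch of Pipeline#applyInternal - not the literal Beam 2.2.0 source, only the call order
private <InputT extends PInput, OutputT extends POutput> OutputT applyInternal(
    String name, InputT input, PTransform<? super InputT, OutputT> transform) {
  transforms.pushNode(name, input, transform);   // 1. add the node to the transform graph
  try {
    transforms.finishSpecifyingInput();          // 2. ensure the inputs are fully specified
    OutputT output = transform.expand(input);    // 3. expand the (possibly composite) transform
    transforms.setOutput(output);                // 4. register the produced values in the producers map
    return output;
  } finally {
    transforms.popNode();                        // 5. pop the node off the top of the stack
  }
}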

Thus, as you can see, Pipeline's applyInternal method only constructs the transform graph. The graph is physically computed only when the runner defined in the pipeline's options is invoked.

TransformHierarchy examples

In order to see what happens with the TransformHierarchy inside the Pipeline object, we'll use an implementation of org.apache.beam.sdk.Pipeline.PipelineVisitor. This object is used to traverse the transform graph and, thanks to it, we can get an idea of the nodes composing the graph:

@Test
public void should_not_execute_pipeline_when_runner_is_not_called() {
  String fileName = "/tmp/beam/file_no_runner";
  Pipeline pipeline = BeamFunctions.createPipeline("No file generated");
  PCollection<String> inputNumbers = pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));

  inputNumbers.apply(TextIO.write().to(fileName));

  File writtenFile = new File(fileName);
  assertThat(writtenFile).doesNotExist();
}


@Test
public void should_show_composite_transform_wrapping_primitive_ones() {
  Pipeline pipeline = BeamFunctions.createPipeline("Composite transform");
  List<Integer> numbers = Arrays.asList(1, 100, 200, 201, 202, 203, 330, 400, 500);
  PCollection<Integer> inputNumbers = pipeline.apply("create", Create.of(numbers));
  class MathOperator extends PTransform<PCollection<Integer>, PCollection<Integer>> {

      private int minValue;
      private int multiplier;
      private int divisor;

      public MathOperator(int minValue, int multiplier, int divisor) {
        this.minValue = minValue;
        this.multiplier = multiplier;
        this.divisor = divisor;
      }

      @Override
      public PCollection<Integer> expand(PCollection<Integer> inputNumbers) {
        return inputNumbers.apply("gt filter", Filter.greaterThan(minValue))
          .apply("multiplier", MapElements.into(TypeDescriptors.integers()).via(number -> number*multiplier))
          .apply("divisor", MapElements.into(TypeDescriptors.integers()).via(number -> number/divisor));
      }
  }
  inputNumbers.apply("composite operation", new MathOperator(200, 5, 2));
  NodesVisitor visitor = new NodesVisitor();

  pipeline.traverseTopologically(visitor);

  pipeline.run().waitUntilFinish();
  List<TransformHierarchy.Node> visitedNodes = visitor.getVisitedNodes();
  String graph = visitor.stringifyVisitedNodes();
  assertThat(graph).isEqualTo("[ROOT]  -> create[composite](out: create/Read(CreateSource).out) -> " +
    "create/Read(CreateSource)(out: create/Read(CreateSource).out) ->  " +
    "(in:  create/Read(CreateSource).out) composite operation[composite]" +
        "(out: composite operation/divisor/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  create/Read(CreateSource).out) composite operation/gt filter[composite]" +
        "(out: composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  create/Read(CreateSource).out) composite operation/gt filter/ParDo(Anonymous)[composite]" +
        "(out: composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  create/Read(CreateSource).out) composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous)" +
        "(out: composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) " +
        "composite operation/multiplier[composite]" +
        "(out: composite operation/multiplier/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) " +
        "composite operation/multiplier/Map[composite]" +
        "(out: composite operation/multiplier/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  composite operation/gt filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) " +
        "composite operation/multiplier/Map/ParMultiDo(Anonymous)" +
        "(out: composite operation/multiplier/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  composite operation/multiplier/Map/ParMultiDo(Anonymous).out0) " +
        "composite operation/divisor[composite]" +
        "(out: composite operation/divisor/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  composite operation/multiplier/Map/ParMultiDo(Anonymous).out0) " +
        "composite operation/divisor/Map[composite]" +
        "(out: composite operation/divisor/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  composite operation/multiplier/Map/ParMultiDo(Anonymous).out0) " +
        "composite operation/divisor/Map/ParMultiDo(Anonymous)" +
        "(out: composite operation/divisor/Map/ParMultiDo(Anonymous).out0)");
}

@Test
public void should_show_pipeline_with_filter_and_2_transforms() {
  Pipeline pipeline = BeamFunctions.createPipeline("Filter and 2 transforms");
  List<Integer> numbers = Arrays.asList(1, 100, 200, 201, 202, 203, 330, 400, 500);
  PCollection<Integer> inputNumbers = pipeline.apply("create", Create.of(numbers));
  // almost every native transform is a composite, e.g. Filter implements the expand method (BTW it's the contract,
  // since every PTransform implementation must implement this method)
  PCollection<Integer> filteredNumbers = inputNumbers.apply("filter", Filter.greaterThan(200));
  PCollection<Integer> multipliedNumbers = filteredNumbers
    .apply("map1", MapElements.into(TypeDescriptors.integers()).via(number -> number * 5))
    .apply("map2", MapElements.into(TypeDescriptors.integers()).via(number -> number / 2));
  NodesVisitor visitor = new NodesVisitor();

  pipeline.traverseTopologically(visitor);

  pipeline.run().waitUntilFinish();
  String graph = visitor.stringifyVisitedNodes();
  assertThat(graph).isEqualTo("[ROOT]  -> create[composite](out: create/Read(CreateSource).out) -> " +
    "create/Read(CreateSource)(out: create/Read(CreateSource).out) ->  " +
    "(in:  create/Read(CreateSource).out) filter[composite]" +
        "(out: filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  create/Read(CreateSource).out) filter/ParDo(Anonymous)[composite]" +
        "(out: filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  create/Read(CreateSource).out) filter/ParDo(Anonymous)/ParMultiDo(Anonymous)" +
        "(out: filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) map1[composite]" +
        "(out: map1/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) map1/Map[composite]" +
        "(out: map1/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  filter/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) map1/Map/ParMultiDo(Anonymous)" +
        "(out: map1/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  map1/Map/ParMultiDo(Anonymous).out0) map2[composite]" +
        "(out: map2/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  map1/Map/ParMultiDo(Anonymous).out0) map2/Map[composite]" +
        "(out: map2/Map/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  map1/Map/ParMultiDo(Anonymous).out0) map2/Map/ParMultiDo(Anonymous)" +
        "(out: map2/Map/ParMultiDo(Anonymous).out0)");
}

@Test
public void should_show_pipeline_with_2_root_nodes() {
  Pipeline pipeline = BeamFunctions.createPipeline("2 root nodes");
  List<Integer> numbers = Arrays.asList(1, 100, 200, 201, 202, 203, 330, 400, 500);
  PCollection<Integer> rootNode1 = pipeline.apply("number1", Create.of(numbers));
  PCollection<Integer> rootNode2 = pipeline.apply("numbers2", Create.of(numbers));
  PCollection<Integer> filteredNumbers1 = rootNode1.apply("filter1", Filter.greaterThan(200));
  PCollection<Integer> filteredNumbers2 = rootNode2.apply("filter2", Filter.greaterThan(200));
  NodesVisitor visitor = new NodesVisitor();

  pipeline.traverseTopologically(visitor);

  pipeline.run().waitUntilFinish();
  String graph = visitor.stringifyVisitedNodes();
  assertThat(graph).isEqualTo("[ROOT]  -> number1[composite](out: number1/Read(CreateSource).out) -> " +
    "number1/Read(CreateSource)(out: number1/Read(CreateSource).out) -> " +
    "numbers2[composite](out: numbers2/Read(CreateSource).out) -> numbers2/Read(CreateSource)" +
        "(out: numbers2/Read(CreateSource).out) ->  " +
    "(in:  number1/Read(CreateSource).out) filter1[composite]" +
        "(out: filter1/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  number1/Read(CreateSource).out) filter1/ParDo(Anonymous)[composite]" +
        "(out: filter1/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  number1/Read(CreateSource).out) filter1/ParDo(Anonymous)/ParMultiDo(Anonymous)" +
        "(out: filter1/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  numbers2/Read(CreateSource).out) filter2[composite]" +
        "(out: filter2/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  numbers2/Read(CreateSource).out) filter2/ParDo(Anonymous)[composite]" +
        "(out: filter2/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  numbers2/Read(CreateSource).out) filter2/ParDo(Anonymous)/ParMultiDo(Anonymous)" +
        "(out: filter2/ParDo(Anonymous)/ParMultiDo(Anonymous).out0)");
}

@Test
public void should_show_pipeline_with_simple_filter_transform() {
  Pipeline pipeline = BeamFunctions.createPipeline("Filter transform");
  PCollection<Integer> inputNumbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  inputNumbers.apply("filter_1", Filter.greaterThanEq(2));
  NodesVisitor visitor = new NodesVisitor();

  pipeline.traverseTopologically(visitor);

  pipeline.run().waitUntilFinish();
  String graph = visitor.stringifyVisitedNodes();
  assertThat(graph).isEqualTo("[ROOT]  -> Create.Values[composite](out: Create.Values/Read(CreateSource).out) -> " +
    "Create.Values/Read(CreateSource)(out: Create.Values/Read(CreateSource).out) ->  " +
    "(in:  Create.Values/Read(CreateSource).out) filter_1[composite]" +
        "(out: filter_1/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  Create.Values/Read(CreateSource).out) filter_1/ParDo(Anonymous)[composite]" +
        "(out: filter_1/ParDo(Anonymous)/ParMultiDo(Anonymous).out0) ->  " +
    "(in:  Create.Values/Read(CreateSource).out) filter_1/ParDo(Anonymous)/ParMultiDo(Anonymous)" +
        "(out: filter_1/ParDo(Anonymous)/ParMultiDo(Anonymous).out0)");
}


class NodesVisitor implements Pipeline.PipelineVisitor {

  private List<TransformHierarchy.Node> visitedNodes = new ArrayList<>();

  public List<TransformHierarchy.Node> getVisitedNodes() {
    return visitedNodes;
  }

  @Override
  public void enterPipeline(Pipeline p) {
  }

  @Override
  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
    visitedNodes.add(node);
    return CompositeBehavior.ENTER_TRANSFORM;
  }

  @Override
  public void leaveCompositeTransform(TransformHierarchy.Node node) {
  }

  @Override
  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
      visitedNodes.add(node);
  }

  @Override
  public void visitValue(PValue value, TransformHierarchy.Node producer) {
  }

  @Override
  public void leavePipeline(Pipeline pipeline) { 
  }

  public String stringifyVisitedNodes() {
    return  visitedNodes.stream().map(node -> {
      if (node.isRootNode()) {
        return "[ROOT] ";
      }
      String compositeFlagStringified = node.isCompositeNode() ? "[composite]" : "";
      String inputs = stringifyValues(node.getInputs());
      String inputsStringified = inputs.isEmpty() ? "" : " (in:  " + inputs + ") ";
      String outputs = stringifyValues(node.getOutputs());
      String outputsStringified = outputs.isEmpty() ? "" : "(out: " + outputs + ")";
      return inputsStringified + node.getFullName() + compositeFlagStringified + outputsStringified;
    }).collect(Collectors.joining(" -> "));
  }

  private static String stringifyValues(Map<TupleTag<?>, PValue> values) {
    return values.entrySet().stream()
      .map(entry -> entry.getValue().getName()).collect(Collectors.joining(","));
  }
}

Understanding TransformHierarchy helps to discover how Beam defers the real execution of the pipeline. As told in the first section, this structure is based on nodes representing different types of transforms: root, composite or primitive. All of them are added to the DAG through Pipeline's applyInternal method. As in the case of Spark, building the DAG doesn't execute the computation. TransformHierarchy is used as an abstraction representing operations understandable by all runners. Later, each runner translates them to its internal methods, as we could see in the second section with the Dataflow and Spark runners.