ParDo transformation in Apache Beam

on waitingforcode.com

The previous post introduced the built-in transformations available in Apache Beam. Most of them were presented there, except ParDo, which is described now.

Since ParDo carries a bit more logic than the other transformations, it deserves a separate post. The first part defines ParDo. The second section explains how to use it. The last part shows several use cases through learning tests.

ParDo explained

Apache Beam executes its transformations in parallel on different nodes called workers. As shown in the post about data transformations in Apache Beam, it provides some common data processing operations. However, their scope is often limited, which is why a universal transformation called ParDo exists. It can be described by the following points:

  • calls any user-specified function
  • its processing method is applied to each element of the dataset, one by one
  • if several resources are allocated, the dataset's elements can be processed in parallel
  • takes one or multiple datasets and is also able to output one or more datasets
  • processed elements keep their original timestamp and window
  • no global mutable state: it's not possible to share mutable state among the executed functions. The functions are serialized and sent as such to the workers. So even if they reference some global variables (such as collections), the workers receive only copies of these variables and not the variables themselves
  • the ParDo transformation is fault-tolerant, i.e. if it crashes, it's rerun. The transformation also has a concept of speculative execution (see the post about speculative execution in Spark; both rely on similar basics). The processing of a given subset of the dataset can be executed on 2 different workers at any time. The results coming from the quickest worker are used and those of the slower one are discarded. It's therefore important to emphasize that a ParDo implementation must be aware of parallel execution on the same subset of data. That's easy to achieve if the transformation is a pure function without side effects. It's a little bit more difficult with side effects, for instance when the function writes to an external resource such as a database. The operations made on these resources must be idempotent and accept concurrent writes. An example of a non-idempotent operation is an insert into a database with an auto-incremented key. If in this case 2 executions of the same ParDo are running and each has written half of the data to such a database, the results will be invalid since a part of the data will be stored twice.
  • the execution of the ParDo transformation is also optimized. One of 2 different kinds of optimization is applied.
    The first one is called producer-consumer fusion. This optimization is applied when the output of one transformation is the input of another transformation. In this situation they're chained and executed together.
    The second kind of optimization is called sibling fusion. It's used when 2 or more ParDo functions use the same PCollection as their input. In such a case the functions are merged into a single execution scope and each element of the input is processed by all the functions included in this scope. Thanks to that the input data is read only once.
  • can be named; by the way, it's a good practice to explicitly name the function since it's useful in monitoring and debugging
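
The point about idempotent side effects can be illustrated without any Beam code. The sketch below is a plain-Java simulation, not Beam API: AutoIncrementTable and KeyedTable are hypothetical in-memory stand-ins for a database, and the two attempts run one after the other instead of concurrently. It only shows why a write keyed by the element survives a duplicated execution while an auto-incremented insert does not.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; no Beam API is used here.
class DuplicateExecutionSketch {

  // Hypothetical sink that assigns a new auto-incremented key on every insert.
  static class AutoIncrementTable {
    private final Map<Integer, String> rows = new HashMap<>();
    private int nextId = 1;

    void insert(String value) {
      rows.put(nextId++, value);
    }

    int size() {
      return rows.size();
    }
  }

  // Hypothetical sink whose key is derived from the written element itself.
  static class KeyedTable {
    private final Map<String, String> rows = new HashMap<>();

    void upsert(String key, String value) {
      rows.put(key, value);
    }

    int size() {
      return rows.size();
    }
  }

  // Simulates the same bundle being written by 2 attempts (original + speculative).
  static int writeTwiceWithAutoIncrement(List<String> bundle) {
    AutoIncrementTable table = new AutoIncrementTable();
    for (int attempt = 0; attempt < 2; attempt++) {
      for (String element : bundle) {
        table.insert(element);
      }
    }
    return table.size();
  }

  // Same duplicated execution, but every write targets a deterministic key.
  static int writeTwiceWithUpsert(List<String> bundle) {
    KeyedTable table = new KeyedTable();
    for (int attempt = 0; attempt < 2; attempt++) {
      for (String element : bundle) {
        table.upsert("id-" + element, element);
      }
    }
    return table.size();
  }

  public static void main(String[] args) {
    List<String> bundle = Arrays.asList("a", "b", "c");
    System.out.println("auto-increment rows=" + writeTwiceWithAutoIncrement(bundle));
    System.out.println("upsert rows=" + writeTwiceWithUpsert(bundle));
  }
}
```

With a 3-element bundle the auto-incremented table ends up with 6 rows and the keyed one with 3, which is why deterministic keys are a common way to make external writes safe to repeat.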

ParDo API

The processing inside ParDo is specified as an implementation of DoFn<InputT, OutputT>. Obviously the function must define the processing method. But that's not all: through DoFn's annotations we can also define methods called at specific moments of the processing:

  1. @DoFn.Setup - the method annotated with it is called every time a new instance of DoFn is created on the worker. The initialization method must be parameterless.
  2. @DoFn.StartBundle - the method annotated with @DoFn.StartBundle is called every time the created DoFn is about to be executed against a subset of the dataset (bundle). Unlike the previous annotation, this one accepts 1 argument. If defined, it must be an instance of StartBundleContext holding the pipeline information available before the processing.
  3. @DoFn.ProcessElement - as the name indicates, this annotation marks the method processing one element of the bundle. It requires a ProcessContext as the first parameter. It can also take other parameters that are subtypes of RestrictionTracker and BoundedWindow.
  4. @DoFn.FinishBundle - this method is called after the last element of the given bundle was processed. It can take 1 parameter, an instance of FinishBundleContext that, similarly to StartBundleContext, contains the information about the processing pipeline.
  5. @DoFn.Teardown - when there are no new bundles to process, the shutdown method annotated with this annotation is called. Normally it should be invoked the same number of times as the setup method. A failure is another invocation condition: when the given function fails, its teardown method, if defined, is called too.
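
The order of these calls can be pictured with a small plain-Java simulation. It is only a sketch of the sequence described above, not Beam's runner code: RecordingFunction is a hypothetical stand-in for a DoFn instance and runOnOneWorker is a hypothetical driver that feeds it bundle after bundle on a single worker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of the call order; this is not Beam's runner code.
class LifecycleOrderSketch {

  // Hypothetical stand-in for a DoFn instance that records every lifecycle call.
  static class RecordingFunction {
    final List<String> events = new ArrayList<>();

    void setup() {
      events.add("setup");
    }

    void startBundle() {
      events.add("startBundle");
    }

    void processElement(int element) {
      events.add("process(" + element + ")");
    }

    void finishBundle() {
      events.add("finishBundle");
    }

    void teardown() {
      events.add("teardown");
    }
  }

  // Drives one function instance over several bundles, as a single worker might do.
  static List<String> runOnOneWorker(List<List<Integer>> bundles) {
    RecordingFunction function = new RecordingFunction();
    function.setup(); // once per instance
    try {
      for (List<Integer> bundle : bundles) {
        function.startBundle(); // once per bundle, before its first element
        for (int element : bundle) {
          function.processElement(element);
        }
        function.finishBundle(); // once per bundle, after its last element
      }
    } finally {
      function.teardown(); // called on normal completion and on failure
    }
    return function.events;
  }

  public static void main(String[] args) {
    List<List<Integer>> bundles = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
    System.out.println(runOnOneWorker(bundles));
  }
}
```

For the 2 bundles above the recorded sequence is one setup, then a startBundle/finishBundle pair around each bundle's elements, and a single teardown at the end.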

ParDo examples

After this long and theoretical introduction it's a good moment to write some ParDo functions and investigate their behavior:

@Test
public void should_show_no_global_state_existence() {
  Pipeline pipeline = BeamFunctions.createPipeline("Global state not existence test");
  PCollection<Integer> numbers123 = pipeline.apply(Create.of(Arrays.asList(1, 2, 3)));
  class IdentityFunction extends DoFn<Integer, Integer> {
    private final List<Integer> numbers;

    public IdentityFunction(List<Integer> numbers) {
      this.numbers = numbers;
    }

    @DoFn.ProcessElement
    public void processElement(ProcessContext processContext) {
      processContext.output(processContext.element());
      numbers.add(processContext.element());
    }

    @DoFn.Teardown
    public void teardown() {
      if (numbers.isEmpty()) {
        throw new IllegalStateException("Numbers on worker should not be empty");
      }
      System.out.println("numbers="+numbers);
    } 
  }
  List<Integer> numbers = new ArrayList<>();

  PCollection<Integer> numbersFromChars = numbers123.apply(ParDo.of(new IdentityFunction(numbers)));

  PAssert.that(numbersFromChars).containsInAnyOrder(1, 2, 3);
  assertThat(numbers).isEmpty();
  pipeline.run().waitUntilFinish();
}

@Test
public void should_compute_singleton_pcollection_as_shared_state() {
  Pipeline pipeline = BeamFunctions.createPipeline("PCollection singleton state test");
  PCollection<Integer> processedMeanNumbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6)));
  // Here the parameter shared by all functions must be computed
  PCollectionView<Integer> minNumber = processedMeanNumbers.apply(Min.integersGlobally().asSingletonView());
  PCollection<Integer> processedNumbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6)));
  class MinNumberFilter extends DoFn<Integer, Integer> {

    @DoFn.ProcessElement
    public void processElement(ProcessContext processContext) throws InterruptedException {
      int minNumberProcessed = processContext.sideInput(minNumber);
      if (processContext.element() > minNumberProcessed*2) {
        processContext.output(processContext.element());
      }
    }
  }

  PCollection<Integer> numbersFromChars = processedNumbers.apply(
    ParDo.of(new MinNumberFilter()).withSideInputs(minNumber));

  PAssert.that(numbersFromChars).containsInAnyOrder(3, 4, 5, 6);
  pipeline.run().waitUntilFinish();
}

@Test
public void should_compute_shared_state_in_functions_constructor() {
  int workers = 5;
  Pipeline pipeline = BeamFunctions.createPipeline("Shared state from function constructor test", workers);
  PCollection<Integer> processedNumbers = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5, 6)));
  class IdentityFunction extends DoFn<Integer, Integer> {

    public IdentityFunction() {
      Counter.INSTANCE.incrementConstructorCalls();
    }

    @DoFn.ProcessElement
    public void processElement(ProcessContext processContext) {
      processContext.output(processContext.element());
    }

    @DoFn.Teardown
    public void tearDown() {
      Counter.INSTANCE.incrementTeardownCalls();
    }
  }
  List<Integer> numbers = new ArrayList<>();

  PCollection<Integer> numbersFromChars = processedNumbers.apply(ParDo.of(new IdentityFunction()));

  PAssert.that(numbersFromChars).containsInAnyOrder(1, 2, 3, 4, 5, 6);
  assertThat(numbers).isEmpty();
  pipeline.run().waitUntilFinish();
  // As shown, the constructor was called only once and the teardown function more than that (it proves the
  // presence of more than 1 instance of the function)
  // It shows that we can also initialize a state shared by different functions in the constructor
  // It also shows that the function's object is initialized once and sent serialized to the workers
  assertThat(Counter.INSTANCE.getConstructorCalls()).isEqualTo(1);
  assertThat(Counter.INSTANCE.getTeardownCalls()).isEqualTo(workers);
}

@Test
public void should_show_lifecycle() {
  Pipeline pipeline = BeamFunctions.createPipeline("Lifecycle test", 2);
  PCollection<Integer> numbers1To100 =
          pipeline.apply(Create.of(IntStream.rangeClosed(1, 1_000).boxed().collect(Collectors.toList())));
  class LifecycleHandler extends DoFn<Integer, Integer> {

    @DoFn.Setup
    public void setup() {
      LifecycleEventsHandler.INSTANCE.addSetup();
    }

    @DoFn.StartBundle
    public void initMap(DoFn<Integer, Integer>.StartBundleContext startBundleContext) {
      LifecycleEventsHandler.INSTANCE.addStartBundle();
    }

    @DoFn.ProcessElement
    public void processElement(ProcessContext processContext) {
      processContext.output(processContext.element());
      LifecycleEventsHandler.INSTANCE.addProcessing();
    }

    @DoFn.FinishBundle
    public void finishBundle(DoFn<Integer, Integer>.FinishBundleContext finishBundleContext) {
      LifecycleEventsHandler.INSTANCE.addFinishBundle();
    }

    @DoFn.Teardown
    public void turnOff(){
      LifecycleEventsHandler.INSTANCE.addTeardown();
    }

  }

  PCollection<Integer> processedNumbers = numbers1To100.apply(ParDo.of(new LifecycleHandler()));

  pipeline.run().waitUntilFinish();
  assertThat(LifecycleEventsHandler.INSTANCE.getSetupCount()).isEqualTo(2);
  // The number of bundles is not fixed over time but it should vary between 2 and 4
  // It proves however that the @StartBundle method doesn't mean the same as the @Setup one
  assertThat(LifecycleEventsHandler.INSTANCE.getStartBundleCount()).isGreaterThanOrEqualTo(2);
  assertThat(LifecycleEventsHandler.INSTANCE.getProcessingCount()).isEqualTo(1_000);
  assertThat(LifecycleEventsHandler.INSTANCE.getFinishBundleCount()).isEqualTo(LifecycleEventsHandler.INSTANCE.getStartBundleCount());
  assertThat(LifecycleEventsHandler.INSTANCE.getTeardownCount()).isEqualTo(2);
}

@Test
public void should_take_different_inputs() {
  Pipeline pipeline = BeamFunctions.createPipeline("Different inputs test");
  PCollection<Integer> numbers1To5 = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5)));
  PCollection<KV<Integer, String>> numbersWords = pipeline.apply(Create.of(Arrays.asList(
    KV.of(1, "one"), KV.of(2, "two"), KV.of(3, "three"), KV.of(4, "four"), KV.of(5, "five")
  )));
  PCollectionView<Map<Integer, String>> numbersWordsView = numbersWords.apply(View.asMap());
  class ExternalMapper extends DoFn<Integer, String> {
    @DoFn.ProcessElement
    public void processElement(ProcessContext processContext) {
      String word = processContext.sideInput(numbersWordsView).get(processContext.element());
      processContext.output(word);
    }
  }

  PCollection<String> mappedNumbers = numbers1To5.apply(ParDo.of(new ExternalMapper())
  .withSideInputs(Collections.singleton(numbersWordsView)));

  PAssert.that(mappedNumbers).containsInAnyOrder("one", "two", "three", "four", "five");
  pipeline.run().waitUntilFinish();
}

@Test
public void should_produce_different_outputs() {
  Pipeline pipeline = BeamFunctions.createPipeline("Different outputs test", 2);
  PCollection<Integer> numbers1To5 = pipeline.apply(Create.of(Arrays.asList(1, 2, 3, 4, 5)));
  TupleTag<Integer> multipliedNumbersTag = new TupleTag<Integer>(){};
  TupleTag<String> originalNumbersTags = new TupleTag<String>(){};
  class Multiplicator extends DoFn<Integer, Integer> {
    @DoFn.ProcessElement
    public void processElement(ProcessContext processContext) {
      int number = processContext.element();
      processContext.output(originalNumbersTags, ""+number);
      processContext.output(number*2);
    }
  }

  PCollectionTuple results = numbers1To5.apply(ParDo.of(new Multiplicator())
    .withOutputTags(multipliedNumbersTag, TupleTagList.of(originalNumbersTags)));

  PCollection<Integer> multipliedNumbersEntries = results.get(multipliedNumbersTag);
  PAssert.that(multipliedNumbersEntries).containsInAnyOrder(2, 4, 6, 8, 10);
  PCollection<String> originalNumbersEntries = results.get(originalNumbersTags);
  PAssert.that(originalNumbersEntries).containsInAnyOrder("1", "2", "3", "4", "5");
  pipeline.run().waitUntilFinish();
}


enum LifecycleEventsHandler {
  INSTANCE;

  private AtomicInteger setupCalls = new AtomicInteger(0);

  private AtomicInteger startBundleCalls = new AtomicInteger(0);

  private AtomicInteger processingCalls = new AtomicInteger(0);

  private AtomicInteger finishBundleCalls = new AtomicInteger(0);

  private AtomicInteger teardownCalls = new AtomicInteger(0);

  public void addSetup() {
    setupCalls.incrementAndGet();
  }

  public int getSetupCount() {
    return setupCalls.get();
  }

  public void addStartBundle() {
    startBundleCalls.incrementAndGet();
  }

  public int getStartBundleCount() {
    return startBundleCalls.get();
  }

  public void addProcessing() {
    processingCalls.incrementAndGet();
  }

  public int getProcessingCount() {
    return processingCalls.get();
  }

  public void addFinishBundle() {
    finishBundleCalls.incrementAndGet();
  }

  public int getFinishBundleCount() {
    return finishBundleCalls.get();
  }

  public void addTeardown() {
    teardownCalls.incrementAndGet();
  }

  public int getTeardownCount() {
    return teardownCalls.get();
  }
}

enum Counter {
  INSTANCE;

  private AtomicInteger counterConstructorCalls = new AtomicInteger(0);

  private AtomicInteger counterTeardownCalls = new AtomicInteger(0);

  public void incrementConstructorCalls() {
    counterConstructorCalls.incrementAndGet();
  }

  public void incrementTeardownCalls() {
    counterTeardownCalls.incrementAndGet();
  }

  public int getConstructorCalls() {
    return counterConstructorCalls.get();
  }

  public int getTeardownCalls() {
    return counterTeardownCalls.get();
  }
}
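
The behavior observed in should_show_no_global_state_existence and should_compute_shared_state_in_functions_constructor can be reproduced with plain Java serialization. The sketch below assumes that the function reaches the workers through a Java serialization round trip; CollectingFunction and shipToWorker are hypothetical names used only for this illustration and are not part of Beam.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; no Beam API is used here.
class SerializedCopySketch {

  static int constructorCalls = 0;

  // Hypothetical stand-in for a DoFn holding a list passed by the caller.
  static class CollectingFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<Integer> numbers;

    CollectingFunction(List<Integer> numbers) {
      constructorCalls++;
      this.numbers = numbers;
    }

    void process(int element) {
      numbers.add(element);
    }
  }

  // Serializes and deserializes the function, imitating its transfer to a worker.
  static CollectingFunction shipToWorker(CollectingFunction function) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(function);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (CollectingFunction) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Serialization round trip failed", e);
    }
  }

  // Returns {size of the caller's list, size of the first copy's list, constructor calls}.
  static int[] demo() {
    constructorCalls = 0;
    List<Integer> driverNumbers = new ArrayList<>();
    CollectingFunction original = new CollectingFunction(driverNumbers);
    CollectingFunction workerCopy1 = shipToWorker(original);
    CollectingFunction workerCopy2 = shipToWorker(original);
    workerCopy1.process(1);
    workerCopy2.process(2);
    return new int[] {driverNumbers.size(), workerCopy1.numbers.size(), constructorCalls};
  }

  public static void main(String[] args) {
    int[] result = demo();
    System.out.println("driver list size=" + result[0]
        + ", worker copy list size=" + result[1]
        + ", constructor calls=" + result[2]);
  }
}
```

The caller's list stays empty because each deserialized copy owns its own list, and the constructor counter stays at 1 because deserializing a Serializable object does not run its constructor again.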

Apache Beam defines a universal method for processing data. The user is not limited in any manner and can freely define the processing logic as DoFn implementations that are later wrapped by ParDo transformations. However, some specific rules are important to know: no global mutable state, possible speculative executions and the ordered rules of execution defined through DoFn's annotations.
