Apache Beam pipeline configuration

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

Despite the serverless nature of Apache Beam's popular runners (e.g. Dataflow), configuration is still an important point. This post, through some of the provided runners, tries to show why.

This post explains how Apache Beam pipelines can be configured. The first part shows some common options. The second presents the options reserved for the Dataflow runner. The third is devoted to the Apache Spark runner. Finally, the last part shows how to configure runners through some learning tests.

Pipeline basic options

The execution pipeline can be configured during its construction. The factory method Pipeline.create(PipelineOptions options) takes the options as its parameter. Among these options we can find:

Dataflow runner configuration

When Apache Beam is executed with the Google Cloud Platform Dataflow runner, it accepts additional options:

Dataflow backlog

In Dataflow, the backlog is the number of messages read from an unbounded data source that haven't been processed yet.

Besides the options quoted above, the Dataflow runner has many other configuration properties. Most of them are related to the GCP configuration (credentials, zone, network...).

Spark runner configuration

Apache Spark is a data processing framework that was widely presented in the Apache Spark articles. Even though it appears to be one of Beam's competitors, it also has its own runner that can be used to execute Beam jobs in a Spark environment. Compared with the Dataflow runner, the Spark runner has fewer available options:

Beam pipeline configuration examples

The lines below present some examples of options shared by all runners:

@Test
public void should_fail_when_the_job_name_is_not_unique_and_stableUniqueNames_is_set_to_error() {
  PipelineOptions options = PipelineOptionsFactory.create();
  options.setJobName("stableUniqueNames set to ERROR");
  options.setStableUniqueNames(PipelineOptions.CheckEnabled.ERROR);
  Pipeline pipeline = Pipeline.create(options);
  PCollection<String> lettersCollection = pipeline.apply(Create.of(Arrays.asList("a", "b", "c", "d")));

  lettersCollection.apply("Counter", Count.globally());
  lettersCollection.apply("Counter", Count.globally());

  assertThatThrownBy(() -> pipeline.run().waitUntilFinish()).isInstanceOf(IllegalStateException.class)
    .hasMessage("Pipeline update will not be possible because the following transforms " +
      "do not have stable unique names: Counter2.");
}

@Test
public void should_not_fail_on_setting_job_name_not_respecting_naming_rules_for_directrunner() {
  PipelineOptions options = PipelineOptionsFactory.create();
  // Only DataflowRunner is constrained by the naming conventions:
  // See org.apache.beam.runners.dataflow.DataflowRunner.fromOptions() where the RegEx validation is defined as
  // jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?")
  options.setJobName("<>..///!...:;:;é'à''çà)");
  // DirectRunner is the default runner so there is no need to define it
  Pipeline pipeline = Pipeline.create(options);
  PCollection<String> lettersCollection = pipeline.apply(Create.of(Arrays.asList("a", "b", "c", "d")));

  lettersCollection.apply("Counter", Count.globally());

  pipeline.run().waitUntilFinish();
}
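
The naming rule mentioned in the comment of the second test can also be tried out without any runner. The sketch below is only an illustration: it copies the regular expression quoted in that comment and assumes it mirrors the validation made by DataflowRunner.fromOptions(). The class JobNameCheck and the method isValidDataflowJobName are hypothetical helpers and not part of Beam.

```java
import java.util.regex.Pattern;

public class JobNameCheck {
    // Pattern copied from the comment in the test above; it is assumed to
    // match the check performed by DataflowRunner.fromOptions().
    private static final Pattern DATAFLOW_JOB_NAME =
        Pattern.compile("[a-z]([-a-z0-9]*[a-z0-9])?");

    // Hypothetical helper (not a Beam API): returns true only when the whole
    // name matches the pattern, as String.matches() would do.
    public static boolean isValidDataflowJobName(String jobName) {
        return jobName != null && DATAFLOW_JOB_NAME.matcher(jobName).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {
            "letters-counter-1",               // lowercase letters, digits, hyphens
            "stableUniqueNames set to ERROR",  // uppercase letters and spaces
            "counter-"                         // must not end with a hyphen
        };
        for (String candidate : candidates) {
            System.out.println(candidate + " -> " + isValidDataflowJobName(candidate));
        }
    }
}
```

A check like this could be run before submitting a job, so that a name accepted by DirectRunner does not fail later when the same pipeline is moved to Dataflow.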

Apache Beam provides a lot of configuration options. They fall into two categories: basic and runner-specific. The first category groups the properties common to all execution environments, such as the job name, the runner's name or the temporary files location. The second category groups the properties related to particular runners. As we could see, the richest one is the Dataflow runner, which lets us define the pipeline in a much more fine-grained way than the Spark runner. The last part showed some of the common options using the example of the local JVM runner.