Apache Beam pipeline configuration

on waitingforcode.com

Despite the serverless nature of Apache Beam's popular runners (e.g. Dataflow), configuration is still an important point. This post, through some of the provided runners, tries to show why.

This post explains how Apache Beam's pipelines can be configured. The first part shows some common options. The second one presents the options reserved for the Dataflow runner. The third one is devoted to the Apache Spark runner. Finally, the last part shows how to configure runners through some learning tests.

Pipeline basic options

The pipeline can be configured during its construction. The factory method Pipeline.create(PipelineOptions options) takes the options as its parameter. Among these options we can find:

  • jobName - specifies the name of the pipeline. The name must respect the following RegEx: [a-z]([-a-z0-9]{0,38}[a-z0-9])?. The validation applies however only for certain runners (e.g. Dataflow).
  • runner - defines the runner used to execute the pipeline. If the runner is registered, its simple class name is enough. Otherwise the fully qualified name is expected.
  • stableUniqueNames - one of 3 values can be specified here: OFF, WARNING or ERROR. If the second or the third value is chosen, Beam checks whether transform names are unique. If that's not the case, it either prints a warning message or throws an error.
  • tempLocation - represents the place where pipeline's temporary files will be stored. It can be any file system.
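
The jobName pattern quoted in the list can be checked locally before a job is submitted. The sketch below is only an illustration built on java.util.regex. The class JobNameCheck and the method isValidDataflowJobName are hypothetical names that are not part of Beam, and the runner still performs its own validation:

```java
import java.util.regex.Pattern;

public class JobNameCheck {
    // Pattern copied from the jobName description above.
    private static final Pattern DATAFLOW_JOB_NAME =
        Pattern.compile("[a-z]([-a-z0-9]{0,38}[a-z0-9])?");

    /** Returns true only when the whole name matches the pattern. */
    public static boolean isValidDataflowJobName(String jobName) {
        return jobName != null && DATAFLOW_JOB_NAME.matcher(jobName).matches();
    }

    public static void main(String[] args) {
        // Lowercase letters, digits and dashes, starting with a letter
        System.out.println(isValidDataflowJobName("daily-export-2"));
        // Uppercase letters and spaces are rejected by the pattern
        System.out.println(isValidDataflowJobName("stableUniqueNames set to ERROR"));
    }
}
```

A name ending with a dash or longer than 40 characters is rejected as well, since the pattern requires a trailing letter or digit and allows at most 38 characters between the first and the last one.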

Dataflow runner configuration

When a pipeline is executed with the Google Cloud Platform Dataflow runner, it accepts additional options:

  • project - Google Cloud Platform (GCP) project id. A GCP project is like a container storing all resources (billing, authentication, monitoring etc.) related to a logical unit of processing.
  • stagingLocation - the staging place in Google Cloud Storage for local files (e.g. resources loaded from the classpath). It must be a GCS bucket.
  • templateLocation - the place where pipeline's template files will be stored. Thanks to the templates a new job can be created from already defined models instead of compiling and deploying the code each time.
  • update - a boolean flag telling if the submitted job has to update an already running job with the same name. This feature can be useful when we want to change the processing logic in the code (e.g. fix bugs). The changes will take effect for new data, i.e. already buffered data will still be processed with the previously defined pipeline.
  • autoscalingAlgorithm - maybe the most important property regarding horizontal scaling. It defines one of 3 available autoscaling algorithms:
    • NONE - no auto scaling will be used. Dataflow will use the number of workers specified in numWorkers property.
    • BASIC - this strategy is deprecated. Only for information, it scales up to the number defined in maxNumWorkers.
    • THROUGHPUT_BASED - this algorithm varies the number of workers between numWorkers and maxNumWorkers. For batch processing, Dataflow chooses the number of workers required to execute the given job. For streaming processing, the engine changes the number of workers according to the CPU utilization, throughput and backlog.
  • numWorkers - as the name indicates, this property sets the number of workers used to execute the pipeline. However, if an autoscaling algorithm is enabled, the actual number of workers can differ from this value. If this property is left empty, Dataflow is in charge of determining the number of workers.
  • maxNumWorkers - can be used to limit the number of workers for a given job. If empty, Dataflow will figure out the right number.
  • diskSizeGb - the disk size for each worker.
  • workerDiskType - worker disk type. It must be one of the values defined in GCP Compute Engine.
  • workerMachineType - worker machine type. It must be one of the values defined in GCP Compute Engine.
  • defaultWorkerLogLevel - defines the default level for the logs. It can be overridden for specific packages/classes through the option workerLogLevelOverrides.
  • streaming - a boolean flag telling if the pipeline does streaming (true) or batch (false) processing. If the pipeline deals with unbounded data, this property is automatically set to true.

Dataflow backlog

Backlog in Dataflow is the number of messages read from an unbounded data source that haven't been processed yet.

Besides the options quoted above, the Dataflow runner has a lot of other configuration properties. Most of them are related to the GCP configuration (credentials, zone, network...).

Spark runner configuration

Apache Spark is a data processing framework that was widely presented in the Apache Spark articles. Even if it appears as one of Beam's competitors, it also has its own runner that can be used to execute Beam's jobs in a Spark environment. Compared to the Dataflow runner, the Spark runner has fewer options:

  • batchIntervalMillis - the micro-batches interval for streaming operations. Other properties impacting streaming processing are maxRecordsPerBatch and minReadTimeMillis. The first one defines the max number of records read in a micro-batch. The latter one specifies the minimum reading time for a micro-batch.
  • checkpointDir - the place where checkpoints will be saved. The frequency of checkpointing is defined through the checkpointDurationMillis property.
  • sparkMaster - specifies the Spark master's location. In the post Deployment modes and master URLs in Spark you can learn more about this topic.
  • storageLevel - defines the Spark storage level used when caching RDDs (e.g. MEMORY_ONLY).

Beam pipeline configuration examples

The tests below present some examples of options shared by all runners:

@Test
public void should_fail_when_transform_names_are_not_unique_and_stableUniqueNames_is_set_to_error() {
  PipelineOptions options = PipelineOptionsFactory.create();
  options.setJobName("stableUniqueNames set to ERROR");
  options.setStableUniqueNames(PipelineOptions.CheckEnabled.ERROR);
  Pipeline pipeline = Pipeline.create(options);
  PCollection<String> lettersCollection = pipeline.apply(Create.of(Arrays.asList("a", "b", "c", "d")));

  lettersCollection.apply("Counter", Count.globally());
  lettersCollection.apply("Counter", Count.globally());

  assertThatThrownBy(() -> pipeline.run().waitUntilFinish()).isInstanceOf(IllegalStateException.class)
    .hasMessage("Pipeline update will not be possible because the following transforms " +
      "do not have stable unique names: Counter2.");
}
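
The error message in the test above mentions Counter2, which suggests that a duplicated transform name gets a numeric suffix and is then reported according to the stableUniqueNames value. The snippet below is a simplified, standalone model of that behaviour, written only to illustrate the three modes. The class UniqueNamesCheck, its resolve method and the message text are hypothetical and are not Beam's implementation:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UniqueNamesCheck {
    // Mirrors the 3 values accepted by stableUniqueNames.
    public enum CheckEnabled { OFF, WARNING, ERROR }

    /**
     * Makes every requested name unique by appending 2, 3, ... to duplicates,
     * then reports the renamed entries according to the chosen mode.
     */
    public static List<String> resolve(List<String> requestedNames, CheckEnabled mode) {
        Set<String> used = new HashSet<>();
        List<String> resolved = new ArrayList<>();
        List<String> renamed = new ArrayList<>();
        for (String requested : requestedNames) {
            String candidate = requested;
            int suffix = 2;
            // Retry with an increasing suffix until the name is free
            while (!used.add(candidate)) {
                candidate = requested + suffix++;
            }
            if (!candidate.equals(requested)) {
                renamed.add(candidate);
            }
            resolved.add(candidate);
        }
        if (!renamed.isEmpty()) {
            String message = "Transforms without stable unique names: "
                + String.join(", ", renamed);
            if (mode == CheckEnabled.ERROR) {
                throw new IllegalStateException(message);
            }
            if (mode == CheckEnabled.WARNING) {
                System.err.println("WARN " + message);
            }
        }
        return resolved;
    }
}
```

With two transforms both named "Counter", this model returns "Counter" and "Counter2" in the OFF and WARNING modes and throws an IllegalStateException in the ERROR mode.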

@Test
public void should_not_fail_on_setting_job_name_not_respecting_naming_rules_for_directrunner() {
  PipelineOptions options = PipelineOptionsFactory.create();
  // Only DataflowRunner is constrained by the naming conventions:
  // See org.apache.beam.runners.dataflow.DataflowRunner.fromOptions() where the RegEx validation is defined as
  // jobName.matches("[a-z]([-a-z0-9]*[a-z0-9])?")
  options.setJobName("<>..///!...:;:;é'à''çà)");
  // DirectRunner is the default runner so there is no need to define it
  Pipeline pipeline = Pipeline.create(options);
  PCollection<String> lettersCollection = pipeline.apply(Create.of(Arrays.asList("a", "b", "c", "d")));

  lettersCollection.apply("Counter", Count.globally());

  pipeline.run().waitUntilFinish();
}

Apache Beam provides a lot of configuration options. They fall into 2 categories: basic and runner-specific. The first category groups the properties common to all execution environments, such as the job name, the runner's name or the temporary files location. The second category groups the properties related to particular runners. As we could see, the richest one is the Dataflow runner, which lets us define the pipeline in a much more fine-grained way than the Spark runner. The last part showed some of the common options on the example of the local JVM runner.

Read also about Apache Beam pipeline configuration here: Specifying Pipeline Execution Parameters, Dataflow Frequently Asked Questions.
