GCP Dataflow by an Apache Spark guy

Some months ago I wrote a blog post where I presented BigQuery from the perspective of an Apache Spark user. Today I will do the same exercise, but this time for the data processing frameworks category. In other words, I will try to understand GCP Dataflow thanks to my Apache Spark knowledge!

Of course, Dataflow is not exactly the same category of tool as Apache Spark. It's a managed data processing service and hence, more than just a data processing framework. Apache Spark, in its Open Source version, is only a data processing framework. Nonetheless, both share a lot of similar concepts that I will try to highlight just below. To start, let's compare their APIs.

API

You may be surprised, but currently there is no such thing as a Dataflow API. The GCP service uses the Apache Beam API with its dedicated DataflowRunner. Put another way, you define your processing logic with Apache Beam and the Dataflow service implements the missing pieces like workload distribution, and also provides some extra components like more efficient IO connectors for GCP services.
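
To make this more concrete, here is a minimal sketch of how a Beam pipeline gets pointed at Dataflow purely through its options; the project id, region and bucket below are placeholders, not values coming from the original snippets:

        // a minimal, illustrative configuration: the project id, region and bucket are placeholders
        DataflowPipelineOptions dataflowOptions = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        dataflowOptions.setRunner(DataflowRunner.class);
        dataflowOptions.setProject("my-gcp-project");
        dataflowOptions.setRegion("europe-west1");
        dataflowOptions.setTempLocation("gs://my-bucket/dataflow-temp");
        // the rest of the pipeline is defined with the regular Apache Beam API
        Pipeline dataflowPipeline = Pipeline.create(dataflowOptions);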

Let's now compare the code that will filter some input, group it by a key and map the groups to something else before writing the result to files. In the Apache Spark Scala API, the code will look like this:

  val sparkSession = SparkSession.builder()
    .appName("Apache Spark vs Apache Beam API").master("local[*]")
    .config("spark.sql.shuffle.partitions", 2)
    .getOrCreate()
  import sparkSession.implicits._

  val inputNumbers = (0 to 11).toDF("nr")

  val groupedNumbersGreaterThan3 = inputNumbers.filter("nr > 3").as[Int]
    .groupByKey(nr => nr % 2 == 0)

  val labelsWithSums = groupedNumbersGreaterThan3.mapGroups((evenOddFlag, numbers) => {
      val label = if (evenOddFlag) "even"  else "odd"
      (label, numbers.sum)
    })
    .withColumnRenamed("_1", "label")
    .withColumnRenamed("_2", "sum")

  labelsWithSums.write.mode("overwrite").json("/tmp/spark-vs-beam/spark")

To stay with strongly-typed languages, here is the same logic in the Apache Beam Java API:

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setJobName("Apache Spark vs Apache Beam API");
        Pipeline pipeline = Pipeline.create(options);
        PCollection<Integer> inputNumbers = pipeline.apply(Create.of(
                Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)));

        PCollection<Integer> numbersGreaterThan3 = inputNumbers.apply("is_greater_than_3", Filter.greaterThan(3));

        PCollection<KV<Boolean, Integer>> evenOddGroups = numbersGreaterThan3.apply("add_even_odd_flag",
                WithKeys.of(new SerializableFunction<Integer, Boolean>() {
                    @Override
                    public Boolean apply(Integer input) {
                        return input % 2 == 0;
                    }
                })
        );

        PCollection<KV<Boolean, Iterable<Integer>>> evenOddNumbersGroups = evenOddGroups
                .apply("group_by_even_or_odd", GroupByKey.create());

        // LabelWithSum is presumably a simple POJO with "label" and "sum" fields
        PCollection<String> evenOddsWithSums = evenOddNumbersGroups.apply("sum_numbers",
                MapElements.via(new SimpleFunction<KV<Boolean, Iterable<Integer>>, LabelWithSum>() {
                    @Override
                    public LabelWithSum apply(KV<Boolean, Iterable<Integer>> input) {
                        String label = "even";
                        if (!input.getKey()) {
                            label = "odd";
                        }
                        int sum = 0;
                        Iterator<Integer> numbersIterator = input.getValue().iterator();
                        while (numbersIterator.hasNext()) {
                            sum += numbersIterator.next();
                        }
                        return new LabelWithSum(label, sum);
                    }
                }))
                .apply("convert_to_json", AsJsons.of(LabelWithSum.class));

        evenOddsWithSums.apply("write_as_json", TextIO.write().withSuffix(".json")
                .withNumShards(2)
                .to("/tmp/spark-vs-beam/beam"));

        pipeline.run().waitUntilFinish();

I completely agree, Scala and Java are not the same, but instead of focusing on the verbosity, let's analyze the building blocks. To deal with input and output, Apache Beam uses the concept of IO readers and writers. In Apache Spark, you will instead find data sources and data sinks. Regarding the transformations, in Apache Beam you can see a lot of apply(...) methods, used for mapping, filtering and grouping alike. Even though you will find a class indicating the type of the operation inside them, this apply() can be confusing for an Apache Spark user. After all, in Apache Spark every operation is categorized in the API; i.e., filtering is a filter function, mapping is a map function, grouping is a groupByKey function, etc. However, there is a nice thing about this whole apply(...): one of its overloads takes the operation label, which is very useful when you try to understand what is happening in the pipeline. And finally, you can also notice the explicit conversion from a Java class to a string before writing the JSON data in Apache Beam. It's not required in Apache Spark, where the framework manages the serialization implicitly.

Even though both code snippets are different, they share a similar composition logic. And what about other aspects, like auto-scaling?

Auto-scaling

GCP Dataflow, a little bit like other services of that type such as Databricks, comes with native support for auto-scaling. It means that when you run your pipeline, you can define the min and max number of workers that will be processing your data. The exact number of running workers will then change depending on various factors. For streaming applications based on the Streaming Engine feature, the scaling decision relies on the CPU usage and the system lag, i.e., how fast the current configuration processes the incoming data. To know the data processing progress, the Apache Beam API exposes 2 methods in the UnboundedReader:


    /**
     * Returns the size of the backlog of unread data in the underlying data source represented by
     * this split of this source.
     *
     * <p>One of this or {@link #getTotalBacklogBytes} should be overridden in order to allow the
     * runner to scale the amount of resources allocated to the pipeline.
     */
    public long getSplitBacklogBytes() {
      return BACKLOG_UNKNOWN;
    }

    /**
     * Returns the size of the backlog of unread data in the underlying data source represented by
     * all splits of this source.
     *
     * <p>One of this or {@link #getSplitBacklogBytes} should be overridden in order to allow the
     * runner to scale the amount of resources allocated to the pipeline.
     */
    public long getTotalBacklogBytes() {
      return BACKLOG_UNKNOWN;
    }

Auto-scaling for batch also has its own API, indicating the estimated size of the input (used to define the initial number of workers) and the already processed fraction:

  // declared in BoundedSource
  public abstract long getEstimatedSizeBytes(PipelineOptions options) throws Exception;

  // declared in BoundedSource.BoundedReader
  public @Nullable Double getFractionConsumed() {
    return null;
  }

In Apache Spark, the auto-scaling feature is called Dynamic Resource Allocation and it relies on the number of pending tasks. It's also a more configuration-based approach, with the spark.dynamicAllocation.schedulerBacklogTimeout and spark.dynamicAllocation.executorIdleTimeout parameters. Whenever one of them is reached, Apache Spark will ask the cluster manager either to add new executors or to remove the ones which are not active anymore.
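
As an illustration, here is a minimal sketch of that configuration; the bounds and timeouts are arbitrary values to adapt to your workload:

        SparkConf dynamicAllocationConf = new SparkConf()
                .set("spark.dynamicAllocation.enabled", "true")
                .set("spark.dynamicAllocation.minExecutors", "2")
                .set("spark.dynamicAllocation.maxExecutors", "20")
                // ask for new executors when tasks have been pending for more than 1 second
                .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
                // give back executors that have been idle for more than 60 seconds
                .set("spark.dynamicAllocation.executorIdleTimeout", "60s");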

Shuffle service

A component related to the auto-scaling feature is the shuffle service. In Dataflow, you will find a mention of it in the documentation about scaling batch pipelines:

Note: For pipelines not using Dataflow Shuffle, Dataflow might not be able to scale down the workers effectively because the workers may have shuffled data stored in local disks.

Dataflow Shuffle is the counterpart of Apache Spark's External Shuffle Service, i.e., the place where the framework stores the intermediary files for the operations involving data redistribution, like a group by key. Thanks to this type of service, you can use smaller worker machines that don't require big disks. It's also helpful for more responsive scaling, since the framework can take any node off the cluster without worrying about the intermediary files it generated.
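
On the Apache Spark side, the external shuffle service is also enabled through configuration. A minimal sketch, assuming the service runs on every node (for example as a YARN auxiliary service); the port is simply Spark's default one:

        // the node-local shuffle service serves the shuffle files written by the executors,
        // so dynamic allocation can remove an idle executor without losing its shuffle output
        SparkConf shuffleServiceConf = new SparkConf()
                .set("spark.shuffle.service.enabled", "true")
                .set("spark.shuffle.service.port", "7337")
                .set("spark.dynamicAllocation.enabled", "true");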

Stragglers management

In addition to the scaling features, Dataflow also comes with the possibility to dynamically rebalance the workload. Sounds mysterious? Let's imagine the case where one processed partition (a bundle in Dataflow) is skewed and the task responsible for it takes much longer than the others. In other words, this task is a straggler. In Dataflow, a worker can steal the work initially reserved for another one! The feature is natively available through Apache Beam's BoundedReader and its splitAtFraction method, which gives away a fraction of the data left to process by returning a new source for it:

  public abstract static class BoundedReader<T> extends Source.Reader<T> {
    // the default implementation refuses the split; a runner like Dataflow calls this method
    // to take over a fraction of the work left in the current reader
    public @Nullable BoundedSource<T> splitAtFraction(double fraction) {
      return null;
    }
    // ...
  }

Apache Spark doesn't have a similar stragglers management strategy, but it has various components that can help to prevent or mitigate them at runtime. One of them is the Adaptive Query Execution engine, which adapts the execution plan to the processed data. Another one is speculative task execution, which can help if the straggling task suffers from a node-specific issue preventing it from moving on.
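
Both features are, once again, driven by configuration. A minimal sketch; the multiplier and quantile shown here happen to be Spark's defaults:

        SparkConf stragglersConf = new SparkConf()
                // Adaptive Query Execution: re-optimizes the plan at runtime, e.g. to split skewed partitions
                .set("spark.sql.adaptive.enabled", "true")
                .set("spark.sql.adaptive.skewJoin.enabled", "true")
                // speculative execution: re-launches a copy of tasks running much slower than their peers
                .set("spark.speculation", "true")
                .set("spark.speculation.multiplier", "1.5")
                .set("spark.speculation.quantile", "0.75");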

All the points presented so far are mainly about the hardware and the cluster. But what about framework features? One of my favorites is arbitrary stateful processing, described just below!

Arbitrary stateful processing

You certainly noticed that the native Apache Spark API has a more functional flavor than Apache Beam's API. It also has an impact on how some operations are defined, like the arbitrary stateful processing. In Apache Spark Structured Streaming, it's expressed as:

// ...
      .groupByKey(row => row.getAs[Long]("user_id"))
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(Mapping.mapStreamingLogsToSessions(sessionTimeout))

In Apache Beam, the arbitrary stateful processing relies on a more procedural DoFn instead of a mapping function:

    pipeline.apply(Create.of(clubsPerCountry))
      .apply(ParDo.of(new DoFn<KV<String, String>, String>() {
        private static final String ACCUMULATOR_NAME = "accumulator";

        @StateId(ACCUMULATOR_NAME)
        private final StateSpec<BagState<String>> accumulatorStateSpec = StateSpecs.bag();

        @ProcessElement
        public void processElement(ProcessContext processContext,
                                    @StateId(ACCUMULATOR_NAME) BagState<String> clubsAccumulator) {
          clubsAccumulator.add(processContext.element().getValue());
          String clubs = Joiner.on("-").join(clubsAccumulator.read());
          // StringsAccumulator is presumably a test helper collecting the emitted values
          StringsAccumulator.CLUBS.add(clubs);
          processContext.output(clubs);
        }
      }));

In this category, we could also include a conceptual difference for sessionization pipelines. As you saw previously, Apache Spark uses a groupByKey expression to bring all events related to an entity together. Apache Beam relies on the concept of session windows working on top of a key-value PCollection:

  PCollection<KV<String, String>> timestampedLetters = pipeline.apply(Create.timestamped(Arrays.asList(
    TimestampedValue.of(KV.of("a", "a1"), new Instant(1)),
    TimestampedValue.of(KV.of("a", "a2"), new Instant(1)),
// ...
  )));
  PCollection<KV<String, String>> windowedLetters = timestampedLetters
    .apply(Window.into(Sessions.withGapDuration(new Duration(2))));
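
Session windows alone only attach windowing metadata to the elements. To actually materialize the per-key sessions, the windowed collection is typically followed by a grouping (or combining) step, as in this short sketch reusing the windowedLetters collection from above:

  // GroupByKey gathers the values of each key once per session window instead of once globally
  PCollection<KV<String, Iterable<String>>> letterSessions = windowedLetters
    .apply("group_letters_per_session", GroupByKey.create());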

Compute environment

Finally, regarding the compute environment, Apache Spark has a lot of pluggable resource managers, like historically YARN or Mesos, and more recently Kubernetes. For Dataflow, it's not easy to find information about the underlying infrastructure, but if one day you have issues running a job, you will probably get errors like:

Error syncing pod 112g0d123451663a6216bc0186372bx99 ("process-gcs-avro-xfnq11-harness-1s3h_default (112g0d123451663a6216bc0186372bx99)"), skipping: failed to "StartContainer" for "java-batch" with CrashLoopBackOff: 
"back-off 10s restarting failed
container=java-batch pod=process-gcs-avro-xfnq11-harness-1s3h_default  (112g0d123451663a6216bc0186372bx99)"

It makes me think that Dataflow runs the workers as Docker containers on Kubernetes, because of keywords like "pod" and "container". Moreover, if you analyze the logs of your job in Logs Explorer, you'll find log names like dataflow.googleapis.com/kubelet, and knowing that the kubelet is a Kubernetes component, we could deduce that Kubernetes is the runtime. However, I didn't find any mention of it in the documentation, nor Dataflow pods in GKE, so maybe it's a wrong assumption. Anyway, I will be happy to learn more if you have some information!

Apache Spark and Dataflow are 2 different projects, with different APIs and origins, but both solve the same problem of distributed data processing. I hope this article showed that despite the initial differences, they also share some important similarities.