Big Data patterns implemented - processing abstraction

Versions: Apache Spark 2.4.2

Can you imagine a world where everybody speaks the same language? It's difficult. Fortunately, it's much easier to achieve in data engineering, where a single API can apply to both batch and streaming processing.

In this post I will cover the processing abstraction pattern. In the following 2 parts, you will see how to process an Apache Kafka topic in batch and streaming using Apache Spark's Dataset API.

Pattern definition

The main purpose of the processing abstraction pattern is to provide a single way to process data. Thanks to it, you don't need to learn different APIs to work with streaming and batch data, which was one of the problems of early data processing frameworks.

Before Apache Spark existed, Apache Hadoop's MapReduce was used to process big volumes of batch data, and it did that pretty well. However, it was not suited to streaming pipelines because of its high I/O overhead. Other frameworks like Apache Storm were born to fill that gap, but they required learning yet another data processing abstraction.

Fortunately, this requirement disappeared with modern data processing frameworks like Apache Spark, Apache Beam and Apache Flink, where a single API can be used to process both streaming and batch data.

The processing abstraction pattern can also apply to the data sources. Some of them are universal enough to be processed in either a batch or a streaming manner. One of them is Apache Kafka, which may be consumed continuously or at regular intervals. The following 2 parts will show how to combine both of these points.

Apache Kafka and Apache Spark in batch

To execute the following tests, you will need to follow the instructions included in the data generator Kafka sink example, i.e. start a Kafka broker and a data generator writing to the tested topic.
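If you don't want to look that example up, the sketch below shows what a minimal data generator could look like. It's only an illustration: the broker address, the topic name and the payload format are assumptions and not necessarily the values used in the referenced example.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    object DataGenerator extends App {
      // assumed local broker and topic name; adapt them to your setup
      val kafkaBroker = "localhost:9092"
      val topicName = "sessionization_demo"

      val producerProperties = new Properties()
      producerProperties.setProperty("bootstrap.servers", kafkaBroker)
      producerProperties.setProperty("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      producerProperties.setProperty("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](producerProperties)
      // send a handful of dummy JSON events so that both queries have data to read
      (1 to 100).foreach { nr =>
        producer.send(new ProducerRecord[String, String](topicName, s"key_${nr}", s"""{"event_nr": ${nr}}"""))
      }
      producer.flush()
      producer.close()
    }

Once the broker and the generator are running, you can launch the batch processing logic: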

    // the implicits import brings in the Encoder required by the .map call below
    import sparkSession.implicits._

    val allRows = sparkSession.read.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBroker)
      .option("client.id", "sessionization-demo-batch")
      .option("subscribe", topicName)
      .load()
      // keep only the offset of every consumed record and count them
      .map(row => row.getAs[Long]("offset"))
      .count()

    // ScalaTest matcher: the topic contains at least 1 record
    allRows should be > 0L

Apache Kafka and Apache Spark in streaming

Processing Kafka data in streaming is very similar to the batch version:

    // as in the batch version, the implicits import provides the Encoder for .map
    import org.apache.spark.sql.streaming.OutputMode
    import sparkSession.implicits._

    val query = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBroker)
      .option("client.id", "sessionization-demo-streaming")
      .option("subscribe", topicName)
      .load()
      // the mapped Dataset[String] exposes a single column named "value"
      .map(row => row.getAs[String]("topic"))
      .groupBy("value").count()

    query.writeStream
      .outputMode(OutputMode.Complete())
      .option("checkpointLocation", "/tmp/test-kafka")
      .format("console")
      .start().awaitTermination(30000L)

I agree, the API is slightly more verbose, but that's mainly because of the execution context configuration (timeout, a different sink). Globally, though, the processing logic can be expressed with the same data abstraction, which for this snippet is a Row object.
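To make this point more concrete, here is a minimal sketch of how the shared abstraction could be exploited. The countRecordsPerTopic function is my own illustration and not part of the tested code:

    import org.apache.spark.sql.DataFrame

    // the transformation is written once against the common DataFrame abstraction
    def countRecordsPerTopic(kafkaRecords: DataFrame): DataFrame =
      kafkaRecords.groupBy("topic").count()

    // ...and applied to the batch source
    val batchCounts = countRecordsPerTopic(
      sparkSession.read.format("kafka")
        .option("kafka.bootstrap.servers", kafkaBroker)
        .option("subscribe", topicName)
        .load()
    )

    // ...as well as to the streaming one
    val streamingCounts = countRecordsPerTopic(
      sparkSession.readStream.format("kafka")
        .option("kafka.bootstrap.servers", kafkaBroker)
        .option("subscribe", topicName)
        .load()
    )

The transformation is defined only once and the 2 modes differ merely in the way the source is created (read vs readStream) and in the way the result is materialized afterwards.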

At the data source level there are some more important differences. The streaming version can automatically manage the processed offsets thanks to the checkpointing mechanism. The same is not true for the batch version, where the offsets must be managed separately since batch queries don't implement checkpointing. That's also the reason why, if you execute the batch code multiple times on an unchanged Kafka topic, you'll always get the same results. If checkpointing applied, the query would remember the last processed position and return 0 rows for the 2nd execution.
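If you needed incremental processing in batch, one option would be to manage the offsets yourself through the startingOffsets and endingOffsets options of the Kafka source. The sketch below is a simplified illustration: the offsets JSON is hardcoded, whereas in a real pipeline it would be loaded from and saved back to an external store after every execution, and the topic name inside it is an assumed value that must match the subscribed topic:

    // offsets persisted by the previous execution; hardcoded here for brevity
    // (partition 0 of the topic was already consumed up to offset 100)
    val lastProcessedOffsets = """{"sessionization_demo": {"0": 100}}"""

    val newRows = sparkSession.read.format("kafka")
      .option("kafka.bootstrap.servers", kafkaBroker)
      .option("subscribe", topicName)
      // resume from the previously persisted position...
      .option("startingOffsets", lastProcessedOffsets)
      // ...and read everything available at the moment the query starts
      .option("endingOffsets", "latest")
      .load()

    // after the processing, the biggest consumed offset per partition would have
    // to be saved back and reused as startingOffsets in the next execution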

Processing abstraction is an interesting pattern that applies not only at the API level but also at the data source level. As shown in this post, you can use the same DataFrame abstraction to work with Apache Kafka in both a batch and a streaming way. Even though there are still some subtle differences, like offset management, it's an interesting way to use the same framework and data source and adapt them to different latency use cases.