Apache Spark 2.4.0 features - foreachBatch

Versions: Apache Spark 2.4.0 https://github.com/bartosz25/spark-...tructuredstreaming/foreachbatchsink

When I first heard about the foreachBatch feature, I thought that it was the implementation of foreachPartition in the Structured Streaming module. However, after some analysis I saw how wrong I was, because this new feature addresses other, but equally important, problems. You will find out more about them in this post.

In this new post of the Apache Spark 2.4.0 features series, I will show the implementation of the foreachBatch method. In the first section, I will shortly describe the main points of this feature, together with some details about its implementation. In the next 2 sections, I will show the problems addressed by the foreachBatch data sink.

Definition

Prior to the 2.4.0 release, foreach was the only sink where we could put some custom logic. It's quite easy to use because it looks like a foreach loop wrapped in a class. Also, the foreach sink is a good fit for the continuous execution because there we focus on the information brought by one row at a time. However, it's less well suited to micro-batch-based pipelines because very often we will want to do something with the whole accumulated micro-batch.
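
As a reminder, a minimal foreach sink could look like the sketch below. The console output and the "number" column are purely illustrative, and stream stands for any streaming DataFrame, like the ones built in the tests later in this post:

  import org.apache.spark.sql.{ForeachWriter, Row}

  // the foreach sink handles one row at a time, so there is no natural place
  // to apply a logic to the whole accumulated micro-batch
  val rowByRowWriter = new ForeachWriter[Row] {
    // called once per partition and epoch; returning true accepts the partition
    override def open(partitionId: Long, epochId: Long): Boolean = true

    // per-row logic only
    override def process(row: Row): Unit = println(s"processing ${row.getAs[Int]("number")}")

    override def close(errorCause: Throwable): Unit = {}
  }

  stream.writeStream.foreach(rowByRowWriter).start()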

The 2.4.0 release addresses this limitation of micro-batch processing with the new org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink sink. Its main idea is straightforward: the engine accumulates the data processed in a given micro-batch and passes it to the sink as a Dataset. That means not only that you can apply one piece of logic to the whole data, but also that you can do some pure batch processing inside the streaming pipeline, for instance writing the data into a non-streamable data store.
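
As an illustration, a minimal sketch of such a pipeline could look like the snippet below. The JDBC URL, the table name and the stream variable are placeholders and not part of the analyzed feature:

  import java.util.Properties
  import org.apache.spark.sql.{Dataset, Row, SaveMode}

  stream.writeStream
    .foreachBatch((batchDataset: Dataset[Row], batchId: Long) => {
      // the whole micro-batch is exposed as a regular Dataset, so any batch
      // writer works here, including the JDBC one that has no streaming counterpart
      batchDataset.write
        .mode(SaveMode.Append)
        .jdbc("jdbc:postgresql://localhost:5432/test_db", "numbers", new Properties())
    })
    .start()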

Aside from the materialized Dataset, the foreachBatch consumer function also accepts a second parameter called batchId. This argument contains the id of the micro-batch that generated the Dataset. You can use it to implement exactly-once delivery semantics because, by default, the engine runs with at-least-once semantics.
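
One possible way to get there, sketched below, is to make the writes idempotent by skipping the batches that were already persisted. The processedBatchesRegistry is a hypothetical store and not something provided by Apache Spark:

  import org.apache.spark.sql.{Dataset, Row, SaveMode}

  stream.writeStream
    .foreachBatch((batchDataset: Dataset[Row], batchId: Long) => {
      // processedBatchesRegistry is a hypothetical registry keeping the ids of the
      // micro-batches already written - skipping them makes the reprocessing harmless
      if (!processedBatchesRegistry.contains(batchId)) {
        batchDataset.write.mode(SaveMode.Append).parquet(s"/tmp/spark/exactly_once/batch_${batchId}")
        processedBatchesRegistry.add(batchId)
      }
    })
    .start()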

Finally, since the ForeachBatchSink addresses micro-batch issues, you can't use it with continuous triggers. A simple check inside the DataStreamWriter shows that:

    } else if (source == "foreachBatch") {
      assertNotPartitioned("foreachBatch")
      if (trigger.isInstanceOf[ContinuousTrigger]) {
        throw new AnalysisException("'foreachBatch' is not supported with continuous trigger")
      }

You can also see that the sink doesn't support partitioned pipelines (.partitionBy(...)). If you're interested in the task that introduced this logic, you'll find a link in the "Read also" section.

Use case: non-streamable sink

I've already mentioned the first use case of foreachBatch in the previous section. This new sink is useful when you want to save the processed data into a non-streamable sink like a relational database or a key-value store. For the sake of simplicity, I'll use an in-memory singleton key-value store:

  "foreachBatch" should "save the data into a key-value memory store" in {
    val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
    inputStream.addData(1, 2, 3, 4)
    // keep feeding the in-memory source so that new micro-batches are continuously produced
    new Thread(new Runnable() {
      override def run(): Unit = {
        while (true) {
          inputStream.addData(1, 2, 3, 4)
          Thread.sleep(1000L)
        }
      }
    }).start()
    val stream = inputStream.toDS().toDF("number")

    val query = stream.writeStream.trigger(Trigger.ProcessingTime(2000L))
      .foreachBatch((dataset, batchId) => {
        // iterate over the micro-batch partitions and index every row in the in-memory store
        dataset.foreachPartition(rows => {
          rows.foreach(row => {
            InMemoryKeyedStore.addValue(s"batch_${batchId}_${row.getAs[Int]("number")}", "")
          })
        })
      })
      .start()

    query.awaitTermination(20000L)

    // With the 20 seconds timeout and the 2 seconds trigger, about 10 micro-batches should be processed;
    // dropping the "_<number>" suffix keeps only the "batch_<id>" prefix of every stored key
    val distinctKeys = InMemoryKeyedStore.allValues.keys.map(key => key.dropRight(2)).toSeq.distinct
    distinctKeys should contain atLeastOneElementOf(Seq("batch_0", "batch_1", "batch_2", "batch_3", "batch_4", "batch_5",
    "batch_6", "batch_7", "batch_8", "batch_9"))
  }

Of course, you should take care of the delivery semantics. If your sink is key-based and your processing always generates the same data, you're safe: even if you need to reprocess the records, you'll always end up with the same results in the store. If that's not the case, you should add an extra step to deduplicate the data before writing it, as sketched below.
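
A sketch of that extra step, assuming that the "number" column plays the role of the business key, could be a simple within-batch deduplication before the write; a stricter version would also check the keys already written to the target store:

  import org.apache.spark.sql.{Dataset, Row, SaveMode}

  stream.writeStream
    .foreachBatch((batchDataset: Dataset[Row], batchId: Long) => {
      // drop the duplicated business keys inside the micro-batch before writing
      // them into a sink that is not able to overwrite by key
      val deduplicatedDataset = batchDataset.dropDuplicates("number")
      deduplicatedDataset.write.mode(SaveMode.Append).json(s"/tmp/spark/deduplicated/batch_${batchId}")
    })
    .start()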

Use case: multiple sinks from a single source (aka side output)

The second use case is very similar to the side output in Apache Beam because foreachBatch gives the possibility to write the micro-batch into different places. You must only remember to cache the batch Dataset inside the sink. Otherwise, you risk recomputing it entirely for every write. The next snippet gives an example of a side output implementation with the Apache Spark foreachBatch sink:

  "foreachBatch" should "generate 2 outputs" in {
    val inputStream = new MemoryStream[Int](1, sparkSession.sqlContext)
    inputStream.addData(1, 2, 3, 4)
    new Thread(new Runnable() {
      override def run(): Unit = {
        while (true) {
          inputStream.addData(1, 2, 3, 4)
          Thread.sleep(1000L)
        }
      }
    }).start()
    val stream = inputStream.toDS().toDF("number")

    val query = stream.writeStream.trigger(Trigger.ProcessingTime(2000L))
      .foreachBatch((dataset, batchId) => {
        // cache the micro-batch - otherwise it would be recomputed for every write below
        dataset.persist()
        dataset.write.mode(SaveMode.Overwrite).json(s"/tmp/spark/side-output/json/batch_${batchId}")
        dataset.write.mode(SaveMode.Overwrite).parquet(s"/tmp/spark/side-output/parquet/batch_${batchId}")
        dataset.unpersist()
      })
      .start()

    query.awaitTermination(20000L)

    def jsonBatch(batchNr: Int) = s"/tmp/spark/side-output/json/batch_${batchNr}"
    val jsonFiles = FileUtils.getFile(new File("/tmp/spark/side-output/json")).listFiles().toSeq.map(file => file.getAbsolutePath)
    jsonFiles should contain allElementsOf (Seq(jsonBatch(0), jsonBatch(1), jsonBatch(2), jsonBatch(3),
      jsonBatch(4), jsonBatch(5), jsonBatch(6), jsonBatch(7), jsonBatch(8)))
    def parquetBatch(batchNr: Int) = s"/tmp/spark/side-output/parquet/batch_${batchNr}"
    val parquetFiles = FileUtils.getFile(new File("/tmp/spark/side-output/parquet")).listFiles.toSeq.map(file => file.getAbsolutePath)
    parquetFiles should contain allElementsOf(Seq(parquetBatch(0), parquetBatch(1), parquetBatch(2), parquetBatch(3),
      parquetBatch(4), parquetBatch(5), parquetBatch(6), parquetBatch(7), parquetBatch(8)))
  }

The code looks similar to the code from the previous section; it also writes the data into a non-streamable sink. The difference is that the same output is used to write the data into 2 different places, in 2 different formats. It can be useful if you want to prepare the data for multiple workflows. In my case, I generated JSON files for transactional purposes and Parquet ones for analytical purposes.

According to the official documentation, it's important to cache and unpersist the batch Dataset in order to ensure that the engine won't recompute it every time.

The foreachBatch sink was a missing piece in the Structured Streaming module. This feature, added in the 2.4.0 release, is a bridge between the streaming and batch worlds. As shown in this post, it facilitates the integration of streaming data into the batch parts of our pipelines. Instead of creating "batches" manually, Apache Spark now does it for us and exposes the whole Dataset API inside the foreachBatch sink.