Structured streaming

Versions: Spark 2.1.0

Project Tungsten, explained in one of the previous posts, brought a lot of optimizations - especially in terms of memory use. Until now it was essentially used by the Spark SQL and Spark MLlib projects. However, since 2.0.0, some work has been done to integrate DataFrame/Dataset into streaming processing (Spark Streaming).

This post discusses the main concepts of structured streaming brought with Spark 2.0.0. The first part explains the reasons for introducing a new streaming processing. The second part describes this new part in more detail. The last part shows some simple uses of structured streaming.

Why structured streaming?

Like a lot of Spark evolutions (Project Tungsten is one of them), the ones related to structured streaming come from community experience. As often explained by Spark contributors, streaming based on DStreams was:

As a solution to the above points, structured streaming was conceived. At the time of writing (Spark 2.1.0), it still has experimental status.

Structured streaming details

The new streaming processing can be understood as an unbounded table, growing with new incoming data, i.e. it can be thought of as stream processing built on top of Spark SQL. Thus, it uses the optimization methods coming from Project Tungsten and a similar programming model. Thanks to these processing changes, structured streaming also reduces the implementation effort related to high availability (fault tolerance, exactly-once processing semantics).
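
The following snippet is a minimal sketch of this "unbounded table" model; the socket source, the host/port and the query itself are purely illustrative assumptions and not part of the example discussed later:

import org.apache.spark.sql.SparkSession

// A SparkSession is the single entry point, shared by batch and streaming queries
val session = SparkSession.builder().appName("Unbounded table sketch")
  .master("local[*]").getOrCreate()

// Every line received on the socket becomes a new row appended to the
// unbounded input table backing the 'lines' DataFrame
val lines = session.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

// The aggregation is expressed exactly as on a static DataFrame; Spark
// recomputes the result incrementally as new rows arrive
val lineCounts = lines.groupBy("value").count()

// The complete output mode rewrites the whole result table at every trigger
lineCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()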

More concretely, structured streaming brought some new concepts to Spark - at least in terms of the project's keywords:

To put the mentioned improvements in place, structured streaming makes some new assumptions:
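
To give an idea of the event time and output mode concepts mentioned above (and again in the conclusion), here is a small sketch; the socket source, the column names and the window/watermark durations are only illustrative assumptions:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val session = SparkSession.builder().appName("Event time sketch")
  .master("local[*]").getOrCreate()

// The socket source is used only to get a runnable stream; in a real pipeline
// the event time would come from the data itself (e.g. a timestamp field)
val events = session.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
  .selectExpr("value AS player", "current_timestamp() AS eventTime")

// The watermark declares how late data is allowed to arrive relative to the
// newest observed event time, letting Spark know when a window can be finalized
val windowedCounts = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("player"))
  .count()

// In the append output mode a window is emitted once, after the watermark has
// passed its end; the complete mode would rewrite the whole result table instead
windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .start()
  .awaitTermination()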

Structured streaming example

The following code shows an example of structured streaming:

import java.io.{BufferedWriter, File, FileWriter}
import java.nio.file.Paths

import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val sparkSession = SparkSession.builder().appName("Structured Streaming test")
  .master("local").getOrCreate()

val csvDirectory = "/tmp/spark-structured-streaming-example"

before {
  val path = Paths.get(csvDirectory)
  val directory = path.toFile
  directory.mkdir
  val files = directory.listFiles
  if (files != null) {
    files.foreach(_.delete)
  }
  addFile(s"${csvDirectory}/file_1.csv", "player_1;3\nplayer_2;1\nplayer_3;9")
  addFile(s"${csvDirectory}/file_2.csv", "player_1;3\nplayer_2;4\nplayer_3;5")
  addFile(s"${csvDirectory}/file_3.csv", "player_1;3\nplayer_2;7\nplayer_3;1")
}

private def addFile(fileName: String, content: String) = {
  val file = new File(fileName)
  file.createNewFile
  val writer = new BufferedWriter(new FileWriter(file))
  writer.write(content)
  writer.close()
}

"CSV file" should "be consumed in Structured Spark Streaming" in {
  val csvEntrySchema = StructType(
    Seq(StructField("player", StringType, false),
    StructField("points", IntegerType, false))
  )

  // maxFilesPerTrigger = number of files, so all the files are read in a single trigger
  // and the accumulator stores aggregated values for exactly 3 players
  val entriesDataFrame = sparkSession.readStream.option("sep", ";").option("maxFilesPerTrigger", "3")
    .option("basePath", csvDirectory)
    .schema(csvEntrySchema)
    .csv(csvDirectory)

  // The processing below first groups the rows by the 'player' field
  // and then sums the 'points' column of the grouped rows
  val summedPointsByPlayer = entriesDataFrame.groupBy("player")
    .agg(sum("points").alias("collectedPoints"))

  val accumulator = sparkSession.sparkContext.collectionAccumulator[(String, Long)]
  val query: StreamingQuery = summedPointsByPlayer.writeStream
    .foreach(new ForeachWriter[Row]() {
      // returning true means that the rows of every partition will be processed
      override def open(partitionId: Long, version: Long): Boolean = true

      override def process(row: Row): Unit = {
        println(s">> Processing ${row}")
        accumulator.add((row.getAs[String]("player"), row.getAs[Long]("collectedPoints")))
      }

      override def close(errorOrNull: Throwable): Unit = {
        // do nothing
      }
    })
    .outputMode("complete")
    .start()

  // A shorter timeout could mean that not all the files get processed
  query.awaitTermination(20000)

  accumulator.value.size shouldEqual 3
  accumulator.value should contain allOf (("player_1", 9), ("player_2", 12), ("player_3", 15))
}

case class CsvEntry(player: String, points: Int)
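
As a variation on the ForeachWriter and the accumulator used above, the aggregated result could also be checked through the in-memory sink; the query name playerPoints is arbitrary and this sink is intended for tests and debugging only:

val memoryQuery = summedPointsByPlayer.writeStream
  .outputMode("complete")
  .format("memory")          // keeps the result table in the driver's memory
  .queryName("playerPoints") // name of the temporary view to query afterwards
  .start()

memoryQuery.awaitTermination(20000)

// The aggregated rows are then available through a regular Spark SQL query
sparkSession.sql("SELECT * FROM playerPoints").show()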

This post introduces structured streaming. The first part explains the reasons for the change, justified by the will to standardize the data abstraction (Dataset/DataFrame instead of RDD/DStream). The second part shows some features brought by structured streaming, such as exactly-once processing, event time and output modes. The last part presents an example of structured streaming consisting of reading CSV files.