Structured streaming



Project Tungsten, explained in one of the previous posts, brought a lot of optimizations, especially in terms of memory use. Until now it was used mainly by Spark SQL and Spark MLlib. However, since 2.0.0, work has been done to integrate DataFrame/Dataset into stream processing (Spark Streaming).

This post discusses the main concepts of structured streaming introduced in Spark 2.0.0. The first part explains the reasons for introducing a new streaming model. The second part describes this model in more detail. The last part shows a simple use of structured streaming.

Why structured streaming?

Like many Spark evolutions (Project Tungsten is one of them), those related to structured streaming come from community experience. As Spark contributors often point out, streaming based on DStreams was:

  • inconsistent - the API used for batch processing (RDD, Dataset) was different from the API used for stream processing (DStream). It was not a blocker for writing code, but it is always simpler (especially in terms of maintenance cost) to deal with as few abstractions as possible.
  • difficult - it was not simple to build streaming pipelines supporting delivery policies: the exactly-once guarantee, the handling of late-arriving data or fault tolerance. All of them could be implemented, but they required extra work from programmers.

Structured streaming was conceived as the solution to the above points. At the time of writing (Spark 2.1.0), it still has experimental status.

Structured streaming details

The new streaming model can be understood as an unbounded table that grows with incoming data, i.e. it can be thought of as stream processing built on Spark SQL. Thus, it uses the optimizations coming from Project Tungsten and a similar programming model. Thanks to these processing changes, structured streaming also reduces the implementation effort related to high availability (fault tolerance, exactly-once processing semantics).
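
The unbounded table idea can be illustrated without Spark at all. The sketch below is a plain Scala model and not Spark's implementation: each micro-batch appends rows to an ever-growing input table, and the query (here a sum of points per player) is evaluated again after every batch. The names `UnboundedTableModel`, `Entry`, `resultTable` and `run` are invented for this illustration, and the full recomputation is a simplification, since Spark updates the result incrementally.

```scala
object UnboundedTableModel {
  // One row of the input table: a player and the points scored.
  final case class Entry(player: String, points: Int)

  // The query applied to the whole input table: total points per player.
  def resultTable(inputTable: Vector[Entry]): Map[String, Long] =
    inputTable.groupBy(_.player).map { case (player, rows) =>
      player -> rows.map(_.points.toLong).sum
    }

  // Appends every micro-batch to the input table and returns
  // the query result computed after each of them.
  def run(batches: Seq[Seq[Entry]]): Seq[Map[String, Long]] =
    batches.scanLeft(Vector.empty[Entry])(_ ++ _).tail.map(resultTable)

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq(Entry("player_1", 3), Entry("player_2", 1)),
      Seq(Entry("player_1", 3), Entry("player_2", 4))
    )
    run(batches).zipWithIndex.foreach { case (result, index) =>
      println(s"result after batch ${index + 1}: $result")
    }
  }
}
```

After the second batch the totals cover the rows of both batches, which is what a query over a single growing table would return.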

More concretely, structured streaming brought some new concepts to Spark, at least in terms of the project's vocabulary:

  • exactly-once guarantee (semantics) - structured streaming focuses on this concept. It means that data is processed only once and the output doesn't contain duplicates.
  • event time - one of the problems observed with DStream streaming was the processing order, i.e. the case when data generated earlier was processed after data generated later. Structured streaming handles this problem with a concept called event time that, under some conditions, allows late data to be aggregated correctly in processing pipelines.
  • sink - generally in programming, a sink is a class or a function receiving events from another object or function. Spark uses sinks as the place where the results of computations are stored (output sink). In Spark 2.1 we can find the following output sinks: file, foreach, console and memory (the last two are marked as being for debugging purposes).
  • Result Table - represents the place where the result of the query applied to the input data is sent. The content of the Result Table goes directly to the output sink.
  • output mode - at every regular interval of time the streaming process generates some output. What is sent to the sink is defined by a concept called output mode, which can be: complete (all data is returned to the sink), append (only new elements are sent to the sink) or update (only updated elements are sent to the sink).
  • watermark - this mechanism is used to handle late-arriving data. It tracks the current event time and defines a delay during which late events still have the chance to be aggregated with the events that arrived on time and to be sent together with them to the Result Table. It can be thought of as an event window that is closed once the defined maximum delay has passed and whose content is then returned to the Result Table.
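
To make event time and watermark more tangible, here is a simplified model written in plain Scala. It is a sketch under stated assumptions and not Spark's algorithm: event times are non-negative numbers of seconds, windows are tumbling, and the watermark is recomputed after every event, whereas Spark advances it between triggers and declares the delay through `Dataset.withWatermark`. The names `WatermarkModel`, `Event`, `State` and `aggregate` are invented for this illustration.

```scala
object WatermarkModel {
  // An event carrying the time at which it was generated (event time, in seconds).
  final case class Event(eventTime: Long, points: Int)

  // maxEventTime: the highest event time seen so far
  // sums: points summed per window, keyed by the window start
  // dropped: events that arrived after the watermark had passed them
  final case class State(maxEventTime: Long, sums: Map[Long, Long], dropped: Vector[Event])

  // Processes events in arrival order. An event is too late when its
  // event time is lower than the watermark (maxEventTime - allowedDelay).
  def aggregate(events: Seq[Event], windowSize: Long, allowedDelay: Long): State =
    events.foldLeft(State(0L, Map.empty[Long, Long], Vector.empty[Event])) { (state, event) =>
      val watermark = state.maxEventTime - allowedDelay
      if (event.eventTime < watermark) {
        state.copy(dropped = state.dropped :+ event)
      } else {
        val windowStart = event.eventTime - (event.eventTime % windowSize)
        val newSum = state.sums.getOrElse(windowStart, 0L) + event.points
        State(
          math.max(state.maxEventTime, event.eventTime),
          state.sums.updated(windowStart, newSum),
          state.dropped
        )
      }
    }

  def main(args: Array[String]): Unit = {
    // Arrival order differs from event-time order for the 3rd and 5th events.
    val events = Seq(Event(1, 1), Event(12, 2), Event(8, 3), Event(25, 4), Event(11, 5))
    val state = aggregate(events, windowSize = 10L, allowedDelay = 5L)
    println(s"sums per window: ${state.sums}")
    println(s"dropped events: ${state.dropped}")
  }
}
```

With these sample events, the event generated at second 8 arrives late but is still counted in the first window, because the watermark is then at 7. The event generated at second 11 arrives when the watermark has already reached 20, so it is dropped.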

To put these improvements in place, structured streaming makes some new assumptions:

  • replayable sources - streaming sources should be able to resend unprocessed data after processing failures. It means that they need to track the reading state. An example of a replayable source is Apache Kafka, which stores an offset indicating which messages were already consumed from a given partition.
  • idempotent sinks - output sinks should handle duplicate writes, i.e. they should not add data that was already saved.
  • schema - since streaming is now structured and based on DataFrame/Dataset, data must follow a schema. It simplifies a lot of processing and guarantees type safety.
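
The first two assumptions work together, which the following plain Scala sketch tries to show. It is only a model and says nothing about how Spark or Kafka implement these ideas: the source can re-read any range of offsets, and the sink keys its rows by offset, so writing the same batch twice after a simulated failure leaves no duplicates. The names `ExactlyOnceModel`, `ReplayableSource` and `IdempotentSink` are invented for this illustration.

```scala
import scala.collection.mutable

object ExactlyOnceModel {
  // Replayable source: records are addressed by offset, so any range can be read again.
  final class ReplayableSource(records: Vector[String]) {
    def read(fromOffset: Int, untilOffset: Int): Vector[(Int, String)] =
      (fromOffset until math.min(untilOffset, records.size))
        .map(offset => offset -> records(offset))
        .toVector
  }

  // Idempotent sink: rows are keyed by their source offset,
  // so writing the same row a second time changes nothing.
  final class IdempotentSink {
    private val rows = mutable.TreeMap.empty[Int, String]

    def write(batch: Seq[(Int, String)]): Unit =
      batch.foreach { case (offset, value) => rows.update(offset, value) }

    def contents: Vector[String] = rows.values.toVector
  }

  def main(args: Array[String]): Unit = {
    val source = new ReplayableSource(Vector("a", "b", "c", "d"))
    val sink = new IdempotentSink

    sink.write(source.read(0, 2)) // first attempt, assume a failure before the progress was saved
    sink.write(source.read(0, 2)) // after the restart the same range is replayed
    sink.write(source.read(2, 4)) // processing continues with the next range

    println(sink.contents)
  }
}
```

Neither property is enough on its own: without replay, the failed batch could be lost, and without idempotent writes, the replayed batch would be stored twice.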

Structured streaming example

The following code shows an example of structured streaming:

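import java.io.{BufferedWriter, File, FileWriter}
import java.nio.file.Paths

import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

// The class name and the ScalaTest 3.0.x traits are assumed;
// the listing originally showed only the body of the test class
class StructuredStreamingTest extends FlatSpec with Matchers with BeforeAndAfter {

  // The local master is an assumption made to run the test without a cluster
  val sparkSession = SparkSession.builder().appName("Structured Streaming test")
    .master("local[*]").getOrCreate()

  val csvDirectory = "/tmp/spark-structured-streaming-example"

  before {
    // Every test starts with a directory containing only the 3 files below
    val path = Paths.get(csvDirectory)
    val directory = path.toFile
    directory.mkdirs()
    val files = directory.listFiles
    if (files != null) {
      files.foreach(_.delete())
    }
    addFile(s"${csvDirectory}/file_1.csv", "player_1;3\nplayer_2;1\nplayer_3;9")
    addFile(s"${csvDirectory}/file_2.csv", "player_1;3\nplayer_2;4\nplayer_3;5")
    addFile(s"${csvDirectory}/file_3.csv", "player_1;3\nplayer_2;7\nplayer_3;1")
  }

  private def addFile(fileName: String, content: String): Unit = {
    val file = new File(fileName)
    val writer = new BufferedWriter(new FileWriter(file))
    try writer.write(content) finally writer.close()
  }

  "CSV file" should "be consumed in Structured Spark Streaming" in {
    val csvEntrySchema = StructType(
      Seq(StructField("player", StringType, false),
        StructField("points", IntegerType, false))
    )

    // maxFilesPerTrigger = number of files - with that all files will be read at once
    // and the accumulator will store aggregated values for only 3 players
    val entriesDataFrame = sparkSession.readStream.option("sep", ";").option("maxFilesPerTrigger", "3")
      .option("basePath", csvDirectory)
      .schema(csvEntrySchema)
      .csv(csvDirectory)

    // The processing below first groups rows by the field called 'player'
    // and then sums the 'points' property of the grouped rows
    val summedPointsByPlayer = entriesDataFrame.groupBy("player")
      .agg(sum("points").alias("collectedPoints"))

    val accumulator = sparkSession.sparkContext.collectionAccumulator[(String, Long)]
    val query: StreamingQuery = summedPointsByPlayer.writeStream
      .foreach(new ForeachWriter[Row]() {
        // true means that all partitions will be opened
        override def open(partitionId: Long, version: Long): Boolean = true

        override def process(row: Row): Unit = {
          println(s">> Processing ${row}")
          accumulator.add((row.getAs[String]("player"), row.getAs[Long]("collectedPoints")))
        }

        override def close(errorOrNull: Throwable): Unit = {
          // do nothing
        }
      })
      // an aggregation without watermark is written in complete mode
      .outputMode("complete")
      .start()

    // With a shorter time some files may not be processed
    // (the 20 seconds used here are an assumed value)
    query.awaitTermination(20000)

    accumulator.value.size shouldEqual 3
    accumulator.value should contain allOf (("player_1", 9L), ("player_2", 12L), ("player_3", 15L))
  }
}

case class CsvEntry(player: String, points: Int)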

This post introduced structured streaming. The first part explained the reasons for the change, which is justified by the will to standardize the data abstraction (Dataset/DataFrame instead of RDD/DStream). The second part showed some of the concepts brought by structured streaming, such as exactly-once processing, event time and output modes. The last part presented an example of structured streaming that reads CSV files.
