Project Tungsten, explained in one of the previous posts, brought a lot of optimizations - especially in terms of memory use. Until now it was essentially used by the Spark SQL and Spark MLlib projects. However, since 2.0.0, some work has been done to integrate DataFrame/Dataset into streaming processing (Spark Streaming).
This post discusses the main concepts of structured streaming brought with Spark 2.0.0. The first part explains the reasons for introducing this new streaming processing. The second part describes it in more detail. The last part shows some simple uses of structured streaming.
Why structured streaming?
Like a lot of Spark evolutions (Spark Project Tungsten is one of them), the ones related to structured streaming come from community experience. As often justified by Spark contributors, streaming based on DStreams was:
- inconsistent - the API used for batch processing (RDD, Dataset) was different from the API for streaming processing (DStream). Sure, nothing blocking for writing code, but it's always simpler (especially in terms of maintenance cost) to deal with as few abstractions as possible.
- difficult - it was not simple to build streaming pipelines supporting delivery policies: exactly-once guarantee, handling late data arrival or fault tolerance. Sure, all of them were implementable but they needed some extra work on the programmers' side.
As the solution to the above points, structured streaming was conceived. At the time of writing (Spark 2.1.0), it's still marked as experimental.
Structured streaming details
The new streaming processing can be understood as an unbounded table, growing with new incoming data, i.e. it can be thought of as stream processing built on top of Spark SQL. Thus, it uses the optimization methods coming from Project Tungsten and a similar programming model. Thanks to these processing changes, structured streaming also reduces the implementation effort related to high availability (fault tolerance, exactly-once processing semantics).
More concretely, structured streaming brought some new concepts to Spark - at least in terms of the project's keywords:
- exactly-once guarantee (semantics) - structured streaming focuses on that concept. It means that data is processed only once and the output doesn't contain duplicates.
- event time - one of the observed problems with DStream streaming was processing order, i.e. the case when data generated earlier was processed after data generated later. Structured streaming handles this problem with a concept called event time that, under some conditions, makes it possible to correctly aggregate late data in processing pipelines.
- sink - generally in programming, a sink is a class or a function receiving events from another object or function. Spark uses sinks as the place where the results of computations are stored (output sink). In Spark 2.1 we can find the following output sinks: file, foreach, console or memory (the last two are marked as being for debugging purposes).
- Result Table - represents the place where the result of the query applied to the input data is sent. The content of the Result Table goes directly to the output sink.
- output mode - at every regular interval of time some output is generated by the streaming process. What is written to the sink is governed by a concept called output mode, which can be: complete (all data returned to the sink), append (only new elements sent to the sink) or update (only updated elements outputted to the sink).
- watermark - this mechanism is used to handle late data. It tracks the current event time and defines a delay during which late events still have the chance to be aggregated with the events that arrived on time and sent together to the Result Table. It can be thought of as an events window that is closed after the defined maximum delay and, after closing, is returned to the Result Table (a short sketch follows this list).
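To see how event time, watermark and output mode fit together, here is a minimal sketch (not coming from the example presented later in this post). It assumes CSV files with player;points;eventTime rows dropped into a hypothetical /tmp/events directory; the window and delay durations are arbitrary:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}

val spark = SparkSession.builder().appName("Watermark sketch").master("local").getOrCreate()

val schema = StructType(Seq(
  StructField("player", StringType, false),
  StructField("points", IntegerType, false),
  StructField("eventTime", TimestampType, false)
))

// Hypothetical directory watched for new CSV files
val events = spark.readStream.option("sep", ";").schema(schema).csv("/tmp/events")

// Events are grouped into 5-minute event time windows; data arriving more than
// 10 minutes late (according to the watermark) is no longer aggregated
val windowedPoints = events
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("player"))
  .sum("points")

val query = windowedPoints.writeStream
  .outputMode("append")   // only finalized (closed) windows are emitted to the sink
  .format("console")      // console output sink - debugging purposes
  .start()
query.awaitTermination()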
To put the mentioned improvements in place, structured streaming makes some new assumptions:
- replayable sources - streaming sources should be able to send unprocessed data again in case of processing failures. It means that they need to track the reading state. As an example of a replayable source we could mention Apache Kafka, which stores an offset indicating which messages were already consumed from a given partition (see the sketch after this list).
- idempotent sinks - output sinks should handle duplicate writes, i.e. not add already saved data.
- schema - since the streaming is now structured and based on DataFrame/Dataset, the data must obviously follow a schema. It simplifies a lot of processing and guarantees type safety.
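To illustrate the first assumption, here is a short, hedged sketch of a replayable source: reading from Apache Kafka through the spark-sql-kafka-0-10 connector. The broker address and the 'players' topic name are purely hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("Kafka source sketch").master("local").getOrCreate()

// Kafka tracks an offset per partition, so unprocessed records can be replayed after a failure
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // hypothetical broker address
  .option("subscribe", "players")                       // hypothetical topic name
  .load()
// The returned DataFrame has a fixed schema: key, value, topic, partition, offset, timestamp, timestampType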
Structured streaming example
The following code shows an example of structured streaming:
import java.io.{BufferedWriter, File, FileWriter}
import java.nio.file.Paths

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

// ScalaTest spec wrapping the example (the class name is illustrative)
class StructuredStreamingTest extends FlatSpec with Matchers with BeforeAndAfter {

  val sparkSession = SparkSession.builder().appName("Structured Streaming test")
    .master("local").getOrCreate()
  val csvDirectory = "/tmp/spark-structured-streaming-example"

  before {
    val path = Paths.get(csvDirectory)
    val directory = path.toFile
    directory.mkdir
    val files = directory.listFiles
    if (files != null) {
      files.foreach(_.delete)
    }
    addFile(s"${csvDirectory}/file_1.csv", "player_1;3\nplayer_2;1\nplayer_3;9")
    addFile(s"${csvDirectory}/file_2.csv", "player_1;3\nplayer_2;4\nplayer_3;5")
    addFile(s"${csvDirectory}/file_3.csv", "player_1;3\nplayer_2;7\nplayer_3;1")
  }

  private def addFile(fileName: String, content: String) = {
    val file = new File(fileName)
    file.createNewFile
    val writer = new BufferedWriter(new FileWriter(file))
    writer.write(content)
    writer.close()
  }

  "CSV file" should "be consumed in Structured Spark Streaming" in {
    val csvEntrySchema = StructType(
      Seq(StructField("player", StringType, false), StructField("points", IntegerType, false))
    )
    // maxFilesPerTrigger = the number of files - with that, all files will be read at once
    // and the accumulator will store aggregated values for only 3 players
    val entriesDataFrame = sparkSession.readStream.option("sep", ";").option("maxFilesPerTrigger", "3")
      .option("basePath", csvDirectory)
      .schema(csvEntrySchema)
      .csv(csvDirectory)
    // The processing below first groups the rows by the 'player' field
    // and then sums the 'points' of the grouped rows
    val summedPointsByPlayer = entriesDataFrame.groupBy("player")
      .agg(sum("points").alias("collectedPoints"))
    val accumulator = sparkSession.sparkContext.collectionAccumulator[(String, Long)]
    val query: StreamingQuery = summedPointsByPlayer.writeStream
      .foreach(new ForeachWriter[Row]() {
        // returning true means that every partition will be opened for processing
        override def open(partitionId: Long, version: Long): Boolean = true
        override def process(row: Row): Unit = {
          println(s">> Processing ${row}")
          accumulator.add((row.getAs[String]("player"), row.getAs[Long]("collectedPoints")))
        }
        override def close(errorOrNull: Throwable): Unit = {
          // do nothing
        }
      })
      .outputMode("complete")
      .start()
    // A shorter timeout could mean that not all files get processed
    query.awaitTermination(20000)

    accumulator.value.size shouldEqual 3
    accumulator.value should contain allOf (("player_1", 9), ("player_2", 12), ("player_3", 15))
  }
}

case class CsvEntry(player: String, points: Int)
This post introduces structured streaming. The first part explains the reasons for the change, justified by the will to standardize the data abstraction (Dataset/DataFrame instead of RDD/DStream). The second part shows some points brought by structured streaming, such as exactly-once processing, event time or output modes. The last part presents an example of structured streaming consisting of reading CSV files.