The error quoted in the title of this post is quite common when you try to carry consumption logic over from Spark DStream/RDD code to Spark Structured Streaming. This post gives some insight into it.
Konieczny
The first section shows the case producing this error. The second section explains why the exception occurs. Finally, the third section gives the solution.
AnalysisException: Queries with streaming sources must be executed with writeStream.start()
Producing this exception is quite easy. Let's create a stream with Structured Streaming and consume it the way the lower-level APIs allow, by calling an action (e.g. collect or foreach) directly on it:
val sparkSession = SparkSession.builder().appName("Failing source")
  .master("local[*]")
  .getOrCreate()
val rows = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .collect()
Note that the same pattern is perfectly valid in batch Spark SQL:
val ordersReader = sparkSession.read.format("jdbc")
  .option("url", InMemoryDatabase.DbConnection)
  .option("driver", InMemoryDatabase.DbDriver)
  .option("dbtable", "orders")
  .option("user", InMemoryDatabase.DbUser)
  .option("password", InMemoryDatabase.DbPassword)
  .load()
import sparkSession.implicits._
val ordersIds: Array[Int] = ordersReader.select("id")
  .map(row => row.getInt(0))
  .collect()
// comes from the post: http://www.waitingforcode.com/apache-spark-sql/loading-data-rdbms/read
However, the first snippet executed in the context of structured streaming gives the following exception:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
  at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
  at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
  at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
  at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)
AnalysisException: Queries with streaming sources must be executed with writeStream.start() explained
To understand the problem, we'll walk through the frames of the exception's stack trace and discover what each method does.
First of all, the check producing the exception starts in org.apache.spark.sql.execution.QueryExecution#assertSupported(). This method performs the check only when the configuration entry spark.sql.streaming.unsupportedOperationCheck is set to true, which is the case for the executed code.
assertSupported() calls the utility method UnsupportedOperationChecker#checkForBatch(plan: LogicalPlan), which applies the following function to every node of the logical plan (children first, then the parent):
{
  case p if p.isStreaming =>
    throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
  case _ =>
}
As you can see, if any node of the plan is streaming, the exception we've met is thrown. The key to understanding why is hidden in the isStreaming call. isStreaming is in fact a method checking whether any part of the logical plan has a streaming data source:
def isStreaming: Boolean = children.exists(_.isStreaming == true)
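The same flag is also exposed on the public API through Dataset.isStreaming, so it can be inspected before any action is called. Below is a minimal sketch, assuming a local session and Spark's built-in "rate" test source standing in for Kafka so that no broker is needed (both are choices made for this illustration, not part of the original example):

```scala
import org.apache.spark.sql.SparkSession

object IsStreamingSketch {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("isStreaming sketch")
      .master("local[*]")
      .getOrCreate()

    // Batch Dataset: built from a finite range of numbers
    val batchNumbers = sparkSession.range(0, 5).toDF("id")
    // Streaming Dataset: the "rate" source (assumption: used here instead of Kafka)
    // generates rows continuously
    val streamingNumbers = sparkSession.readStream.format("rate").load()

    // No action is executed here, only the logical plans are inspected
    println(s"batch isStreaming = ${batchNumbers.isStreaming}")         // false
    println(s"streaming isStreaming = ${streamingNumbers.isStreaming}") // true

    sparkSession.stop()
  }
}
```

Calling collect() or foreach() on streamingNumbers would fail with the exception discussed in this post, while the same calls on batchNumbers work.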
But what is the difference between streaming and batch data sources? Quite simply, a streaming data source, unlike a batch one, returns data continuously. And how can Spark distinguish them? The isStreaming method is defined in the LogicalPlan abstract class, and the implementation quoted above is only the default one. LogicalPlan is extended by every operation (algebraic or language construct) in Spark SQL, such as DISTINCT (org.apache.spark.sql.catalyst.plans.logical.Distinct), JOIN (org.apache.spark.sql.catalyst.plans.logical.Join) or WHERE (org.apache.spark.sql.catalyst.plans.logical.Filter). But it's also extended by Structured Streaming classes, such as org.apache.spark.sql.execution.streaming.StreamingRelation, which links the data source (Kafka in our case) to the logical plan. This short implementation of LogicalPlan looks like:
case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
  extends LeafNode {
  override def isStreaming: Boolean = true
  override def toString: String = sourceName
}
As you can see, StreamingRelation overrides the isStreaming method and returns true. And because this logical plan implementation is constructed when DataStreamReader.load() is called, code written with batch logic simply can't satisfy the streaming requirements.
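To see how the default isStreaming rule and the leaf override interact with the bottom-up check, here is a toy model in plain Scala. It is only a sketch: ToyPlan, ToyBatchRelation, ToyStreamingSource, ToyFilter and ToyChecker are hypothetical names invented for this illustration and are not Spark classes, even though they mimic the behaviour described above.

```scala
// Toy stand-in for a logical plan node (hypothetical, not Spark's LogicalPlan)
sealed trait ToyPlan {
  def children: Seq[ToyPlan]

  // Default rule: a node is streaming if any of its children is streaming
  def isStreaming: Boolean = children.exists(_.isStreaming)

  // Visits the children first, then the node itself
  def foreachUp(f: ToyPlan => Unit): Unit = {
    children.foreach(_.foreachUp(f))
    f(this)
  }
}

// Leaf for a batch source: keeps the default rule, so isStreaming is false
case class ToyBatchRelation(name: String) extends ToyPlan {
  val children: Seq[ToyPlan] = Nil
}

// Leaf for a streaming source: overrides the flag, like StreamingRelation does
case class ToyStreamingSource(name: String) extends ToyPlan {
  val children: Seq[ToyPlan] = Nil
  override def isStreaming: Boolean = true
}

// Intermediate operation: inherits the flag from its child
case class ToyFilter(condition: String, child: ToyPlan) extends ToyPlan {
  val children: Seq[ToyPlan] = Seq(child)
}

object ToyChecker {
  // Rejects any plan containing a streaming node, as a batch execution would
  def checkForBatch(plan: ToyPlan): Unit = plan.foreachUp {
    case p if p.isStreaming =>
      throw new IllegalStateException(
        s"Queries with streaming sources must be executed with writeStream.start(): $p")
    case _ =>
  }
}

object ToyPlanDemo {
  def main(args: Array[String]): Unit = {
    val batchPlan = ToyFilter("id > 10", ToyBatchRelation("orders"))
    val streamingPlan = ToyFilter("id > 10", ToyStreamingSource("kafka"))

    // Passes silently: no node of the plan is streaming
    ToyChecker.checkForBatch(batchPlan)

    // Fails: the streaming leaf makes the check throw
    try ToyChecker.checkForBatch(streamingPlan)
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

Because the children are visited before their parent, the check fails on the streaming leaf itself, which is consistent with the "kafka" node reported in the exception message quoted earlier.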
Solving AnalysisException: Queries with streaming sources must be executed with writeStream.start()
Since the problem is our batch-oriented code, the simplest fix is to convert it to streaming-oriented code. In Structured Streaming, Spark developers introduced the concepts of sources and sinks. The latter define how the data is consumed. There are several basic sinks, such as foreach (data processed row by row), console (data printed to the console) or file (data persisted to files).
Sinks are created directly from the read stream by calling writeStream on the loaded DataFrame. It creates an instance of the DataStreamWriter class, which starts consuming the streamed data once its start() method is called.
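As a first illustration, the console sink is probably the shortest way to see a streaming query run from end to end. The sketch below assumes a local session and, again, the built-in "rate" test source instead of Kafka, so that it can be tried without a broker; the 10-second timeout is an arbitrary value picked for the demo.

```scala
import org.apache.spark.sql.SparkSession

object ConsoleSinkSketch {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("Console sink sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: the "rate" source replaces Kafka, so no broker is required
    val stream = sparkSession.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()

    // writeStream only returns a DataStreamWriter; nothing runs before start()
    val query = stream.writeStream
      .format("console")
      .outputMode("append")
      .start()

    // Block for at most 10 seconds so that a few micro-batches get printed
    query.awaitTermination(10000L)
    query.stop()
    sparkSession.stop()
  }
}
```

start() returns a StreamingQuery handle. Without a call such as awaitTermination, the main thread of a standalone application would exit before any data is processed.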
The fixed code consuming data in a foreach loop looks like this:
// It's only a dummy implementation showing how to
// use .foreach() in structured streaming
// (df is the streaming DataFrame returned by readStream...load())
df.writeStream
  .foreach(new ForeachWriter[Row] {
    override def process(row: Row): Unit = {
      println(s"Processing ${row}")
    }
    override def close(errorOrNull: Throwable): Unit = {}
    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
  })
  .start()
At first glance, the use of Structured Streaming is not obvious. It differs a little from DStream-based streaming. The first meaningful difference is that we can't consume the read data by calling actions directly on the source stream. Instead, we must define the sink to which the retrieved data will be written.
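When batch-style actions such as collect() are really needed, one option is the foreachBatch sink, which hands every micro-batch to a function as a regular, non-streaming Dataset. The sketch below assumes Spark 2.4 or later (foreachBatch does not exist in earlier releases), a local session and the built-in "rate" source in place of Kafka. The handleBatch name is hypothetical and was chosen only for this example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ForeachBatchSketch {
  // Typed explicitly so that the Scala overload of foreachBatch is selected
  val handleBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
    // Here batch is a non-streaming DataFrame, so actions are allowed
    val rows = batch.collect()
    println(s"Batch $batchId contains ${rows.length} rows")
  }

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("foreachBatch sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: the "rate" source replaces Kafka, so no broker is required
    val stream = sparkSession.readStream.format("rate").load()

    // The query is still executed through writeStream and start()
    val query = stream.writeStream
      .foreachBatch(handleBatch)
      .start()

    // Arbitrary 10-second run, long enough to process a few micro-batches
    query.awaitTermination(10000L)
    query.stop()
    sparkSession.stop()
  }
}
```

The query still goes through writeStream.start(), so the check described in this post is satisfied, while the body of the function can reuse code written for batch Datasets.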
