org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start() explained

Versions: Spark 2.2.0

The error quoted in the title of this post is quite common when you try to port processing logic from Spark DStream/RDD to Spark structured streaming. This post gives some insight into it.

The first section shows the case producing this error. The second section explains why the exception occurs. Finally, the third section gives the solution.

AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Producing this exception is quite easy. Let's create a stream with structured streaming and apply the consumption logic of the low-level APIs (e.g. the collect or foreach methods):

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder().appName("Failing source")
  .master("local[*]")
  .getOrCreate()
val rows = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
// collect() is a batch action and can't be applied to a streaming Dataset
rows.collect()

Note that the same consumption logic is perfectly valid in batch Spark SQL semantics:

val ordersReader = sparkSession.read.format("jdbc")
  .option("url", InMemoryDatabase.DbConnection)
  .option("driver", InMemoryDatabase.DbDriver)
  .option("dbtable", "orders")
  .option("user", InMemoryDatabase.DbUser)
  .option("password", InMemoryDatabase.DbPassword)
  .load()

import sparkSession.implicits._
val ordersIds: Array[Int] = ordersReader.select("id")
  .map(row => row.getInt(0))
  .collect()
// comes from the post: http://www.waitingforcode.com/apache-spark-sql/loading-data-rdbms/read

However, the first snippet executed in the context of structured streaming gives the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
	at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
	at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
	at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
	at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
	at org.apache.spark.sql.Dataset.foreach(Dataset.scala:2324)

AnalysisException: Queries with streaming sources must be executed with writeStream.start() explained

To understand the problem, we can do several things. In our case, we'll walk through the lines of the exception's stack trace and discover what every method does.

First of all, the check producing the exception starts in org.apache.spark.sql.execution.QueryExecution#assertSupported(). This method is triggered only when the configuration entry spark.sql.streaming.unsupportedOperationCheck is set to true, which is the case for the executed code since true is its default value.
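As a side note, the snippet below is only a sketch showing where that flag could be changed at session construction time. It's an internal entry and leaving it at its default (true) is the right choice, since skipping the check doesn't make batch actions work on streaming sources:

// A sketch only: spark.sql.streaming.unsupportedOperationCheck is an internal
// configuration entry defaulting to true; setting it to false merely skips
// this analysis-time check
val sessionWithoutCheck = SparkSession.builder().appName("Unsupported check disabled")
  .master("local[*]")
  .config("spark.sql.streaming.unsupportedOperationCheck", "false")
  .getOrCreate()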

assertSupported() calls the utility method UnsupportedOperationChecker#checkForBatch(plan: LogicalPlan), which applies the following partial function to every node of the logical plan (children + parent) through foreachUp:

def checkForBatch(plan: LogicalPlan): Unit = plan.foreachUp {
  case p if p.isStreaming =>
    throwError("Queries with streaming sources must be executed with writeStream.start()")(p)

  case _ =>
}

As you can see, if one of the plan's nodes is streaming, the exception we've met is thrown. The key to understanding why is hidden in this famous isStreaming part. isStreaming is in fact a method checking if any part of the logical plan has a streaming data source:

def isStreaming: Boolean = children.exists(_.isStreaming == true)

But what is the difference between streaming and batch data sources? Quite simply, a streaming data source, unlike a batch one, returns data in a continuous manner. And how can Spark distinguish them? In fact, the isStreaming method is defined in the LogicalPlan abstract class and the implementation quoted above is only the default one.

LogicalPlan is extended by every operator (algebraic or language construct) in Spark SQL, such as DISTINCT (org.apache.spark.sql.catalyst.plans.logical.Distinct), JOIN (org.apache.spark.sql.catalyst.plans.logical.Join) or WHERE (org.apache.spark.sql.catalyst.plans.logical.Filter). But it's also extended by structured streaming parts, such as org.apache.spark.sql.execution.streaming.StreamingRelation, used to make the link between the data source (Kafka in our case) and the logical plan. This short implementation of LogicalPlan looks like:

case class StreamingRelation(dataSource: DataSource, sourceName: String, output: Seq[Attribute])
  extends LeafNode {
  override def isStreaming: Boolean = true
  override def toString: String = sourceName
}

As you can see, StreamingRelation overrides the isStreaming method to return true. And because this logical plan node is constructed when DataStreamReader.load() is called, code written with batch logic simply can't fit the streaming requirements.
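We can observe this flag from user code too. The snippet below is only a small sketch, assuming the rows and ordersReader Datasets defined in the previous snippets, and uses the public Dataset#isStreaming method plus the logical plan exposed by queryExecution:

// rows comes from sparkSession.readStream...load(), ordersReader from sparkSession.read...load()
println(rows.isStreaming)         // true: the plan contains a StreamingRelation leaf
println(ordersReader.isStreaming) // false: only batch relations in the plan
// the analyzed logical plan shows the StreamingRelation node created for the Kafka source
println(rows.queryExecution.analyzed)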

Solving AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Since the problem is our batch-oriented code, nothing is simpler to fix it than converting the code to a streaming-oriented one. In structured streaming the Spark programmers introduced the concepts of sources and sinks. The latter define how the data is consumed. There are several basic sinks, such as foreach (each row handed to a user-defined ForeachWriter), console (data printed to the console) or file (data persisted to files).

The sinks are created directly from the read stream by calling the writeStream method on the loaded DataFrame. It creates an instance of the DataStreamWriter class that can be used to consume the streamed data once its start() method is called.
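For instance, a minimal sketch using the console sink (assuming the streaming rows DataFrame from the first snippet) could look like this:

// writeStream returns a DataStreamWriter; nothing is executed until start() is called
val consoleQuery = rows.writeStream
  .format("console")
  .outputMode("append")
  .start()
// start() returns a StreamingQuery; block the main thread until it terminates
consoleQuery.awaitTermination()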

The fixed code consuming the data with the foreach sink looks like this:

// It's only a dummy implementation showing how to
// use .foreach() in structured streaming
import org.apache.spark.sql.{ForeachWriter, Row}

// rows is the streaming DataFrame loaded in the first snippet
rows.writeStream
  .foreach(new ForeachWriter[Row] {

    override def open(partitionId: Long, version: Long): Boolean = true

    override def process(row: Row): Unit = {
      println(s"Processing ${row}")
    }

    override def close(errorOrNull: Throwable): Unit = {}
  })
  .start()
  .awaitTermination() // keep the query running until it's stopped or fails

At first glance the use of structured streaming is not obvious. It differs a little from the DStream-based streaming. The first meaningful difference is that we can't simply consume the read data by calling actions directly on the source stream. Instead, we must define the sink to which the retrieved data will be put.