SparkException: org.apache.spark.streaming.dstream.MappedDStream@7a388990 has not been initialized

Versions: Spark 2.1.0

Metadata checkpointing helps to restore failed jobs quickly. However, it won't work if the context creation and the processing logic aren't declared correctly.

This post gives some insight into a bug that can be very frustrating because the error only tells that the DStream hasn't been initialized. Unlike usual, the first part of this post shows when the error occurs. The second section explains why the problem happens by analyzing the implementation details. The last part shows, through a simple test case, how to fix the error.

Reproducing "MappedDStream has not been initialized" exception

The code below reproduces the DStream initialization exception. As you can see, the execute() method defines the DStream processing outside of the context creation method:

@Test
public void should_fail_on_restoring_context_from_checkpoint_because_of_dstream_declaration_out_of_context_scope()
        throws InterruptedException {
  execute(true);
  Thread.sleep(1000);
  execute(false);
}

private void execute(boolean shouldPass) {
  JavaStreamingContext streamingContext =
    JavaStreamingContext.getOrCreate(DATA_CHECKPOINT_DIR, () -> createStreamingContext());
  try {
    JavaDStream<String> temporarySparkFilesDStream =
      streamingContext.textFileStream("./resources/files/numbers.txt");
    // Declaring the DStream here, outside the factory method, makes the job
    // fail once the context is restored from the checkpoint
    temporarySparkFilesDStream.foreachRDD(rdd -> {});
    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(BATCH_INTERVAL * 2);
    if (!shouldPass) {
      fail("Test should fail because of DStream declared after reading context from the checkpoint");
    }
  } catch (Throwable throwable) { 
    assertThat(throwable.getMessage()).contains("MappedDStream", "has not been initialized");
  } finally {
    streamingContext.stop(STOP_SPARK_CONTEXT);
  }
}

private static JavaStreamingContext createStreamingContext() {
  JavaSparkContext batchContext = new JavaSparkContext(CONFIGURATION);
  JavaStreamingContext streamingContext =
    new JavaStreamingContext(batchContext, Durations.milliseconds(BATCH_INTERVAL));
  streamingContext.checkpoint(DATA_CHECKPOINT_DIR);
  return streamingContext;
}

Explaining "MappedDStream has not been initialized" exception

Running the should_fail_on_restoring_context_from_checkpoint_because_of_dstream_declaration_out_of_context_scope test produces the following exception:

org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MappedDStream@7aa9e414 has not been initialized
  at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:312)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
  at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
// ...

To see exactly what happened, let's dig into the source code and figure out why the exception occurs. The error is thrown inside the isTimeValid(Time) method:

private[streaming] def isInitialized = zeroTime != null
private[streaming] def initialize(time: Time) {
  if (zeroTime != null && zeroTime != time) {
    throw new SparkException(s"ZeroTime is already initialized to $zeroTime"
      + s", cannot initialize it again to $time")
  }
  zeroTime = time
  // ...
}

private[streaming] def isTimeValid(time: Time): Boolean = {
  if (!isInitialized) {
    throw new SparkException (this + " has not been initialized")
  }
  // ...
}

As you can see, the error comes from a null value of the zeroTime property, which represents the time when the streaming starts. How is it possible that it's defined in some cases and not in others? The answer is hidden in the org.apache.spark.streaming.scheduler.JobGenerator#start() method, and more precisely in this instruction:

if (ssc.isCheckpointPresent) {
  restart()
} else {
  startFirstTime()
}

When the test is executed for the first time, the checkpoint is obviously not present. So startFirstTime() is invoked. Among other things, it calls DStreamGraph's start(Time) method, which in turn calls initialize(Time) on every output DStream. This is how zeroTime gets defined and stored in each DStream object:

def start(time: Time) {
  this.synchronized {
    require(zeroTime == null, "DStream graph computation already started")
    zeroTime = time
    startTime = time
    outputStreams.foreach(_.initialize(zeroTime))
    outputStreams.foreach(_.remember(rememberDuration))
    outputStreams.foreach(_.validateAtStart)
    inputStreams.par.foreach(_.start())
  }
}

On re-execution, when a metadata checkpoint exists and the context is restored from it, the org.apache.spark.streaming.scheduler.JobGenerator#restart() method is used instead. Unlike startFirstTime(), it doesn't call the initialization method on the output DStreams.

So we wouldn't have the problem if we hadn't declared the processing code outside the context factory method. But that is exactly what the test does. When the job generator is restarted from a checkpointed context, some batches are scheduled for reprocessing, which is the expected behavior. The problem occurs when the action (temporarySparkFilesDStream.foreachRDD(rdd -> {});) is declared again. In this situation a new DStream is added to the DStreamGraph as a new output DStream through the register method:

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
  
private[streaming] def register(): DStream[T] = {
  ssc.graph.addOutputStream(this)
  this
}

Now, when the restored streaming context starts, it calls the org.apache.spark.streaming.scheduler.JobGenerator#restart() method, which computes, among other things, the batches to reprocess. Jobs for these batches are generated for every output DStream of the graph: not only the ones restored from the checkpoint but also the new one created by the redeclared foreachRDD action. This is done by the following call (inside restart()):

jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))

The jobs are generated in the generateJobs method. Because the new DStream has no RDD generated for the given time, the orElse {} branch is executed and the time validation is performed in isTimeValid(time), as shown in the following methods:

// DStreamGraph.scala
// All jobs are generated here
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

// Each outputStream calls this method
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}

// If RDD for given time doesn't exist, it's created in orElse({}) part
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
    //...
}
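The lookup order matters here, so below is a minimal sketch of it in plain Java. It is only a toy model and not Spark's code: ToyRddCache, remember and the String used in place of an RDD are hypothetical names invented for the illustration. It shows that the initialization guard is reached only when nothing was generated yet for the requested time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model (hypothetical, not Spark's class): cache lookup first, guard second.
class ToyRddCache {
    private final Map<Long, String> generatedRdds = new HashMap<>();
    private final boolean initialized;

    ToyRddCache(boolean initialized) {
        this.initialized = initialized;
    }

    // Stores an already generated "RDD" for the given batch time.
    void remember(long time, String rdd) {
        generatedRdds.put(time, rdd);
    }

    Optional<String> getOrCompute(long time) {
        String cached = generatedRdds.get(time);
        if (cached != null) {
            // Cache hit: the guard below is never reached
            return Optional.of(cached);
        }
        if (!initialized) {
            // Cache miss on an uninitialized stream: the failure described in this post
            throw new IllegalStateException("stream has not been initialized");
        }
        String computed = "rdd@" + time;
        generatedRdds.put(time, computed);
        return Optional.of(computed);
    }
}
```

A freshly declared stream never has anything cached, so with this ordering it always ends up in the guarded branch.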

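The "org.apache.spark.streaming.dstream.MappedDStream@7a388990 has not been initialized" exception is thrown when the RDD is about to be computed. The message names a MappedDStream because the ForEachDStream created by foreachRDD asks its parent for the RDD, and the parent here is the mapped stream returned by textFileStream. Recall that the validation in isTimeValid(Time) is based on the zeroTime attribute, and that this attribute is not set on the new DStream because DStreamGraph's start(Time) method is never called after a restore.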
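To summarize the whole chain, here is a minimal sketch in plain Java, written without any Spark dependency. It is a toy model built on assumptions: ToyStream, ToyGraph and ToyRestartDemo are hypothetical classes, the time check is reduced to a simple comparison, and the checkpoint/restore step is only marked by a comment. It shows that a stream registered after the first start never receives its zeroTime and therefore fails as soon as jobs are generated.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a DStream (hypothetical): only the zeroTime guard is modelled.
class ToyStream {
    private final String name;
    private Long zeroTime; // stays null until initialize() is called

    ToyStream(String name) {
        this.name = name;
    }

    void initialize(long time) {
        this.zeroTime = time;
    }

    boolean isInitialized() {
        return zeroTime != null;
    }

    // Same guard as in isTimeValid(Time); the validity rule itself is simplified.
    boolean isTimeValid(long time) {
        if (!isInitialized()) {
            throw new IllegalStateException(name + " has not been initialized");
        }
        return time > zeroTime;
    }
}

// Toy stand-in for DStreamGraph (hypothetical).
class ToyGraph {
    private final List<ToyStream> outputStreams = new ArrayList<>();

    void addOutputStream(ToyStream stream) {
        outputStreams.add(stream);
    }

    // First start: every registered output stream receives its zeroTime.
    void start(long time) {
        outputStreams.forEach(stream -> stream.initialize(time));
    }

    // Restart path: the time is validated for every stream, nothing is initialized.
    void generateJobs(long time) {
        outputStreams.forEach(stream -> stream.isTimeValid(time));
    }
}

class ToyRestartDemo {
    // Returns the error message raised on restart, or null when the restart succeeds.
    static String restartWith(boolean declareStreamAfterRestore) {
        ToyGraph graph = new ToyGraph();
        graph.addOutputStream(new ToyStream("checkpointedStream"));
        graph.start(0L); // first run: zeroTime is set on the declared stream
        // ... imagine the graph being checkpointed and restored here ...
        if (declareStreamAfterRestore) {
            graph.addOutputStream(new ToyStream("lateStream"));
        }
        try {
            graph.generateJobs(1000L);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(restartWith(false));
        System.out.println(restartWith(true));
    }
}
```

In this model restartWith(false) returns null because the only stream was initialized during the first start, while restartWith(true) returns "lateStream has not been initialized".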

Fixing "DStream has not been initialized" error

The fix is shown in the should_correctly_create_context_when_dstream_is_set_inside_context_creation_method test, where the DStream processing is defined inside the factory method that creates a new streaming context (when the checkpoint is missing), and not outside of it:

 
@Test
public void should_correctly_create_context_when_dstream_is_set_inside_context_creation_method()
        throws InterruptedException {
  executeWithCorrectDStreamDefinition();
  executeWithCorrectDStreamDefinition();
}

private void executeWithCorrectDStreamDefinition() throws InterruptedException {
  JavaStreamingContext streamingContext =
    JavaStreamingContext.getOrCreate(DATA_CHECKPOINT_DIR, () -> createStreamingContextWithProcessing());
  try {
    streamingContext.start();
    streamingContext.awaitTerminationOrTimeout(BATCH_INTERVAL * 2);
  } finally {
    streamingContext.stop(STOP_SPARK_CONTEXT);
  }
}

private static JavaStreamingContext createStreamingContextWithProcessing() {
  JavaSparkContext batchContext = new JavaSparkContext(CONFIGURATION);
  JavaStreamingContext streamingContext =
          new JavaStreamingContext(batchContext, Durations.milliseconds(BATCH_INTERVAL));
  streamingContext.checkpoint(DATA_CHECKPOINT_DIR);
  // Unlike the other creation method, this one also declares the DStream processing
  JavaDStream<String> temporarySparkFilesDStream =
          streamingContext.textFileStream("./resources/files/numbers.txt");
  temporarySparkFilesDStream.foreachRDD(rdd -> {});
  return streamingContext;
}
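Why does moving the declaration help? A minimal sketch of the getOrCreate contract, as it is used in this post, may make it clearer. Once again it is a toy model in plain Java and not Spark's implementation: ToyContextLoader is a hypothetical class, the context is reduced to a String, and the checkpoint is assumed to be written right after creation. It shows that the factory runs only when no checkpoint exists, so everything declared inside it is part of what gets restored later.

```java
import java.util.function.Supplier;

// Toy model of a getOrCreate-like loader (hypothetical, not Spark's class).
class ToyContextLoader {
    private String checkpoint; // null means "no checkpoint yet"
    private int factoryCalls = 0;

    String getOrCreate(Supplier<String> factory) {
        if (checkpoint != null) {
            // A checkpoint exists: the context is restored as-is and the factory is skipped
            return checkpoint;
        }
        factoryCalls++;
        String created = factory.get();
        // Assumption of the sketch: the created context is checkpointed immediately
        checkpoint = created;
        return created;
    }

    int factoryCalls() {
        return factoryCalls;
    }
}
```

Calling getOrCreate twice on the same loader invokes the factory once. The second call returns the restored value, and nothing has to be declared again outside of the factory.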

The "org.apache.spark.streaming.dstream.MappedDStream@7a388990 has not been initialized" exception is not very meaningful. It tells that something goes wrong but not necessarily what. This post explains in more detail why it occurs. The first part shows how to reproduce the error. The second part digs into the code and looks for the reason of the exception. We can learn that it occurs because the DStreamGraph is restored from the checkpoint and the method setting the zeroTime attribute on all output DStreams is not called. The last part shows how to fix it by moving all processing code inside the factory method that creates the streaming context.
