JARs' split personality problem

Versions: Spark 2.1.0

Often, making errors helps to progress. That was my case with spark-submit and the local/remote JAR pair. The errors helped me understand the role of the driver, closures, serialization and some configuration properties.

This post explains all of that. The first part focuses on the error that encouraged me to dig deeper into the topic of code execution on the driver and the executors. The second part contains the explanation of this error.

Remote JAR and closures trap example

To illustrate the problem, the following code is used:

// All classes listed below are contained in the JAR
// defined in the setJars method

// Main.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object Main {

  @transient val Logger = LoggerFactory.getLogger(Main.getClass)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark test local/remote JAR")
      .setJars(Seq("/tmp/spark-jars-error_v1.jar"))
    val sparkContext = new SparkContext(conf)

    val speller = new Speller("Hello world#1")
    Logger.info("Starting processing")
    sparkContext.parallelize(0 to 100)
      .foreach(item => {
        Logger.info("Printing label version#1")
        Action.printLabel(speller)
      })
    Logger.info("Terminating processing")
  }

}


// Actions.scala
import org.slf4j.LoggerFactory

object Action {

  @transient val Logger = LoggerFactory.getLogger(Action.getClass)

  def printLabel(speller: Speller): Unit = {
    speller.speak()
    Logger.info("Version#1")
  }

}

// Speller.scala
import org.slf4j.LoggerFactory

class Speller(sentence: String) extends Serializable {

  // skipped at serialization, re-created lazily on the deserializing side
  @transient lazy val Logger = LoggerFactory.getLogger(this.getClass)

  def speak(): Unit = {
    Logger.info(s"I'm saying: ${sentence}")
  }

}

The comments before the object and class declarations are important. The code presented above is packaged into the JAR referenced during the construction of Spark's configuration (the setJars(Seq[String]) method). On the other side, the code below is compiled into the JAR submitted at runtime:

// All classes listed below are contained in the
// JAR defined during spark-submit execution

// Main.scala
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object Main {

  @transient val Logger = LoggerFactory.getLogger(Main.getClass)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark test local/remote JAR")
      .setJars(Seq("/tmp/spark-jars-error_v1.jar"))
    val sparkContext = new SparkContext(conf)

    val speller = new Speller("Hello world#2")
    Logger.info("Starting processing")
    sparkContext.parallelize(0 to 100)
      .foreach(item => {
        Logger.info("Printing label version#2")
        Action.printLabel(speller)
      })
    Logger.info("Terminating processing")
  }

}

// Actions.scala
import org.slf4j.LoggerFactory

object Action {

  @transient val Logger = LoggerFactory.getLogger(Action.getClass)

  def printLabel(speller: Speller): Unit = {
    speller.speak()
    Logger.info("Version#2")
  }

}


// Speller.scala
import org.slf4j.LoggerFactory

class Speller(sentence: String) extends Serializable {

  @transient lazy val Logger = LoggerFactory.getLogger(this.getClass)

  def speak(): Unit = {
    Logger.info(s"I told: ${sentence}")
  }

}

The program was compiled twice. The first build, whose logs start with "Version#1", was copied to /tmp/spark-jars-error_v1.jar. The second JAR, whose logs start with "Version#2", is the one passed to spark-submit:

./bin/spark-submit --class com.waitingforcode.Main --master spark://localhost:7077 /home/bartosz/workspace/spark-jars-error/target/scala-2.11/spark-jars-error_2.11-1.0.jar
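
As a side note, setJars(Seq[String]) is only a thin wrapper around the spark.jars property, which spark-submit's --jars option also populates. A minimal sketch of the equivalence (the value names are mine, the path is the one used in the listings above):

// setJars joins the given paths into the spark.jars property...
val jarConf = new SparkConf()
  .setAppName("Spark test local/remote JAR")
  .setJars(Seq("/tmp/spark-jars-error_v1.jar"))

// ...so setting the property directly has the same effect
val propertyConf = new SparkConf()
  .setAppName("Spark test local/remote JAR")
  .set("spark.jars", "/tmp/spark-jars-error_v1.jar")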

Now the question is: what will the executor's logs contain? The logs for "Version#1" or for "Version#2"? The following log fragment contains the answer:

INFO Executor: Fetching spark://192.168.0.12:44326/jars/spark-jars-error_v1.jar with timestamp 1495349061452
INFO TransportClientFactory: Successfully created connection to /192.168.0.12:44326 after 5 ms (0 ms spent in bootstraps)
INFO Utils: Fetching spark://192.168.0.12:44326/jars/spark-jars-error_v1.jar to /tmp/spark-2cf977cd-499b-4172-8ee6-1ceae0c2c150/executor-74f75723-bf8a-4fcb-b134-68a36354fcf7/spark-9fd4dc5f-5262-42c8-a5d2-f99731b0c0a3/fetchFileTemp4408731392027774262.tmp
INFO Utils: Copying /tmp/spark-2cf977cd-499b-4172-8ee6-1ceae0c2c150/executor-74f75723-bf8a-4fcb-b134-68a36354fcf7/spark-9fd4dc5f-5262-42c8-a5d2-f99731b0c0a3/1963530601495349061452_cache to /home/bartosz/programming/spark-2.1.0-bin-hadoop2.7/work/app-20170521084421-0002/0/./spark-jars-error_v1.jar
INFO Executor: Adding file:/home/bartosz/programming/spark-2.1.0-bin-hadoop2.7/work/app-20170521084421-0002/0/./spark-jars-error_v1.jar to class loader
# ...
INFO Main$: Printing label version#1
INFO Speller: I'm saying: Hello world#2
INFO Action$: Version#1

Explanation

As you can see, the log output is mixed. Most of it comes from the 1st version (loaded from the "remote" JAR defined in setJars). Only the state carried by the Speller instance ("Hello world#2") comes from the 2nd version (the one submitted through spark-submit); even its surrounding text ("I'm saying") belongs to the 1st version.

To understand which classes are loaded, let's add some extra Java options to the spark-submit execution. The command now looks like this:

./bin/spark-submit --class com.waitingforcode.Main --master spark://localhost:7077 --conf "spark.executor.extraJavaOptions=-verbose:class" --conf "spark.driver.extraJavaOptions=-verbose:class"  /home/bartosz/workspace/spark-jars-error/target/scala-2.11/spark-jars-error_2.11-1.0.jar  > loaded_classes.txt

The -verbose:class option prints the details of class loader activity. The loaded_classes.txt file thus contains the classes loaded by the driver. Among the classes loaded from the com.waitingforcode package we can find:

[Loaded com.waitingforcode.Main from file:/home/bartosz/workspace/spark-jars-error/target/scala-2.11/spark-jars-error_2.11-1.0.jar]
[Loaded com.waitingforcode.Main$ from file:/home/bartosz/workspace/spark-jars-error/target/scala-2.11/spark-jars-error_2.11-1.0.jar]
[Loaded com.waitingforcode.Speller from file:/home/bartosz/workspace/spark-jars-error/target/scala-2.11/spark-jars-error_2.11-1.0.jar]
[Loaded com.waitingforcode.Main$$anonfun$main$1 from file:/home/bartosz/workspace/spark-jars-error/target/scala-2.11/spark-jars-error_2.11-1.0.jar]

Similarly, among the classes loaded by the executor we can find:

[Loaded com.waitingforcode.Main$$anonfun$main$1 from file:/home/bartosz/programming/spark-2.1.0-bin-hadoop2.7/work/app-20170521090526-0011/0/./spark-jars-error_v1.jar]
[Loaded com.waitingforcode.Speller from file:/home/bartosz/programming/spark-2.1.0-bin-hadoop2.7/work/app-20170521090526-0011/0/./spark-jars-error_v1.jar]
[Loaded com.waitingforcode.Main$ from file:/home/bartosz/programming/spark-2.1.0-bin-hadoop2.7/work/app-20170521090526-0011/0/./spark-jars-error_v1.jar]
[Loaded com.waitingforcode.Action$ from file:/home/bartosz/programming/spark-2.1.0-bin-hadoop2.7/work/app-20170521090526-0011/0/./spark-jars-error_v1.jar]

As you can see, the mystery of Action is resolved. The class was loaded only on the executor's side: the driver serialized only an instruction about what to execute, not the code it should be executed with. It also explains why Main's foreach prints "Printing label version#1" instead of "Printing label version#2". The driver asks the executor to run Main$$anonfun$main$1, and the executor retrieves the content of this class from the JAR declared in setJars, i.e. the v1 JAR.
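
To double-check where a class comes from without parsing the -verbose:class output, we can ask the JVM directly. A small sketch, assuming it runs inside main after the SparkContext creation (getCodeSource can return null for bootstrap classes, hence the Option):

// prints, in the executor's stdout, the JAR each class was loaded from
sparkContext.parallelize(0 to 0).foreach { _ =>
  Seq(Action.getClass, classOf[Speller]).foreach { clazz =>
    val source = Option(clazz.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("unknown source")
    println(s"${clazz.getName} loaded from ${source}")
  }
}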

But the case of the Speller class is different. An instance of this class is serialized and sent to the executor. We can deduce it from the message spelt by the object: "Hello world#2" and not "Hello world#1". However, the expression used in the logging indicates the use of the 1st version from setJars - "I'm saying: Hello world#2" instead of "I told: Hello world#2". Serialization transports only the object's state (the sentence field), while the method bodies come from the Speller class loaded locally by the executor. That's why we also find Speller among the classes loaded by the executor.
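
The same behavior can be reproduced without Spark at all. The sketch below (a plain JVM program with a hypothetical SerializationDemo object, assuming the Speller class from the listings is on the classpath) serializes an instance and reads it back. The stream carries the class name and the field values, never the bytecode, so the deserializing side executes whatever speak() body its own class loader provides:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializationDemo {

  def main(args: Array[String]): Unit = {
    // serialize the state of a Speller instance to an in-memory buffer
    val buffer = new ByteArrayOutputStream()
    val output = new ObjectOutputStream(buffer)
    output.writeObject(new Speller("Hello world#2"))
    output.close()

    // readObject resolves com.waitingforcode.Speller through the local
    // class loader - with the v1 JAR on the classpath, v1's speak() runs
    val input = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = input.readObject().asInstanceOf[Speller]
    copy.speak()
  }

}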

To sum up, this post shows a Spark class loading behavior that can seem obvious at first glance but that can also cause problems during testing. For instance, you may suspect a bug in the executed code while the only problem is that you forgot to upload the recompiled JAR to the remote file system. The second part of the post, through the JVM class loading verbosity parameter, explains the logic followed by Spark. It clearly shows that the driver sends only a kind of instruction and that it's the executor which loads the real content of the executed operations. The only exception is the state of the objects initialized on the driver's side, serialized and sent to the executor.
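
A cheap guard against this trap (my suggestion, not part of the original experiment; BuildMarker is a hypothetical object) is to embed a build marker and print it from both sides at startup. If the setJars JAR lags behind the submitted one, the two printed values diverge immediately:

// bump the value at every build
object BuildMarker {
  val Value = "v2"
}

// inside main, right after the SparkContext creation:
println(s"Driver built as: ${BuildMarker.Value}")
sparkContext.parallelize(0 to 0).foreach { _ =>
  // on the executor the marker comes from the class loaded from the remote JAR
  println(s"Executor built as: ${BuildMarker.Value}")
}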