Deployment modes and master URLs in Spark

on waitingforcode.com

Deployment modes and master URLs in Spark

Spark has 2 deployment modes that can be controlled in fine-grained way thanks to master URL property.

This post lists and explains different deployment modes in Spark. It's divided in 3 parts. The first concerns master URLs and it shows their different types. The second part compares client and cluster deployment mode in Spark while the last shows some tests of master URLs.

Master URLs

First of all, the master URL property represents the endpoint of the master server. As we'll see below, it can either by "relative" or "absolute". The "relative" indicates the place where master is executed (localhost, remote host). The "absolute" mode points to specific master server.

Locally Spark can be launched with various master URL values:

  • local - it's the simplest strategy that executed Spark in 1 thread without any parallelism where driver and executor are executed on 1 thread.
  • local[K] - where K represents the number of threads reserved to Spark program. If K is equal to *, it means that Spark will be executed with as many worker threads as logical cores on the machine.
  • local-cluster[S, SC, MS] - this master URL is widely used in unit tests to simulate the execution in cluster locally. The S parameter represents the number of slaves, SC the number of cores per slave and MS the number of memory per slave. However, using it outside Spark project's code is not simple because of following 3 prerequisites: environment variable SPARK_SCALA_VERSION definition, the need of compiled jars at ./assembly/target/${Scala_Version}/jars and an additional definition of properties in configuration: spark.driver.extraClassPath = sys.props("java.class.path") and "spark.executor.extraClassPath", sys.props("java.class.path").
  • local[N, M] - where N is equal to the number of threads and M to the number of maximal task failures. M property is especially useful during tests to verify if task is launched M number of times in the case of errors. By default the local TaskSetManager is created with M=1. It means that once a given particular task fails, the entire task will be aborted and not retried.

Another value for master URL property starts with spark://. It's widely used in Spark's standalone installation. So configured application will connect to master server defined by spark:// address.

Spark also has master URLs specific to supported resource managers. The first one is mesos://. Its address, composed of host and port, must point to started MesosClusterDispatcher. Another possible resource manager is YARN and Spark program is submitted to it when master URL is equal to yarn. Unlike Mesos, it doesn't accept any parameters. Instead, the resource manager is resolved through corresponding environment variables (HADOOP_CONF_DIR or YARN_CONF_DIR).

Client and cluster deployment

When Spark application is submitted through spark-submit script, the parameter indicating deployment mode (--deploy-mode) can take 1 of 2 values: client or cluster. The difference between them is quite important from the execution point of view.

Client deploy mode is the default value. When used, the machine executing spark-submit command will be elected as a driver. In the other side, cluster deploy mode instead of choosing driver directly, will first contact resource manager. The resource manager will analyze all slaves and select the one having enough available resources to execute Spark program as a driver.

There are several differences between both deployment modes:

  • in client mode the driver is executed in the same process as the client submitting the application; in cluster mode the driver can be executed on slave nodes
  • in client mode driver sends JAR files to workers through direct connection while in cluster mode these JARs must be available somewhere publicly because they're downloaded by workers
  • cluster mode supports --supervise flag that can be set during spark-submit execution. Thanks to it, if the application exists with non-zero exit code, it will be automatically relaunched.
  • when YARN is used, the client mode uses the ApplicationMaster to request resources. In cluster mode, the driver runs as ApplicationMaster process on one of cluster's nodes

Master URL examples

Before finishing this post, let's make some tests with above master URLs:

val receiver = new Receiver[Int](StorageLevel.MEMORY_ONLY) {
  var receiver:Thread = null
  override def onStart(): Unit = {
    // Run as new thread since it must be non-blocking
    // In the real world we would use something like
    // while(true) {// ...} that is a blocking
    // operation
    receiver = new Thread(new Runnable() {
      override def run(): Unit = {
        println("Starting thread")
        for (nr <- 0 to 4) {
          store(nr)
        }
      }
    })
    receiver.start()
  }

  override def onStop(): Unit = {
    receiver.interrupt()
  }
}

"local master url" should "correctly execute simple batch computation" in {
  val conf = new SparkConf().setAppName("MasterUrl local test").setMaster("local")
  sparkContext= SparkContext.getOrCreate(conf)

  val multipliedNumbers = sparkContext.parallelize(1 to 10)
    .map(_*2)
    .collect()

  multipliedNumbers should contain allOf(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
}

"local master url" should "correctly execute only 1 from 2 receiver input streams" in {
  // As explained in http://spark.apache.org/docs/latest/streaming-programming-guide.html#points-to-remember-1
  // and http://www.waitingforcode.com/apache-spark-streaming/receivers-in-spark-streaming/read
  // each receiver uses 1 thread. So running 2 receivers on "local"
  // master URL will execute only 1 of them
  // Note: sometimes any of receivers will be executed because of lack
  //       of resources.
  val conf = new SparkConf().setAppName("MasterUrl local receivers test").setMaster("local")
  sparkContext= SparkContext.getOrCreate(conf)
  val streamingContext = new StreamingContext(sparkContext, Durations.seconds(2))

  val receiverDStream1 = streamingContext.receiverStream(receiver)
  val receiverDStream2 = streamingContext.receiverStream(receiver)

  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers-accumulator")
  receiverDStream1.union(receiverDStream2).foreachRDD(rdd => {
    rdd.foreach(numbersAccumulator.add(_))
  })

  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(5000)
  streamingContext.stop()

  numbersAccumulator.value.size shouldEqual(5)
  numbersAccumulator.value should contain allOf(0, 1, 2, 3, 4)
}

"local[3] master url" should "correctly execute 2 receiver input streams" in {
  // Unlike previously, this time 2 threads are reserved to receivers
  // Normally the test should pass every time because
  // it follows the rule of
  // "use “local[n]” as the master URL, where n > number of receivers to run"
  // (http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#points-to-remember-1)
  val conf = new SparkConf().setAppName("MasterUrl local[3] receiver test").setMaster("local[3]")
  sparkContext= SparkContext.getOrCreate(conf)
  val streamingContext = new StreamingContext(sparkContext, Durations.seconds(2))

  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers-accumulator")
  val receiverDStream1 = streamingContext.receiverStream(receiver)
  val receiverDStream2 = streamingContext.receiverStream(receiver)
  receiverDStream1.union(receiverDStream2).foreachRDD(rdd => {
    rdd.foreach(numbersAccumulator.add(_))
  })

  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(5000)
  streamingContext.stop()

  numbersAccumulator.value.size shouldEqual(10)
  numbersAccumulator.value should contain theSameElementsAs Iterable(0, 1, 2, 3, 4, 0, 1, 2, 3, 4)
}

This post explores master urls and deployment modes in Spark. The master urls can be divided on local and remote. The first ones are mostly used in integration tests on Spark context created on local machine. For the remote, the execution concerns specific servers (spark://) or resource managers (yarn, mesos://). The second part shown some differences between cluster and client deployment modes. As explained, the biggest difference was related to driver selection.

Share on: