Deployment modes and master URLs in Spark

Spark has 2 deployment modes that can be controlled in a fine-grained way thanks to the master URL property.

This post lists and explains the different deployment modes in Spark. It's divided into 3 parts. The first concerns master URLs and shows their different types. The second part compares the client and cluster deployment modes in Spark while the last shows some tests of master URLs.

Master URLs

First of all, the master URL property represents the endpoint of the master server. As we'll see below, it can either be "relative" or "absolute". The "relative" form indicates the place where the master is executed (localhost, remote host). The "absolute" form points to a specific master server.

Locally Spark can be launched with various master URL values:

- local - runs the computation with 1 worker thread, so without any parallelism
- local[K] - runs the computation with K worker threads
- local[K, F] - runs with K worker threads and tolerates up to F task failures
- local[*] - runs with as many worker threads as logical cores on the machine
- local[*, F] - as above, tolerating up to F task failures
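As a minimal sketch, all of these values plug directly into SparkConf (the val names are purely illustrative):

import org.apache.spark.SparkConf

val singleThreaded = new SparkConf().setMaster("local")   // 1 worker thread
val fourThreads = new SparkConf().setMaster("local[4]")   // 4 worker threads
val allCores = new SparkConf().setMaster("local[*]")      // 1 thread per logical core
val withRetries = new SparkConf().setMaster("local[4,2]") // 4 threads, up to 2 task failures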

Another value for the master URL property starts with spark://. It's widely used in Spark's standalone installation. An application configured this way will connect to the master server defined by the spark:// address.

Spark also has master URLs specific to the supported resource managers. The first one is mesos://. Its address, composed of a host and a port, must point to a started MesosClusterDispatcher. Another possible resource manager is YARN, and a Spark program is submitted to it when the master URL is equal to yarn. Unlike Mesos, this value doesn't accept any parameters. Instead, the resource manager is resolved through the corresponding environment variables (HADOOP_CONF_DIR or YARN_CONF_DIR).
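To make the formats concrete, here is a short sketch putting the 3 remote families side by side (the host names and ports are placeholders, not real servers):

import org.apache.spark.SparkConf

val standaloneConf = new SparkConf().setMaster("spark://master-host:7077")
val mesosConf = new SparkConf().setMaster("mesos://dispatcher-host:7077")
// yarn takes no host:port - the cluster is resolved from the configuration
// files referenced by HADOOP_CONF_DIR or YARN_CONF_DIR
val yarnConf = new SparkConf().setMaster("yarn")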

Client and cluster deployment

When a Spark application is submitted through the spark-submit script, the parameter indicating the deployment mode (--deploy-mode) can take 1 of 2 values: client or cluster. The difference between them is quite important from the execution point of view.

Client deploy mode is the default value. When it's used, the machine executing the spark-submit command becomes the driver. On the other side, cluster deploy mode doesn't choose the driver directly but first contacts the resource manager. The resource manager analyzes all slaves and selects the one having enough available resources to run the driver of the Spark program.
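Besides the --deploy-mode flag, the mode can also be defined programmatically through the spark.submit.deployMode property. A minimal sketch, assuming a standalone master under a placeholder address:

import org.apache.spark.SparkConf

// Equivalent to: spark-submit --master spark://master-host:7077 --deploy-mode cluster ...
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .set("spark.submit.deployMode", "cluster")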

There are several differences between both deployment modes:

- driver location - in client mode the driver runs on the machine invoking spark-submit while in cluster mode it runs on one of the cluster's nodes
- interactive use - the interactive shells (e.g. spark-shell) work only in client mode since the driver must stay attached to the user's terminal
- driver output - in client mode the driver's logs appear directly in the submitting console; in cluster mode they must be retrieved from the node hosting the driver
- network proximity - in cluster mode the driver works close to the executors, which reduces the latency of driver-executor communication
- client dependency - in client mode the application stops when the submitting machine disconnects; in cluster mode it survives such a disconnection

Master URL examples

Before finishing this post, let's make some tests with the master URLs described above:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

val receiver = new Receiver[Int](StorageLevel.MEMORY_ONLY) {
  var receiverThread: Thread = null

  override def onStart(): Unit = {
    // Run in a new thread since onStart() must be non-blocking
    // In the real world the thread would execute a blocking loop,
    // e.g. while (true) { /* poll the source and store records */ }
    receiverThread = new Thread(new Runnable() {
      override def run(): Unit = {
        println("Starting thread")
        // Push 5 items (0 to 4) to Spark through store()
        for (nr <- 0 to 4) {
          store(nr)
        }
      }
    })
    receiverThread.start()
  }

  override def onStop(): Unit = {
    receiverThread.interrupt()
  }
}

// Shared by the tests below; each test recreates the context
var sparkContext: SparkContext = null

"local master url" should "correctly execute simple batch computation" in {
  val conf = new SparkConf().setAppName("MasterUrl local test").setMaster("local")
  sparkContext = SparkContext.getOrCreate(conf)

  val multipliedNumbers = sparkContext.parallelize(1 to 10)
    .map(_*2)
    .collect()

  multipliedNumbers should contain allOf(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
}

"local master url" should "correctly execute only 1 from 2 receiver input streams" in {
  // As explained in http://spark.apache.org/docs/latest/streaming-programming-guide.html#points-to-remember-1
  // and http://www.waitingforcode.com/apache-spark-streaming/receivers-in-spark-streaming/read
  // each receiver uses 1 thread. So running 2 receivers on the "local"
  // master URL will execute only 1 of them
  // Note: sometimes neither of the receivers will be executed because
  //       of a lack of resources.
  val conf = new SparkConf().setAppName("MasterUrl local receivers test").setMaster("local")
  sparkContext = SparkContext.getOrCreate(conf)
  val streamingContext = new StreamingContext(sparkContext, Durations.seconds(2))

  val receiverDStream1 = streamingContext.receiverStream(receiver)
  val receiverDStream2 = streamingContext.receiverStream(receiver)

  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers-accumulator")
  receiverDStream1.union(receiverDStream2).foreachRDD(rdd => {
    rdd.foreach(numbersAccumulator.add(_))
  })

  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(5000)
  streamingContext.stop()

  numbersAccumulator.value.size shouldEqual(5)
  numbersAccumulator.value should contain allOf(0, 1, 2, 3, 4)
}

"local[3] master url" should "correctly execute 2 receiver input streams" in {
  // Unlike previously, this time 2 threads are reserved for the receivers
  // Normally the test should pass every time because
  // it follows the rule of
  // "use “local[n]” as the master URL, where n > number of receivers to run"
  // (http://spark.apache.org/docs/2.1.1/streaming-programming-guide.html#points-to-remember-1)
  val conf = new SparkConf().setAppName("MasterUrl local[3] receiver test").setMaster("local[3]")
  sparkContext = SparkContext.getOrCreate(conf)
  val streamingContext = new StreamingContext(sparkContext, Durations.seconds(2))

  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("numbers-accumulator")
  val receiverDStream1 = streamingContext.receiverStream(receiver)
  val receiverDStream2 = streamingContext.receiverStream(receiver)
  receiverDStream1.union(receiverDStream2).foreachRDD(rdd => {
    rdd.foreach(numbersAccumulator.add(_))
  })

  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(5000)
  streamingContext.stop()

  numbersAccumulator.value.size shouldEqual(10)
  numbersAccumulator.value should contain theSameElementsAs Iterable(0, 1, 2, 3, 4, 0, 1, 2, 3, 4)
}

This post explores master URLs and deployment modes in Spark. The master URLs can be divided into local and remote ones. The first ones are mostly used in integration tests with a Spark context created on the local machine. For the remote ones, the execution concerns specific servers (spark://) or resource managers (yarn, mesos://). The second part showed some differences between the cluster and client deployment modes. As explained, the biggest difference is related to driver selection.