Closures in Scala

on waitingforcode.com

Closures in Scala

If you're reading this blog, you've certainly noticed its big interest for Apache Spark. One of first problems we encounter with this data processing framework is a "Task not serializable" error that is caused by a not serializable closure. In this post, outside of Spark's context, we'll focus on these specific functions.

The post is divided in 3 sections. In the beginning it defines closures. Later, through some learning tests, it shows their internal details and use cases in 2 separated parts.

Closures defined

Functions in Scala are first-class citizens. Thus, they can be represented as class fields or be passed to other functions as input parameters. If the values used by these "function-parameters" rely only on the values from the function's body, it shouldn't be a problem. But sometimes it's not true and the function's body contains one or more references to its outside scope. It's for instance the case of the following code:

  class Sum {
    private val Nr1 = 1
    
    def add(nr2: Int): Int = Nr1 + nr2
  }

As you can see, the function's result depends on the value stored in Sum class field. It's exactly the situation for which the closures are used. The name of this function comes from the fact of closing the function literal by capturing the bindings of its free variables. Free variables are the ones used in the closed function that are neither local nor format parameters to this function. It's worth mentioning that free variables are used as it, that said a var stays a var and a val stays a val. It means that if we create a closure for a method using a var, the result of the closure may be different for 2 different calls with the same input parameter.

Paul Cantrell summarizes the closures pretty well in its article about Closures in Ruby as the functions that:

  • We can pass around the code block as a value
  • We can execute at any time in any place in the code knowing the closure
  • Are able to use the variables from their creation context

Maybe the most popular use of closures is their serialization and sending across the network. It's the case of Apache Spark where the engine transforms data processing functions to closures in order to distribute them to their physical executors.

Closures decompiled

To get a better idea about what happens when the closures are declared, let's write some examples and see disassembled code with javap -v -p ClosureValTest$$anonfun$main$1.class and javap -v -p ClosureVarTest$$anonfun$main$2.class commands. One of the closures in this example uses val and another var as free variable:

// ClosureAddVal.scala
object ClosureAddVal {

  private val NrToAdd = 30

  def add(nr: Int) = nr + NrToAdd

}

// ClosureAddVar.scala
object ClosureAddVar {

  private var NrToAdd = 30

  def add(nr: Int) = nr + NrToAdd

}

// ClosuresTests.scala
object ClosureValTest {

  def main(args: Array[String]): Unit = {
    localSum(ClosureAddVal.add, 10)
  }

  def localSum(sumMethod: (Int) => Int, nr: Int): Int = {
    sumMethod(nr)
  }

}

object ClosureVarTest {

  def main(args: Array[String]): Unit = {
    localSum(ClosureAddVar.add, 10)
  }

  def localSum(sumMethod: (Int) => Int, nr: Int): Int = {
    sumMethod(nr)
  }

}

Decompiled code returned the same results for both cases. The free variables are represented here as references:

# var
  #27 = Class              #26            // ClosureAddVar$
  #28 = Utf8               MODULE$
  #29 = Utf8               LClosureAddVar$;
  #30 = NameAndType        #28:#29        // MODULE$:LClosureAddVar$;
  #31 = Fieldref           #27.#30        // ClosureAddVar$.MODULE$:LClosureAddVar$;
  #32 = Utf8               add
  #33 = NameAndType        #32:#18        // add:(I)I
  #34 = Methodref          #27.#33        // ClosureAddVar$.add:(I)I

# val
  #27 = Class              #26            // ClosureAddVal$
  #28 = Utf8               MODULE$
  #29 = Utf8               LClosureAddVal$;
  #30 = NameAndType        #28:#29        // MODULE$:LClosureAddVal$;
  #31 = Fieldref           #27.#30        // ClosureAddVal$.MODULE$:LClosureAddVal$;
  #32 = Utf8               add
  #33 = NameAndType        #32:#18        // add:(I)I
  #34 = Methodref          #27.#33        // ClosureAddVal$.add:(I)I

Let's see now what happens when we define the closure in the same scope as the analyzed class:

object ClosureTest {
  def test() {
    val immutableNr = 3
    var mutableNr = 0
    val closure:() => Unit = () => {
      val immutableSum = immutableNr + mutableNr
    }
    closure()
  }
  test()
}

After decompiling (javap -p -v ClosureTest$$anonfun$1.class) it we can notice that the compiler made some interesting optimizations. The bytecode generated for above version looks like:

Constant pool:
# ...
    #23 = NameAndType        #22:#11        // apply$mcV$sp:()V
    #24 = Methodref          #2.#23         // ClosureTest$$anonfun$1.apply$mcV$sp:()V
    #25 = Utf8               this
    #26 = Utf8               LClosureTest$$anonfun$1;
    #27 = NameAndType        #17:#18        // immutableNr$1:I
    #28 = Fieldref           #2.#27         // ClosureTest$$anonfun$1.immutableNr$1:I
    #29 = NameAndType        #19:#20        // mutableNr$1:Lscala/runtime/IntRef;
    #30 = Fieldref           #2.#29         // ClosureTest$$anonfun$1.mutableNr$1:Lscala/runtime/IntRef;
    #31 = Utf8               scala/runtime/IntRef
    #32 = Class              #31            // scala/runtime/IntRef
    #33 = Utf8               elem
    #34 = NameAndType        #33:#18        // elem:I
    #35 = Fieldref           #32.#34        // scala/runtime/IntRef.elem:I
    #36 = Utf8               immutableSum
public void apply$mcV$sp();
descriptor: ()V
flags: ACC_PUBLIC
Code:
  stack=2, locals=2, args_size=1
     0: aload_0
     1: getfield      #28                 // Field immutableNr$1:I
     4: aload_0
     5: getfield      #30                 // Field mutableNr$1:Lscala/runtime/IntRef;
     8: getfield      #35                 // Field scala/runtime/IntRef.elem:I
    11: iadd
    12: istore_1
    13: return

To get a better idea what these optimizations can be, let's replace var by val and reload the bytecode for it:

Constant pool:
# ...
  #22 = NameAndType        #21:#11        // apply$mcV$sp:()V
  #23 = Methodref          #2.#22         // ClosureTest$$anonfun$1.apply$mcV$sp:()V
  #24 = Utf8               this
  #25 = Utf8               LClosureTest$$anonfun$1;
  #26 = NameAndType        #17:#18        // immutableNr$1:I
  #27 = Fieldref           #2.#26         // ClosureTest$$anonfun$1.immutableNr$1:I
  #28 = NameAndType        #19:#18        // mutableNr$1:I
  #29 = Fieldref           #2.#28         // ClosureTest$$anonfun$1.mutableNr$1:I
  #30 = Utf8               immutableSum

public void apply$mcV$sp();
descriptor: ()V
flags: ACC_PUBLIC
Code:
  stack=2, locals=2, args_size=1
     0: aload_0
     1: getfield      #27                 // Field immutableNr$1:I
     4: aload_0
     5: getfield      #29                 // Field mutableNr$1:I
     8: iadd
     9: istore_1
    10: return

The code using a var is much more verbose because it retrieves the reference to the mutable field in 2 operations while the one for immutable does it only once. Moreover, the immutable fields are stored as ints and not as IntRef, so it takes slightly less space.

Scala closures examples

The following example shows the main closure specificity - enclosing character. Here TestedPerson class has the field called the same way as the one used by closure, but it's always the latter one is used. There is no shadowing effect:

behavior of "closure"

it should "always use the parameter from the function's scope" in {
  val testedPerson = TestedPerson("X", "Z")

  val personNames = testedPerson.getNames(Concatenators.concatenate _)

  // As you can see, the result returns 'A Z' and not 'X Z' as we could expect
  // The closure *always* references to the variables in its own scope
  // Thus here, even if TestedPerson has a parameter of the same name as concatenator,
  // it doesn't use it
  personNames shouldEqual "A Z"
}
case class TestedPerson(firstName: String, lastName: String) {

  def getNames(concatenator: (String) => String): String = {
    concatenator(lastName)
  }

}

object Concatenators {
  private val firstName = "A"
  def concatenate(lastName: String): String = s"${firstName} ${lastName}"
}

Another closure feature relates to parameter type. If it's immutable, there is not magic - closure result is always the same. But if it's a variable, the result will vary with the variable change:

it should "be aware of var modifications in function's scope" in {
  def adder(sumMethod: (Int) => Int, nr1: Int): Int = sumMethod(nr1)

  val sumWithDefault = adder(Closures.addWithDefault, 20)

  sumWithDefault shouldEqual 70

  Closures.DefaultNr2ToAdd = 20
  val sumWithDefaultAfterChange = adder(Closures.addWithDefault, 20)

  sumWithDefaultAfterChange shouldEqual 40
}
object Closures {

  var DefaultNr2ToAdd = 50

  def addWithDefault(nr1: Int) = nr1 + DefaultNr2ToAdd

  def addFromFactory(nr1: Int) = (nr2: Int) => nr1 + nr2

}

Two another closure points, equality and construction method, are presented in the next snippet:

it should "treat 2 closures for the same function as different" in {
  val firstClosure = Concatenators.concatenate _
  val secondClosure = Concatenators.concatenate _

  firstClosure shouldNot equal(secondClosure)
}
it should "be created from a closure factory method" in {
  def adder(sumMethod: (Int) => Int, nr1: Int): Int = sumMethod(nr1)
  val sumFor20 = ClosuresWithFactory .addFromFactory(20)
  val sumFor40 = ClosuresWithFactory .addFromFactory(40)

  val sumFor20Plus5 = adder(sumFor20, 5)
  val sumFor40Plus5 = adder(sumFor40, 5)

  sumFor20Plus5 shouldEqual 25
  sumFor40Plus5 shouldEqual 45
}
object ClosuresWithFactory {

  def addFromFactory(nr1: Int) = (nr2: Int) => nr1 + nr2
 
}

The final point concerns serialization that it's closure's use case in Apache Spark. Through these 2 examples we can get an insight on the problems we can encounter on serializing the code:

it should "fail when the code is not serializable" in {
  val computer = new NotSerializableMathComputer()

  val serializationError = intercept[NotSerializableException] {
    serialize(computer.add)
  }

  serializationError.toString shouldEqual "java.io.NotSerializableException: MathComputer"
}

it should "work when the class is explicitly serializable" in {
  val computer = new SerializableMathComputer()
  serialize(computer.add)
}
def serialize(fn: Int => Int): Array[Byte] = {
  val stream = new ByteArrayOutputStream
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(fn)
  oos.close()
  stream.toByteArray
}
class NotSerializableMathComputer {
  val nr1 = 3
  def add = (nr2: Int) => nr1 + nr2
}

class SerializableMathComputer extends Serializable {
  val nr1 = 3
  def add = (nr2: Int) => nr1 + nr2
}

Scala closures are an interesting construction to pass a function with some elements of its scope to other functions. It can be useful in local programs but also in distributed ones where some processing code is serialized and sent to its physical executors. But to make it work, the code must be serializable, either by explicitly implementing Serializable interface or by being a singleton, without using any external not serializable dependencies.

Share, like or comment this post on Twitter:

Share on: