Serialization issues - part 1

on waitingforcode.com

Issues with non-serializable objects are probably among the most painful ones when we start working with Spark. Fortunately, there are several solutions to them.

The serialization problem is explained in 2 posts, and this one is the first. It begins by explaining when serialization problems can occur. The next 2 sections present some common solutions to them: objects exposed through functions and lazy objects created from function recipes.

Task not serializable explained

The infamous error org.apache.spark.SparkException: Task not serializable is raised when an object used in the processing (transformation, action), or one of its dependencies, can't be serialized with the Java serializer and sent to the executor(s). Serialization errors can occur in the different places where serialization is done, for example:

  • data - data moved between executors (e.g. during a shuffle) is serialized.
  • transformations - computations executed by the executors are sent by the driver in serialized format.
  • checkpoint - in Spark Streaming the checkpoint is stored in serialized format.
  • broadcast variables - the division of a broadcast variable into blocks is done with serializers.
  • tasks - once the execution plan is defined, its individual parts (tasks) are serialized and sent to the executors, where they're physically launched after deserialization (this point is closely related to the transformations one).
  • disk spilling - data is stored on disk as serialized objects when there is no more room in memory.
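
The root cause behind all these cases can be reproduced without a cluster, with plain Java serialization. The snippet below is only a minimal sketch: PlainFilter, FilterTask and firstRejectedClass are hypothetical names that don't belong to Spark, and FilterTask is a simplified stand-in for the code that Spark ships to executors.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical dependency that we are not able to mark as Serializable.
class PlainFilter(val bound: Int)

// Simplified stand-in for a task: it's Serializable itself,
// but it keeps a reference to a non-serializable dependency.
class FilterTask(filter: PlainFilter) extends Serializable {
  def apply(number: Int): Boolean = number > filter.bound
}

object SerializationCheck {
  // Tries to write the whole object graph with Java serialization.
  // Returns the message of the NotSerializableException (the name of the
  // rejected class) or None when everything can be written.
  def firstRejectedClass(value: AnyRef): Option[String] = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Calling SerializationCheck.firstRejectedClass(new FilterTask(new PlainFilter(2))) should point at PlainFilter, even though the task itself extends Serializable. It's the same kind of information that Spark reports in its "object not serializable (class: ...)" messages.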

The following examples show some cases where a serialization exception occurs:

"serialization" should "fail because of not serializable mapped object" in {
  val serializableException = intercept[SparkException] {
    sparkContext.parallelize(0 to 4)
      .map(new NotSerializableNumber(_))
      .collect()
  }

  serializableException.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableNumber") shouldBe(true)
}

"serialization" should "fail because of transformation using not serializable object" in {
  val serializableException = intercept[SparkException] {
    val numberFilter = new NotSerializableFilter(2)
    sparkContext.parallelize(0 to 4)
      .filter(numberFilter.filter(_))
      .collect()
  }

  serializableException.getCause.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableFilter") shouldBe(true)
}

"serialization" should "fail because of not serializable broadcast" in {
  val serializableException = intercept[NotSerializableException] {
    sparkContext.broadcast(new NotSerializableBroadcast)
    sparkContext.parallelize(0 to 4).collect()
  }

  serializableException.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableBroadcast") shouldBe(true)
}

class NotSerializableBroadcast {}

class NotSerializableFilter(bound: Int) {
  def filter(number: Int): Boolean = {
    number > bound
  }
}

class NotSerializableNumber(number: Int) {
  val label = s"Number#${number}"
} 

Objects exposed through functions

Perhaps the most obvious solution is to add extends Serializable to every class whose instances move through the network. But it can be painful, especially when a given object has third-party non-serializable dependencies that can't be modified. One serious alternative is to access such objects through functions.

The idea is simple - every potentially non-serializable object can be accessed through a function, as in the following test:

"not serializable object" should "be correctly sent through network" in {
  object Exposer {
    def get(): LabelMaker = {
      new LabelMaker()
    }
  }

  val labels = sparkContext.parallelize(0 to 1)
    .map(number => Exposer.get().convertToLabel(number))
    .collect()

  labels should contain allOf("Number#0", "Number#1")
}

"not serializable object" should "make mapping fail" in {
  val labelMaker = new LabelMaker
  val sparkException = intercept[SparkException] {
    sparkContext.parallelize(0 to 1)
      .map(number => labelMaker.convertToLabel(number))
      .collect()
  }

  sparkException.getCause.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.LabelMaker") shouldBe(true)
} 

class LabelMaker() {

  def convertToLabel(number: Int): String = {
    s"Number#${number}"
  }
}

It works because functions declared inside objects don't need to be serialized. They're the equivalent of Java's static methods and, in order to be called, they only have to be reachable through the classpath (i.e. a JAR shared by the driver and the workers).
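
This behavior can be checked with plain Java serialization. The sketch below is an illustration under assumptions, not a description of Spark's internals: PlainLabelMaker, PlainLabelMakerExposer, LabelTask and RoundTrip are hypothetical names, and the serialize/deserialize round trip only imitates the trip from the driver to an executor.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Stand-in for a class that can't be made Serializable.
class PlainLabelMaker {
  def convertToLabel(number: Int): String = s"Number#$number"
}

// Methods of a Scala object are resolved from the classpath, like Java statics.
object PlainLabelMakerExposer {
  def get(): PlainLabelMaker = new PlainLabelMaker
}

// Simplified stand-in for the function sent to executors. It has no field
// pointing at PlainLabelMaker, so there is nothing non-serializable to write.
class LabelTask extends Serializable {
  def apply(number: Int): String =
    PlainLabelMakerExposer.get().convertToLabel(number)
}

object RoundTrip {
  // Serializes and deserializes a value with Java serialization.
  def copy[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

The deserialized copy obtained with RoundTrip.copy(new LabelTask) is still able to build labels, because PlainLabelMaker is created only at call time, on the side where the task runs.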

This solution is pretty simple to implement and with some extra effort, we can easily evolve it to handle singletons or other patterns.
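
One possible singleton variation is sketched below. It's only an assumption about how such an evolution could look: CountedLabelMaker and LabelMakerSingleton are hypothetical names, and the counter exists only to make the number of created instances visible.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for a non-serializable class that is expensive to create.
class CountedLabelMaker {
  // Every constructor call is counted
  CountedLabelMaker.created.incrementAndGet()

  def convertToLabel(number: Int): String = s"Number#$number"
}

object CountedLabelMaker {
  val created = new AtomicInteger(0)
}

object LabelMakerSingleton {
  // lazy val: the instance is built on the first access and then reused.
  // A Scala object is initialized separately in every JVM, so each executor
  // would end up with its own single instance.
  lazy val instance: CountedLabelMaker = new CountedLabelMaker

  def get(): CountedLabelMaker = instance
}
```

With this version, mapping several numbers through LabelMakerSingleton.get().convertToLabel(number) inside one JVM creates the label maker only once, instead of once per processed element as in the Exposer shown above.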

Lazy objects and function recipes

Another solution, similar to the previous one, is based on lazy objects and function recipes. Here the goal is to mark the non-serializable object as lazy and to create it from a function (recipe). This solution can easily be extended with broadcast variables, which help to keep only 1 instance per executor (e.g. a long-running open connection to a database).

It's important to access non-serializable objects lazily. Thanks to that, the object is created only when it's needed, that is, when it's used for the first time during the processing on the executor's side. Without laziness, the object would be created earlier, on the driver, and Spark would try to send it to the executors instead of the function's recipe. The following example shows how it could work:

"lazy loaded not serializable object" should "be correctly sent through network" in {
  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("iterated numbers accumulator")
  val connector = NotSerializableLazyConnector()
  sparkContext.parallelize(0 to 1)
    .foreachPartition(numbers => {
      numbers.foreach(number => {
        connector.push(number)
        numbersAccumulator.add(number)
      })
    })

  numbersAccumulator.value should contain allOf(0, 1)
}

"lazy loaded not serializable object" should "be correctly sent once through network" in {
  val numbersAccumulator = sparkContext.collectionAccumulator[Int]("iterated numbers accumulator")
  // This version is a variation of the previous test because it
  // sends the given object only once and thanks to that we can, for example,
  // keep the connection open for all tasks executed on a given executor.
  // The connector keeps its connection "open" because it's initialized lazily and, since it's created
  // from a broadcast variable, it's guaranteed that only one such instance exists in the executor.
  val connectorBroadcast = sparkContext.broadcast(NotSerializableLazyConnector())
  sparkContext.parallelize(0 to 1)
    .foreachPartition(numbers => {
      numbers.foreach(number => {
        connectorBroadcast.value.push(number)
        numbersAccumulator.add(number)
      })
    })
  sparkContext.parallelize(4 to 5)
    .foreachPartition(numbers => {
      numbers.foreach(number => {
        connectorBroadcast.value.push(number)
        numbersAccumulator.add(number)
      })
    })
  sparkContext.parallelize(7 to 8)
    .foreachPartition(numbers => {
      numbers.foreach(number => {
        connectorBroadcast.value.push(number)
        numbersAccumulator.add(number)
      })
    })

  numbersAccumulator.value should contain allOf(0, 1, 4, 5, 7, 8)
  NotSerializableLazyConnector.InitializationCount.get() shouldEqual 1
  NotSerializableLazyConnector.InitializationCount.set(0)
}

"eagerly loaded not serializable object" should "make processing fail" in {
  val connector = NotSerializableEagerConnector()
  val sparkException = intercept[SparkException] {
    sparkContext.parallelize(0 to 1)
      .foreachPartition(numbers => {
        numbers.foreach(number => {
          connector.push(number)
        })
      })
  }

  sparkException.getCause.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableSender")
    .shouldBe(true)
} 

class NotSerializableEagerConnector(creator: () => NotSerializableSender) extends Serializable {

  val sender = creator()

  def push(value: Int) = {
    sender.push(value)
  }
}

object NotSerializableEagerConnector {
  def apply(): NotSerializableEagerConnector = {
    new NotSerializableEagerConnector(() => new NotSerializableSender())
  }
}

class NotSerializableLazyConnector(creator: () => NotSerializableSender) extends Serializable {

  lazy val sender = creator()

  def push(value: Int) = {
    sender.push(value)
  }
}

class NotSerializableSender {
  def push(value: Int) = {
    println(s"Pushing ${value}")
  }
}

object NotSerializableLazyConnector {
  // AtomicInteger comes from java.util.concurrent.atomic
  val InitializationCount = new AtomicInteger(0)

  def apply(): NotSerializableLazyConnector = {
    InitializationCount.incrementAndGet()
    new NotSerializableLazyConnector(() => new NotSerializableSender())
  }
}
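
The difference between the eager and the lazy wrapper can also be observed with plain Java serialization, outside of Spark. The following sketch relies on hypothetical names (PlainSender, SenderRecipe, EagerConnector, LazyConnector, TransientLazyConnector, LazyCheck). The recipe is written as an explicit Serializable class only to make its serializability visible, and the @transient variant is an addition that doesn't appear in the tests above.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a non-serializable sender.
class PlainSender {
  def push(value: Int): String = s"Pushing $value"
}

// The recipe travels with the wrapper, so it has to be serializable.
class SenderRecipe extends (() => PlainSender) with Serializable {
  def apply(): PlainSender = new PlainSender
}

class EagerConnector(creator: () => PlainSender) extends Serializable {
  // Built in the constructor, so the sender is a part of the written object
  val sender: PlainSender = creator()
}

class LazyConnector(creator: () => PlainSender) extends Serializable {
  // Built on the first access only; before that only the recipe is written
  lazy val sender: PlainSender = creator()
}

class TransientLazyConnector(creator: () => PlainSender) extends Serializable {
  // Never written, even if it was already initialized before serialization
  @transient lazy val sender: PlainSender = creator()
}

object LazyCheck {
  // true when the whole object graph can be written with Java serialization
  def canSerialize(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

In this sketch the eager connector can't be serialized, while the lazy one can as long as sender was not accessed before serialization. If the lazy sender is touched first, for instance on the driver, the already created instance becomes a part of the object and serialization fails again. Marking the field as @transient is one way to protect against that case.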

Serialization issues, especially when we use a lot of third-party classes, are an inherent part of Spark applications. As we saw in the first section of the post, serialization occurs almost everywhere (shuffling, transformations, checkpointing...). Fortunately, there are a lot of solutions and 2 of them were described in this post. The first one was based on non-serializable objects exposed through the methods of Scala's objects. The second solution was similar but a little bit more involved, since it uses a wrapper holding a lazily loaded non-serializable object. The next part of this series will present some other solutions to serialization problems in Spark.

Read also about Serialization issues - part 1 here: Using Non-Serializable Objects in Apache Spark, Spark and Kafka integration patterns.