Serialization issues - part 2

Versions: Spark 2.1.0

A previous post (Serialization issues - part 1) presented some solutions for serialization problems. This post is its continuation.

This post presents other solutions for serialization problems. Its first part shows the use of serializable child classes. The second presents serializable wrappers. The third explains the use of the @transient annotation.

Serializable child class

Normally, when a class is not serializable, the simplest fix is to add extends Serializable to it. However, that's not always possible. Sometimes the not serializable class comes from a 3rd party package and can't be modified. In that case we can create a serializable child class for the not serializable one.

It appears to be a simple operation but it can sometimes be tricky. Java serialization requires that the first non-serializable superclass has an accessible no-arg constructor, which is used to reinitialize the superclass's fields during deserialization. Without it, an InvalidClassException with the message "no valid constructor" is thrown.

The following tests show a simple use of serializable child class:

"serializable child of not serializable parent" should "be correctly sent through network" in {
  val labels = sparkContext.parallelize(0 to 1)
    .map(number => new NotSerializableTextWrapper(number))
    .collect()

  labels.map(wrapper => wrapper.getLabel()) should contain allOf("overridden_number#0", "overridden_number#1")
}

"not serializable object" should "make processing fail" in {
  val serializableException = intercept[SparkException] {
    sparkContext.parallelize(0 to 1)
      .map(number => new NotSerializableText(number))
      .collect()
  }

  serializableException.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableText") shouldBe(true)
}


class NotSerializableTextWrapper(number: Int) extends NotSerializableText(number) with Serializable {

  override def getLabel(): String = {
    s"overridden_number#${number}"
  }

}

class NotSerializableText(number:Int) {

  // A no-arg constructor is mandatory in the first non-serializable
  // superclass when one of its subclasses is serializable
  def this() = {
    this(0)
  }

  def getLabel(): String = {
    s"number#${number}"
  }

}
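The no-arg constructor requirement can be observed with plain JVM serialization, outside of Spark. Below is a minimal sketch (all class names are hypothetical, invented for this illustration): a serializable child of a parent without a no-arg constructor fails the round trip, while a parent with one works, albeit with its fields reset by that constructor:

```scala
import java.io._

// Hypothetical classes for illustration: this parent has no no-arg
// constructor, so its serializable child can't be deserialized
class ParentWithoutNoArg(val number: Int)
class BrokenChild(number: Int) extends ParentWithoutNoArg(number) with Serializable

// This parent defines the required no-arg constructor
class ParentWithNoArg(val number: Int) {
  def this() = this(0)
}
class WorkingChild(number: Int) extends ParentWithNoArg(number) with Serializable

object NoArgConstructorDemo {
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val output = new ObjectOutputStream(buffer)
    output.writeObject(obj)
    output.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Deserialization reinitializes the non-serializable parent through
    // its no-arg constructor, so the field comes back as 0, not 3
    println(roundTrip(new WorkingChild(3)).number) // 0

    // Without a no-arg constructor in the parent, the round trip fails
    // with InvalidClassException: no valid constructor
    try roundTrip(new BrokenChild(3))
    catch { case e: ObjectStreamException => println(s"failed: ${e.getMessage}") }
  }
}
```

Note that the parent's fields are not serialized at all; they're rebuilt by the no-arg constructor, which is why the number comes back as 0.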

Serializable factory wrapper

Another concept helping to beat serialization problems is the serializable factory wrapper. It looks similar to the recipe functions from the previous post. The difference is that it exposes the expected objects through a method rather than lazy fields, and it's more useful when sending data from the executors back to the driver. The following test shows a simple use of wrappers:

"serializable wrapper" should "help to create not serializable object" in {
  val labels = sparkContext.parallelize(0 to 1)
    .map(number => new SerializableWrapper(number))
    .collect()

  labels.map(wrapper => wrapper.notSerializableEntity.label) should contain allOf("Number#0", "Number#1")
}

"not serializable object without wrapper" should "make processing fail" in {
  val serializableException = intercept[SparkException] {
    sparkContext.parallelize(0 to 1)
      .map(number => new NotSerializableEntity(number))
      .collect()
  }

  serializableException.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableEntity") shouldBe(true)
}


class SerializableWrapper(number: Int) extends Serializable {
  // Wraps the not serializable entity
  // Let's suppose that NotSerializableEntity is a 3rd party class
  // that we can't modify
  lazy val notSerializableEntity = new NotSerializableEntity(number)

  def createEntity(number: Int): NotSerializableEntity = {
    new NotSerializableEntity(number)
  }
}

class NotSerializableEntity(number: Int) {
  val label = s"Number#${number}"
}
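One subtlety of the lazy field variant is worth noting: a plain lazy val is only safe to serialize while it's still uninitialized; once it has been evaluated, its backing field holds the not serializable object and serialization fails. The sketch below shows this with plain JVM serialization (the class names are hypothetical; ThirdPartyEntity stands in for a class we can't modify):

```scala
import java.io._

// Hypothetical stand-in for a 3rd party class we can't modify
class ThirdPartyEntity(number: Int) {
  val label = s"Number#${number}"
}

class EntityWrapper(number: Int) extends Serializable {
  // Safe to serialize only while still uninitialized; adding @transient
  // would make it safe even after initialization
  lazy val entity = new ThirdPartyEntity(number)
}

object LazyWrapperDemo {
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val output = new ObjectOutputStream(buffer)
    output.writeObject(obj)
    output.close()
    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Untouched lazy val: the wrapper crosses the wire and the entity
    // is created on the receiving side
    println(roundTrip(new EntityWrapper(7)).entity.label)

    // Already evaluated lazy val: the backing field now holds the
    // not serializable entity, so serialization fails
    val initialized = new EntityWrapper(7)
    initialized.entity
    try roundTrip(initialized)
    catch { case e: NotSerializableException => println(s"failed: ${e.getMessage}") }
  }
}
```

This is why the wrapper above only works reliably when the lazy field is first accessed on the receiving side, and why marking such fields @transient is the more robust choice.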

@transient annotation

Thanks to the @transient annotation we can inform the serializer that a given field shouldn't be serialized with the rest of the object. When it's combined with lazy, the field is recalculated on first access after deserialization instead of being serialized.

In Spark the @transient annotation is widely used with loggers, which often aren't serializable yet are very useful. This solution is similar to the one from the previous part called Lazy objects and function recipes. An example implementation can look like:

"transient and lazy loaded not serializable field" should "be correctly sent through network" in {
  val holderWithTransient = new HolderWithTransient
  val transientObjectAccumulator = sparkContext.collectionAccumulator[String]("accumulator of lazy transient object")
  sparkContext.parallelize(0 to 1)
    .foreachPartition(numbers => {
      numbers.foreach(number => {
        val className = holderWithTransient.notSerializableField.getClass.getName
        transientObjectAccumulator.add(className)
      })
    })

  transientObjectAccumulator.value.size() shouldEqual(2)
  transientObjectAccumulator.value.get(0) shouldEqual("com.waitingforcode.serialization.NotSerializableField")
  transientObjectAccumulator.value.get(1) shouldEqual("com.waitingforcode.serialization.NotSerializableField")
}

"transient and not lazy loaded not serializable field" should "be correctly sent through network as null" in {
  val holderWithNotLazyTransient = new HolderWithNotLazyTransient
  val transientObjectAccumulator = sparkContext.collectionAccumulator[String]("accumulator of lazy transient object")
  // When a @transient field is not marked lazy, it's simply
  // not serialized; it's not recalculated either, so it
  // remains null after deserialization
  sparkContext.parallelize(0 to 1)
    .foreachPartition(numbers => {
      numbers.foreach(number => {
        if (holderWithNotLazyTransient.notSerializableField == null) {
          transientObjectAccumulator.add("null")
        }
      })
    })

  transientObjectAccumulator.value.size() shouldEqual(2)
  transientObjectAccumulator.value.get(0) shouldEqual("null")
  transientObjectAccumulator.value.get(1) shouldEqual("null")
}

"not transient not serializable field" should "make processing fail" in {
  val holderWithoutTransient = new HolderWithoutTransient
  val sparkException = intercept[SparkException] {
    sparkContext.parallelize(0 to 1)
      .foreachPartition(numbers => {
        numbers.foreach(number => {
          val field = holderWithoutTransient.notSerializableField
          println(s"Field was ${field}")
        })
      })
  }

  sparkException.getCause.getMessage
    .contains("object not serializable (class: com.waitingforcode.serialization.NotSerializableField")
    .shouldBe(true)
}
 

class HolderWithoutTransient() extends Serializable {
  val notSerializableField = new NotSerializableField
}

class HolderWithNotLazyTransient() extends Serializable {

  @transient val notSerializableField = new NotSerializableField

}

class HolderWithTransient() extends Serializable {

  @transient lazy val notSerializableField = new NotSerializableField

}

class NotSerializableField {}
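The logger pattern mentioned earlier can be sketched without Spark. The example below (class names invented for illustration) uses the JDK logger, which is not serializable, to stay self-contained; Spark itself typically logs through SLF4J/Log4j, but the mechanism is the same. A plain JVM round trip simulates the driver-to-executor transfer:

```scala
import java.io._
import java.util.logging.Logger

class NumberMapper extends Serializable {
  // Skipped during serialization and recreated lazily on first use
  // after deserialization - i.e. on the executor side in Spark
  @transient lazy val logger: Logger = Logger.getLogger(classOf[NumberMapper].getName)

  def label(number: Int): String = {
    logger.fine(s"mapping ${number}")
    s"number#${number}"
  }
}

object TransientLoggerDemo {
  def main(args: Array[String]): Unit = {
    // Simulate the driver -> executor trip with plain JVM serialization
    val buffer = new ByteArrayOutputStream()
    val output = new ObjectOutputStream(buffer)
    output.writeObject(new NumberMapper())
    output.close()
    val mapper = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      .readObject().asInstanceOf[NumberMapper]
    // The transient lazy logger works again on the receiving side
    println(mapper.label(1))
  }
}
```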

This post shows more solutions for the problems with not serializable objects in Spark. The first part proves that sometimes we can use serializable child classes. However, it also shows that this is painful to implement when the superclass doesn't have a no-arg constructor. The second part presents factory wrappers that create not serializable objects only when they don't need to be sent through the network. However, this approach also has some drawbacks, because GC pressure increases with every wrapper. The last part shows the use of the @transient annotation with lazy loaded fields. In Spark this solution is widely used with loggers, which very often aren't serializable, and only lazy loading on the executor side makes them work correctly.

