Serialization in Spark

Versions: Spark 2.0.0

Serialization frameworks are intrinsic part of Big Data systems. Spark is not an exception for this rule and it offers some different possibilities to manage serialization.

Serialization modes in Spark

Spark needs serialization for every activity related to data movement and computations, such as: shuffle, spilling, caching, tasks triggering or results computation. Nativly, 2 serialization modes are supported and they are configured through spark.serializer property.

The first mode, and at the same time the default one, is based on standard Java serialization (whence the need of implement java.io.Serializable). Under-the-hood it uses classical method transforming object into a stream of bytes:

override def serialize[T: ClassTag](t: T): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = serializeStream(bos)
  out.writeObject(t)
  out.close()
  bos.toByteBuffer
}

The second choice is serialization framework called Kryo. It's activated trough spark.kryo.registrationRequired configuration entry. Regarding to Java serialization, Kryo is more performant - serialized buffer takes less place in the memory (often up to 10x less than Java serialization) and it's generated faster. Kryo also has an interesting optimization feature called registration. It means that Kryo will fail every time when it tries to serialize/deserialize unknown (unregistered) class. An unknown class is a class that was not defined in SparkConf's registerKryoClasses(Class[]) method.

One of optimisations is the mapping of full class names to more compact format (integers). For example a class identified by full name com.waitingforcode.model.Address will be represented by the number 3. It reduces, especially in the case of a lot of large objects, the size of serialized record.

Serialization examples in Spark

Below you can find some examples of serialization in Spark:

@Test
public void should_fail_on_serializing_class_not_implementing_serializable() {
  JavaSparkContext sparContext = getJavaSerializationContext();
  JavaRDD<PersonNotSerializable> personRDD =
    sparContext.parallelize(Collections.singletonList(new PersonNotSerializable()));
  try {
    personRDD.count();
    fail("NotSerializableException should be thrown when manipulated" +
      " object doesn't implement Serializable");
  } catch (Exception se) {
    assertThat(se.getMessage())
      .contains("object not serializable (class: com.waitingforcode.serialization.SerializationTest$PersonNotSerializable,");
  }
}

@Test
public void should_fail_on_serializing_not_registered_class() {
  Class[] registeredClasses = new Class[]{
    Person.class, scala.collection.mutable.WrappedArray.ofRef.class, Object[].class
  };
  JavaSparkContext sparkContext =
    getKryoSerializationContext(registeredClasses, 
      new Tuple2<>("spark.kryo.registrationRequired", "true"));
  List<Person> personData = Collections.singletonList(
    Person.valueOf("Name", 30, Address.valueOf("street", "city"))
  );
  JavaRDD<Person> personRDD = sparkContext.parallelize(personData);

  try {
    personRDD.count();
    fail("Should fail because Person class is not registered");
  } catch (Exception e) {
    assertThat(e.getMessage()).contains("Class is not registered:" +
      " com.waitingforcode.serialization.SerializationTest$Address");
  }
}

@Test
public void should_correctly_serialize_registered_class() {
  Class[] registeredClasses = new Class[]{Person.class, Address.class,
      scala.collection.mutable.WrappedArray.ofRef.class, Object[].class};
  JavaSparkContext sparkContext =
    getKryoSerializationContext(registeredClasses, 
    new Tuple2<>("spark.kryo.registrationRequired", "true")
  );
  List<Person> personData = Collections.singletonList(
    Person.valueOf("Name", 30, Address.valueOf("street", "city"))
  );
  JavaRDD<Person> personRDD = sparkContext.parallelize(personData);

  long notNullCounter = personRDD.filter(person -> person != null).count();

  assertThat(notNullCounter).isEqualTo(1);
}

@Test
public void should_compare_size_of_registered_unregistered_and_java_serialized_objects() throws IOException, InterruptedException {
  String checkpointDir = "/home/konieczny/tmp/spark/checkpoints";
  FileUtils.cleanDirectory(new File(checkpointDir));
  // serialized data
  List<Person> personData = new ArrayList<>();
  for (int i = 0; i < 1_000; i++) {
    personData.add(Person.valueOf("Name"+i, 30+i, Address.valueOf("street"+i, "city"+i)));
  }
  // First, we check Kroy with registered classes
  Class[] registeredClasses = new Class[]{Person.class, Address.class,
    scala.collection.mutable.WrappedArray.ofRef.class, Object[].class
  };
  JavaSparkContext sparkContextRegisteredClasses =
    getKryoSerializationContext(registeredClasses, 
    new Tuple2<>("spark.kryo.registrationRequired", "true")
  );
  sparkContextRegisteredClasses.setCheckpointDir(checkpointDir);
  JavaRDD<Person> peopleRDDRegistered = sparkContextRegisteredClasses.parallelize(personData);
  // Mark as to checkpoint and trigger an action
  peopleRDDRegistered.checkpoint();
  peopleRDDRegistered.count();
  // Get checkpointed directory
  String registeredCheckpointedDir = peopleRDDRegistered.getCheckpointFile().get();

  // Now, we can do the same but for Kryo with unregistered classes
  JavaSparkContext sparkContextUnregisteredClasses =
    getKryoSerializationContext(new Class[]{}, new Tuple2<>("spark.kryo.registrationRequired", "false"));
  sparkContextUnregisteredClasses.setCheckpointDir("/home/konieczny/tmp/spark/checkpoints");
  JavaRDD<Person> peopleRDDUnregistered = sparkContextUnregisteredClasses.parallelize(personData);

  peopleRDDUnregistered.checkpoint();
  peopleRDDUnregistered.count();

  String unregisteredCheckpointedDir = peopleRDDUnregistered.getCheckpointFile().get();

  // Finally, check the size of serialized objects with Java serialization
  JavaSparkContext sparkContextJavaSerialization = getJavaSerializationContext();
  sparkContextJavaSerialization.setCheckpointDir(checkpointDir);
  JavaRDD<Person> peopleRDDJavaSerialization =
    sparkContextJavaSerialization.parallelize(personData);

  peopleRDDJavaSerialization.checkpoint();
  peopleRDDJavaSerialization.count();

  String javaSerializedCheckpointedDir = peopleRDDJavaSerialization.getCheckpointFile().get();

  // Now, compare sizes
  long sizeRegistered = FileUtils.sizeOfDirectory(new File(registeredCheckpointedDir.replace("file:", "")));
  long sizeUnregistered = FileUtils.sizeOfDirectory(new File(unregisteredCheckpointedDir.replace("file:", "")));
  long sizeJavaSerialized = FileUtils.sizeOfDirectory(new File(javaSerializedCheckpointedDir.replace("file:", "")));

  // Serialization with Kryo's registration should be the smallest file
  assertThat(sizeRegistered).isLessThan(sizeUnregistered);
  assertThat(sizeRegistered).isLessThan(sizeJavaSerialized);
  // Kroy without registration takes more place than Java serialization
  assertThat(sizeJavaSerialized).isLessThan(sizeUnregistered);
}

@Test
public void should_fail_on_executing_function_on_not_serializable_object() {
  // Everything is serialized, at all levels. So an error will be thrown
  // even if closure or function is serialized and objects it uses is not.
  JavaSparkContext sparContext = getJavaSerializationContext();
  JavaRDD<PersonNotSerializable> personRDD =
    sparContext.parallelize(Collections.singletonList(new PersonNotSerializable()));
  try {
    personRDD.filter(new NotNullPersonPredicate());

    personRDD.count();
    fail("Should fail on working on serializable function with " +
      "not serializable object");
  } catch (Exception e) {
    assertThat(e.getMessage())
      .contains("object not serializable (class: com.waitingforcode.serialization.SerializationTest$PersonNotSerializable,");
  }
}

// Mark function as serializable to prove that it works on a not serializable object
private static class NotNullPersonPredicate 
    implements Function<PersonNotSerializable, Boolean>, Serializable { 
  @Override
  public Boolean call(PersonNotSerializable personNotSerializable) throws Exception {
    return personNotSerializable != null;
  }
}


private static JavaSparkContext getJavaSerializationContext() {
    return new JavaSparkContext(CONFIGURATION);
}

private static JavaSparkContext getKryoSerializationContext(Class<?>[] registeredClasses, 
    Tuple2<String, String>...configEntries) {
  SparkConf configuration = new SparkConf().setAppName("Serialization Test").setMaster("local[1]")
    .set("spark.ui.enabled", "false")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  for (Tuple2<String, String> configEntry : configEntries) {
    configuration.set(configEntry._1(), configEntry._2());
  }
  if (registeredClasses.length > 0) {
    configuration.registerKryoClasses(registeredClasses);
  }
  return new JavaSparkContext(configuration);
}

public static class Person implements Serializable { 
  private String name; 
  private int age; 
  private Address address; 
}

public static class Address implements Serializable {
  private String street; 
  private String city; 
}

public static class PersonNotSerializable {}

This post shows some main facts about serialization in Spark. In the first part it explains where serialization is used and how it should be tuned to be efficient. The second part shows the use of serialization and some errors it can produce on not serializable objects.