Serialization frameworks are an intrinsic part of Big Data systems. Spark is no exception to this rule and it offers several different ways to manage serialization.
Serialization modes in Spark
Spark needs serialization for every activity related to data movement and computation, such as: shuffle, spilling, caching, sending tasks to executors or collecting results. Natively, 2 serialization modes are supported and they are configured through the spark.serializer property.
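For instance, caching with a serialized storage level is one of the places where the configured serializer kicks in. Below is a minimal, illustrative sketch (the data and application name are made up):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import java.util.Arrays;

public class SerializedCacheExample {
  public static void main(String[] args) {
    SparkConf configuration = new SparkConf()
      .setAppName("Serialized cache example").setMaster("local[1]");
    JavaSparkContext sparkContext = new JavaSparkContext(configuration);
    JavaRDD<String> lettersRDD = sparkContext.parallelize(Arrays.asList("a", "b", "c"));
    // MEMORY_ONLY_SER stores partitions as serialized bytes, so the
    // serializer configured through spark.serializer is used here
    lettersRDD.persist(StorageLevel.MEMORY_ONLY_SER());
    lettersRDD.count();
    sparkContext.close();
  }
}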
The first mode, which is also the default one, is based on standard Java serialization (hence the need to implement java.io.Serializable). Under the hood it uses the classical method of transforming an object into a stream of bytes:
override def serialize[T: ClassTag](t: T): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = serializeStream(bos)
  out.writeObject(t)
  out.close()
  bos.toByteBuffer
}
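In practice this means that any class handled by Spark with this serializer must implement java.io.Serializable. A minimal sketch (the City class is purely illustrative):

import java.io.Serializable;

// Without "implements Serializable", using instances of this class
// in an RDD would fail at runtime with a NotSerializableException
public class City implements Serializable {
  private final String name;

  public City(String name) {
    this.name = name;
  }
}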
The second choice is a serialization framework called Kryo. It's activated by setting the spark.serializer property to org.apache.spark.serializer.KryoSerializer. Compared to Java serialization, Kryo is more performant - the serialized buffer takes up less memory (often up to 10x less than Java serialization) and it's generated faster. Kryo also has an interesting optimization feature called registration. When the spark.kryo.registrationRequired configuration entry is set to true, Kryo will fail every time it tries to serialize or deserialize an unknown (unregistered) class. An unknown class is a class that was not declared through SparkConf's registerKryoClasses(Class[]) method.
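As a quick illustration, a Kryo-enabled configuration could look like the following sketch (Person and Address stand for the domain classes used in the tests below):

SparkConf configuration = new SparkConf()
  .setAppName("Kryo example").setMaster("local[1]")
  // switch from the default Java serialization to Kryo
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // make Kryo fail fast on classes that were not registered beforehand
  .set("spark.kryo.registrationRequired", "true");
configuration.registerKryoClasses(new Class<?>[]{Person.class, Address.class});
JavaSparkContext sparkContext = new JavaSparkContext(configuration);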
One of the optimizations brought by registration is the mapping of full class names to a more compact format (integers). For example, a class identified by the full name com.waitingforcode.model.Address can be represented by the number 3. Especially when a lot of large objects are serialized, this reduces the size of the serialized records.
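This mapping can be observed by using the Kryo library directly, outside Spark. In the following sketch (the Address class is illustrative), the id returned after registration is the compact identifier written to the serialized output instead of the full class name; the exact number depends on the classes Kryo registers by default:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;

public class KryoRegistrationExample {
  public static void main(String[] args) {
    Kryo kryo = new Kryo();
    // after registration the class is referenced in the serialized
    // output by this small integer id instead of its full name
    Registration registration = kryo.register(Address.class);
    System.out.println(registration.getId());
  }

  public static class Address {
    private String street;
    private String city;
  }
}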
Serialization examples in Spark
Below you can find some examples of serialization in Spark:
@Test
public void should_fail_on_serializing_class_not_implementing_serializable() {
  JavaSparkContext sparkContext = getJavaSerializationContext();
  JavaRDD<PersonNotSerializable> personRDD =
    sparkContext.parallelize(Collections.singletonList(new PersonNotSerializable()));
  try {
    personRDD.count();
    fail("NotSerializableException should be thrown when manipulated" +
      " object doesn't implement Serializable");
  } catch (Exception se) {
    assertThat(se.getMessage())
      .contains("object not serializable (class: com.waitingforcode.serialization.SerializationTest$PersonNotSerializable,");
  }
}

@Test
public void should_fail_on_serializing_not_registered_class() {
  Class[] registeredClasses = new Class[]{
    Person.class, scala.collection.mutable.WrappedArray.ofRef.class, Object[].class
  };
  JavaSparkContext sparkContext = getKryoSerializationContext(registeredClasses,
    new Tuple2<>("spark.kryo.registrationRequired", "true"));
  List<Person> personData = Collections.singletonList(
    Person.valueOf("Name", 30, Address.valueOf("street", "city"))
  );
  JavaRDD<Person> personRDD = sparkContext.parallelize(personData);
  try {
    personRDD.count();
    fail("Should fail because Person class is not registered");
  } catch (Exception e) {
    assertThat(e.getMessage()).contains("Class is not registered:" +
      " com.waitingforcode.serialization.SerializationTest$Address");
  }
}

@Test
public void should_correctly_serialize_registered_class() {
  Class[] registeredClasses = new Class[]{Person.class, Address.class,
    scala.collection.mutable.WrappedArray.ofRef.class, Object[].class};
  JavaSparkContext sparkContext = getKryoSerializationContext(registeredClasses,
    new Tuple2<>("spark.kryo.registrationRequired", "true")
  );
  List<Person> personData = Collections.singletonList(
    Person.valueOf("Name", 30, Address.valueOf("street", "city"))
  );
  JavaRDD<Person> personRDD = sparkContext.parallelize(personData);

  long notNullCounter = personRDD.filter(person -> person != null).count();

  assertThat(notNullCounter).isEqualTo(1);
}

@Test
public void should_compare_size_of_registered_unregistered_and_java_serialized_objects()
    throws IOException, InterruptedException {
  String checkpointDir = "/home/konieczny/tmp/spark/checkpoints";
  FileUtils.cleanDirectory(new File(checkpointDir));
  // serialized data
  List<Person> personData = new ArrayList<>();
  for (int i = 0; i < 1_000; i++) {
    personData.add(Person.valueOf("Name"+i, 30+i, Address.valueOf("street"+i, "city"+i)));
  }
  // First, we check Kryo with registered classes
  Class[] registeredClasses = new Class[]{Person.class, Address.class,
    scala.collection.mutable.WrappedArray.ofRef.class, Object[].class
  };
  JavaSparkContext sparkContextRegisteredClasses = getKryoSerializationContext(registeredClasses,
    new Tuple2<>("spark.kryo.registrationRequired", "true")
  );
  sparkContextRegisteredClasses.setCheckpointDir(checkpointDir);
  JavaRDD<Person> peopleRDDRegistered = sparkContextRegisteredClasses.parallelize(personData);
  // Mark as to checkpoint and trigger an action
  peopleRDDRegistered.checkpoint();
  peopleRDDRegistered.count();
  // Get checkpointed directory
  String registeredCheckpointedDir = peopleRDDRegistered.getCheckpointFile().get();

  // Now, we can do the same but for Kryo with unregistered classes
  JavaSparkContext sparkContextUnregisteredClasses = getKryoSerializationContext(new Class[]{},
    new Tuple2<>("spark.kryo.registrationRequired", "false"));
  sparkContextUnregisteredClasses.setCheckpointDir("/home/konieczny/tmp/spark/checkpoints");
  JavaRDD<Person> peopleRDDUnregistered = sparkContextUnregisteredClasses.parallelize(personData);
  peopleRDDUnregistered.checkpoint();
  peopleRDDUnregistered.count();
  String unregisteredCheckpointedDir = peopleRDDUnregistered.getCheckpointFile().get();

  // Finally, check the size of serialized objects with Java serialization
  JavaSparkContext sparkContextJavaSerialization = getJavaSerializationContext();
  sparkContextJavaSerialization.setCheckpointDir(checkpointDir);
  JavaRDD<Person> peopleRDDJavaSerialization = sparkContextJavaSerialization.parallelize(personData);
  peopleRDDJavaSerialization.checkpoint();
  peopleRDDJavaSerialization.count();
  String javaSerializedCheckpointedDir = peopleRDDJavaSerialization.getCheckpointFile().get();

  // Now, compare sizes
  long sizeRegistered = FileUtils.sizeOfDirectory(new File(registeredCheckpointedDir.replace("file:", "")));
  long sizeUnregistered = FileUtils.sizeOfDirectory(new File(unregisteredCheckpointedDir.replace("file:", "")));
  long sizeJavaSerialized = FileUtils.sizeOfDirectory(new File(javaSerializedCheckpointedDir.replace("file:", "")));

  // Serialization with Kryo's registration should give the smallest files
  assertThat(sizeRegistered).isLessThan(sizeUnregistered);
  assertThat(sizeRegistered).isLessThan(sizeJavaSerialized);
  // Kryo without registration takes more place than Java serialization
  assertThat(sizeJavaSerialized).isLessThan(sizeUnregistered);
}

@Test
public void should_fail_on_executing_function_on_not_serializable_object() {
  // Everything is serialized, at all levels. So an error will be thrown
  // even if the closure or function is serializable and the objects it uses are not.
  JavaSparkContext sparkContext = getJavaSerializationContext();
  JavaRDD<PersonNotSerializable> personRDD =
    sparkContext.parallelize(Collections.singletonList(new PersonNotSerializable()));
  try {
    personRDD.filter(new NotNullPersonPredicate());
    personRDD.count();
    fail("Should fail on working on serializable function with " +
      "not serializable object");
  } catch (Exception e) {
    assertThat(e.getMessage())
      .contains("object not serializable (class: com.waitingforcode.serialization.SerializationTest$PersonNotSerializable,");
  }
}

// Mark the function as serializable to prove that the failure comes
// from the not serializable object it works on
private static class NotNullPersonPredicate implements Function<PersonNotSerializable, Boolean>, Serializable {
  @Override
  public Boolean call(PersonNotSerializable personNotSerializable) throws Exception {
    return personNotSerializable != null;
  }
}

private static JavaSparkContext getJavaSerializationContext() {
  return new JavaSparkContext(CONFIGURATION);
}

private static JavaSparkContext getKryoSerializationContext(Class<?>[] registeredClasses,
    Tuple2<String, String>... configEntries) {
  SparkConf configuration = new SparkConf().setAppName("Serialization Test").setMaster("local[1]")
    .set("spark.ui.enabled", "false")
    .set("spark.driver.allowMultipleContexts", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
  for (Tuple2<String, String> configEntry : configEntries) {
    configuration.set(configEntry._1(), configEntry._2());
  }
  if (registeredClasses.length > 0) {
    configuration.registerKryoClasses(registeredClasses);
  }
  return new JavaSparkContext(configuration);
}

public static class Person implements Serializable {
  private String name;
  private int age;
  private Address address;
}

public static class Address implements Serializable {
  private String street;
  private String city;
}

public static class PersonNotSerializable {}
This post shows some key facts about serialization in Spark. The first part explains where serialization is used and how it can be tuned to be efficient. The second part shows serialization in action, including some of the errors produced by not serializable objects.