Coders in Apache Beam

Versions: Apache Beam 2.2.0 https://github.com/bartosz25/beam-learning

Since data in distributed computing moves either locally (within a single worker) or remotely (between several workers), it must have a format that every machine can understand. Serialization guarantees this format, and Apache Beam relies on it too.

This post presents coders, the components responsible for serialization in Apache Beam. The first part introduces coders in data processing pipelines. The second part presents the Java API of this layer. The third one focuses on the role of the coder in the pipeline. The final section shows some examples of coders in use.

Coders explained

As mentioned in the introduction, the data in Apache Beam moves. It can either be persisted on disk or shuffled between workers. The coder is responsible for both operations since it defines how the data is written to disk or transferred over the network, and how it is converted back to objects. A coder is therefore required for every PCollection<T>. Without it, the pipeline won't execute because the runner won't know how to translate Java objects to a persistable format.

A coder can either be defined explicitly or inferred from the PCollection's type. The explicit definition goes through one of the methods available in CoderRegistry (registerCoderProvider, registerCoderForClass, registerCoderForType). The inference, on the other hand, is based on type properties retrieved with the help of java.lang.Class. Overall, each of the common Java types (Integer, String, Long...) has an associated default coder that is returned later in the processing by another structure called a coder provider.

Technically the coder is represented by a typed org.apache.beam.sdk.coders.Coder<T> instance. The type parameter is the type of the encoded and decoded values. And according to the Javadoc, the coder itself is... serialized. The serialization occurs during the job creation. The deserialization takes place later, before the coder is used in the pipeline. The coder is used, among other things, when reading data from a source or writing data to a sink. It's also called to materialize intermediate results.

Coders internals

In more detail, the two core methods of Coder are void encode(T value, OutputStream outStream) and T decode(InputStream inStream). The first one translates a Java object to a persistable format, which explains the presence of an OutputStream instance as a parameter. The second one decodes, i.e. reads the values back from this persistable format into Java objects.
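The encode/decode contract can be illustrated with a minimal plain-Java sketch. It does not use Beam: SimpleCoder, LengthPrefixedStringCoder and EncodeDecodeSketch are hypothetical names introduced only for this illustration, and the length-prefixed layout is an assumption, not Beam's wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical interface mirroring the two methods described above; it is not Beam's Coder class.
interface SimpleCoder<T> {
  void encode(T value, OutputStream outStream) throws IOException;

  T decode(InputStream inStream) throws IOException;
}

// Writes a string as a 4-byte length followed by its UTF-8 bytes (assumed layout).
final class LengthPrefixedStringCoder implements SimpleCoder<String> {
  @Override
  public void encode(String value, OutputStream outStream) throws IOException {
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
    DataOutputStream dataOut = new DataOutputStream(outStream);
    dataOut.writeInt(bytes.length);
    dataOut.write(bytes);
    dataOut.flush();
  }

  @Override
  public String decode(InputStream inStream) throws IOException {
    DataInputStream dataIn = new DataInputStream(inStream);
    byte[] bytes = new byte[dataIn.readInt()];
    dataIn.readFully(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

final class EncodeDecodeSketch {
  // Encodes the value into an in-memory byte array and decodes it back.
  static String roundTrip(String value) {
    SimpleCoder<String> coder = new LengthPrefixedStringCoder();
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      coder.encode(value, out);
      return coder.decode(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("beam"));
  }
}
```

The length prefix lets decode know where the value ends, which matters as soon as several values share the same stream.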

Default coder implementations are defined in the org.apache.beam.sdk.coders package. It contains the coders for most of the common Java types: List, Map, Double, Long, Integer, String and so on. It's important to mention that the values are not encoded 1-to-1 with Java types. For instance, even though Java's long takes 8 bytes, in Apache Beam it can be encoded in a variable-length form and occupy between 1 and 10 bytes.
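To see why a long can occupy between 1 and 10 bytes, here is a plain-Java sketch of a common variable-length scheme (7 payload bits per byte, with the high bit signalling that more bytes follow). It is only assumed to be similar in spirit to Beam's variable-length encoding; VarLongSketch is a hypothetical class written for this illustration.

```java
import java.io.ByteArrayOutputStream;

final class VarLongSketch {
  // Emits 7 bits per byte, lowest bits first; the high bit means "more bytes follow".
  static byte[] encode(long value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    do {
      int bits = (int) (value & 0x7F);
      value >>>= 7; // unsigned shift, so negative values terminate too
      if (value != 0) {
        bits |= 0x80;
      }
      out.write(bits);
    } while (value != 0);
    return out.toByteArray();
  }

  // Rebuilds the long by placing each 7-bit group back at its position.
  static long decode(byte[] bytes) {
    long result = 0;
    int shift = 0;
    for (byte b : bytes) {
      result |= ((long) (b & 0x7F)) << shift;
      shift += 7;
      if ((b & 0x80) == 0) {
        break;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(encode(1L).length);
    System.out.println(encode(-1L).length);
  }
}
```

Small positive values fit in a single byte, while a negative value has all of its high bits set and needs the full 10 bytes (64 bits split into groups of 7).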

Another coder type concerns serializable objects and is represented by the org.apache.beam.sdk.coders.SerializableCoder<T extends Serializable> class. It's the simplest way to deal with Java bean classes since it's used automatically when a given class implements Serializable and has no custom coder defined.
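The mechanism this kind of coder builds on is standard Java serialization. The sketch below shows the round trip with the JDK only; PersonBean and JavaSerializationSketch are hypothetical names, and the code makes no claim about how SerializableCoder is implemented internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical bean used only for this illustration.
final class PersonBean implements Serializable {
  private static final long serialVersionUID = 1L;

  final String firstName;
  final String lastName;

  PersonBean(String firstName, String lastName) {
    this.firstName = firstName;
    this.lastName = lastName;
  }
}

final class JavaSerializationSketch {
  // Serializes any Serializable object to bytes with the JDK's object stream.
  static byte[] toBytes(Serializable value) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads the object back from the bytes produced by toBytes.
  static Object fromBytes(byte[] data) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    PersonBean copy = (PersonBean) fromBytes(toBytes(new PersonBean("a", "1")));
    System.out.println(copy.firstName + " " + copy.lastName);
  }
}
```

Java serialization is convenient but writes class metadata along with the field values, so its output tends to be larger than that of a hand-written encoding.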

Besides the encode and decode methods, another important property exposed through Coder's API is determinism. A coder is considered deterministic if it produces the same encoded representation of a given Java object every time, even when it's called on different workers at different moments. Encoding determinism is used, for instance, by the group by key transform, because the equality of keys is resolved there not by comparing Java objects but by comparing their encoded bytes. This is only possible if all coders involved in building the compared key are deterministic (i.e. their representations are always the same).
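Why determinism matters for byte-based key comparison can be shown with a plain-Java sketch. Two maps that are equal as Java objects are encoded in two ways: one depends on iteration order, the other sorts the entries first. DeterminismSketch and its methods are hypothetical and unrelated to Beam's own coders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

final class DeterminismSketch {
  // Encodes entries in the map's iteration order: equal maps may give different bytes.
  static byte[] encodeInIterationOrder(Map<String, Integer> key) {
    StringBuilder encoded = new StringBuilder();
    for (Map.Entry<String, Integer> entry : key.entrySet()) {
      encoded.append(entry.getKey()).append('=').append(entry.getValue()).append(';');
    }
    return encoded.toString().getBytes(StandardCharsets.UTF_8);
  }

  // Sorts the entries by key first, so equal maps always give the same bytes.
  static byte[] encodeSorted(Map<String, Integer> key) {
    return encodeInIterationOrder(new TreeMap<>(key));
  }

  // Builds a map that remembers the insertion order of its two entries.
  static Map<String, Integer> mapOf(String key1, int value1, String key2, int value2) {
    Map<String, Integer> map = new LinkedHashMap<>();
    map.put(key1, value1);
    map.put(key2, value2);
    return map;
  }

  public static void main(String[] args) {
    Map<String, Integer> first = mapOf("x", 1, "y", 2);
    Map<String, Integer> second = mapOf("y", 2, "x", 1);
    System.out.println(first.equals(second));
    System.out.println(Arrays.equals(encodeInIterationOrder(first), encodeInIterationOrder(second)));
    System.out.println(Arrays.equals(encodeSorted(first), encodeSorted(second)));
  }
}
```

With the order-dependent encoding, a grouping based on encoded bytes would put these two equal keys into different groups, which is the situation the determinism check is meant to prevent.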

Coder in pipeline

However, the coder isn't used directly in the processing pipeline. It's exposed through another object called CoderRegistry. As its name indicates, this object represents a registry of all coders that can be used in the processing.

But the coder is not created by the registry. The creation is delegated to another object called CoderProvider. Different providers are registered, and the order of registration determines which provider will be responsible for producing the coder for a particular class or type. The explicitly registered providers are used first. After them come the providers related to common Java types. The providers registered automatically with ServiceLoader are placed at the end of the resolution chain.
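The resolution chain described above can be pictured with a simplified plain-Java sketch. SketchProvider and SketchRegistry are hypothetical, coders are reduced to plain names, and the two built-in fallbacks are assumptions chosen for the illustration; Beam's real CoderRegistry is more elaborate.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical provider: returns a coder name for a type, or empty when it can't handle it.
interface SketchProvider {
  Optional<String> coderFor(Class<?> type);
}

final class SketchRegistry {
  private final List<SketchProvider> explicitProviders = new ArrayList<>();
  private final List<SketchProvider> fallbackProviders = new ArrayList<>();

  SketchRegistry() {
    // Assumed built-in fallbacks: a common-type provider, then a serializable one.
    fallbackProviders.add(type -> type == String.class
        ? Optional.of("default-string-coder") : Optional.<String>empty());
    fallbackProviders.add(type -> Serializable.class.isAssignableFrom(type)
        ? Optional.of("serializable-coder") : Optional.<String>empty());
  }

  // Explicitly registered providers are consulted before the built-in fallbacks.
  void register(SketchProvider provider) {
    explicitProviders.add(provider);
  }

  // Walks the chain and returns the first coder any provider can supply.
  String resolve(Class<?> type) {
    List<SketchProvider> chain = new ArrayList<>(explicitProviders);
    chain.addAll(fallbackProviders);
    for (SketchProvider provider : chain) {
      Optional<String> coder = provider.coderFor(type);
      if (coder.isPresent()) {
        return coder.get();
      }
    }
    throw new IllegalArgumentException("No coder found for " + type.getName());
  }
}
```

A provider registered through register takes precedence over the fallbacks for the types it handles, and every other type keeps falling through to the rest of the chain.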

Among the implementations of CoderProvider we can distinguish a provider for common Java types (CommonTypes), a provider for Java's serializable types (SerializableCoderProvider) and one for Protobuf messages (ProtoCoderProvider).

The coder in the pipeline is resolved before the physical execution. It's used during the construction of the transform hierarchy (read more about it in the post TransformHierarchy in Apache Beam), when the outputs of each transform are built.

Coders examples

The different cases mentioned above are represented in the following learning tests:

@Test
public void should_fail_when_not_deterministic_coder_is_defined_for_group_by_key_transform() {
  Pipeline pipeline = BeamFunctions.createPipeline("Not deterministic coder for group by key transform");
  CoderProvider stringCustomProvider = CoderProviders.fromStaticMethods(String.class,
    CustomStringNonDeterministicCoder.class);
  pipeline.getCoderRegistry().registerCoderProvider(stringCustomProvider);
  PCollection<KV<String, Integer>> lettersCollection = pipeline.apply(Create.of(
    KV.of("a", 1), KV.of("a", 1), KV.of("b", 2)));

  assertThatThrownBy(() -> {
    lettersCollection.apply(GroupByKey.create()).apply(Count.perKey());
    pipeline.run().waitUntilFinish();
  }).isInstanceOf(IllegalStateException.class)
    .hasMessage("the keyCoder of a GroupByKey must be deterministic");
}

@Test
public void should_override_default_coder_for_strings() throws ClassNotFoundException {
  Pipeline pipeline = BeamFunctions.createPipeline("String coder overridden");
  CoderProvider stringCustomProvider = CoderProviders.fromStaticMethods(String.class, CustomStringCoder.class);
  pipeline.getCoderRegistry().registerCoderProvider(stringCustomProvider);
  pipeline.apply(Create.of(Arrays.asList("a", "b", "c")));

  pipeline.run().waitUntilFinish();

  List<String> encodedWords = LETTERS.getTexts();
  System.out.println("texts were="+encodedWords);
  assertThat(encodedWords).contains("a", "b", "c", "a", "b", "a", "b", "c", "c");
}

@Test
public void should_use_correct_coder_for_java_bean() throws CannotProvideCoderException {
  Pipeline pipeline = BeamFunctions.createPipeline("Java bean coder");
  PCollection<Person> people = pipeline.apply(Create.of(Arrays.asList(Person.of("a", "1"), Person.of("b", "2"), Person.of("c", "3"))));

  Coder<Person> personCoder = people.getCoder();

  pipeline.run().waitUntilFinish();

  assertThat(personCoder).isInstanceOf(SerializableCoder.class);
}

@Test
public void should_fail_when_no_default_coder_can_be_found_for_java_bean() throws CannotProvideCoderException {
  Pipeline pipeline = BeamFunctions.createPipeline("Java bean PCollection without default coder");

  assertThatThrownBy(() ->  {
    pipeline.apply(Create.of(Arrays.asList(new PersonNoSerializable())));
    pipeline.run().waitUntilFinish();
  }).isInstanceOf(IllegalArgumentException.class).hasMessage("Unable to infer a coder and no Coder was specified. " +
    "Please set a coder by invoking Create.withCoder() explicitly.");
}

@Test
public void should_correctly_handle_collection() throws CannotProvideCoderException {
  Pipeline pipeline = BeamFunctions.createPipeline("Collection coder");
  PCollection<List<Integer>> numbers = pipeline.apply(Create.of(Arrays.asList(Arrays.asList(1, 2, 3),
    Arrays.asList(3, 4, 5))));
  Coder<List<Integer>> listCoder = numbers.getCoder();

  pipeline.run().waitUntilFinish();

  assertThat(listCoder).isInstanceOf(ListCoder.class);
  assertThat(listCoder.getCoderArguments().get(0)).isInstanceOf(VarIntCoder.class);
}

@Test
public void should_read_java_bean_with_custom_coder() {
  Pipeline pipeline = BeamFunctions.createPipeline("Custom Java bean coder");
  PCollection<PersonNoSerializable> peopleDataset = pipeline
          .apply(Create.of(Arrays.asList(PersonNoSerializable.of("fname2", "lname2")))
                  .withCoder(new CustomPersonNoSerializableCoder()));
  peopleDataset.apply(ParDo.of(new DoFn<PersonNoSerializable, String>() {
    @ProcessElement
    public void process(ProcessContext processContext) {
      String decodedPerson = processContext.element().getFirstName() + "_" + 
        processContext.element().getLastName();
      NOT_SERIALIZABLE_PEOPLE.addText(decodedPerson);
      processContext.output(decodedPerson);
    }
  }));

  pipeline.run().waitUntilFinish();

  assertThat(NOT_SERIALIZABLE_PEOPLE.getTexts()).hasSize(1)
    .containsOnly("fname2-write-read-write-read_lname2-write-read-write-read");
}

public static class CustomStringNonDeterministicCoder extends AtomicCoder<String> {

  public static CustomStringNonDeterministicCoder of() {
    return new CustomStringNonDeterministicCoder();
  }

  @Override
  public void encode(String text, OutputStream outStream) throws CoderException, IOException {
    byte[] textBytes = text.getBytes();
    outStream.write(textBytes);
  }

  @Override
  public String decode(InputStream inStream) throws IOException {
    return "";
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    throw new NonDeterministicException(this, "Custom string coder is not deterministic");
  }
}

public static class CustomStringCoder extends AtomicCoder<String> {

  public static CustomStringCoder of() {
    return new CustomStringCoder();
  }

  @Override
  public void encode(String text, OutputStream outStream) throws IOException {
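    // Use the same charset as decode() so the round trip doesn't depend on the platform default
    byte[] textBytes = text.getBytes(StandardCharsets.UTF_8);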
    LETTERS.addText(text);
    outStream.write(textBytes);
  }

  @Override
  public String decode(InputStream inStream) throws IOException {
    byte[] bytes = StreamUtils.getBytes(inStream);
    return new String(bytes, StandardCharsets.UTF_8);
  }
}

public static class Person implements Serializable {
  private String firstName;

  private String lastName;


  public String getFirstName() {
    return firstName;
  }

  public void setFirstName(String firstName) {
    this.firstName = firstName;
  }

  public String getLastName() {
    return lastName;
  }

  public void setLastName(String lastName) {
    this.lastName = lastName;
  }

  public static Person of(String firstName, String lastName) {
    Person person = new Person();
    person.firstName = firstName;
    person.lastName = lastName;
    return person;
  }
}

public static class PersonNoSerializable {
  private String firstName;
  private String lastName;

  public String getFirstName() {
    return firstName;
  }

  public void setFirstName(String firstName) {
    this.firstName = firstName;
  }

  public String getLastName() {
    return lastName;
  }

  public void setLastName(String lastName) {
    this.lastName = lastName;
  }

  public static PersonNoSerializable of(String firstName, String lastName) {
    PersonNoSerializable person = new PersonNoSerializable();
    person.firstName = firstName;
    person.lastName = lastName;
    return person;
  }
}

public static class CustomPersonNoSerializableCoder extends Coder<PersonNoSerializable> {

  private static final String NAMES_SEPARATOR = "_";

  @Override
  public void encode(PersonNoSerializable person, OutputStream outStream) throws IOException {
    // Dummy encoder, separates first and last names by _
    String serializablePerson = person.getFirstName()+"-write"+NAMES_SEPARATOR+person.getLastName()+"-write";
    // Explicit charset keeps encode() and decode() consistent across platforms
    outStream.write(serializablePerson.getBytes(StandardCharsets.UTF_8));
  }

  @Override
  public PersonNoSerializable decode(InputStream inStream) throws CoderException, IOException {
    String serializedPerson = new String(StreamUtils.getBytes(inStream), StandardCharsets.UTF_8);
    String[] names = serializedPerson.split(NAMES_SEPARATOR);
    return PersonNoSerializable.of(names[0]+"-read", names[1]+"-read");
  }

  @Override
  public List<? extends Coder<?>> getCoderArguments() {
    return Collections.emptyList();
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {}
}


enum TextAccumulators {
  LETTERS, NOT_SERIALIZABLE_PEOPLE;

  private List<String> texts = new ArrayList<>();

  public void addText(String text) {
    texts.add(text);
  }

  public List<String> getTexts() {
    return texts;
  }
}

This post gave an insight into coders, an important part of Beam's processing pipeline. They help to represent processed values in a format understandable by the network and the workers. The object making this transformation is a typed Coder implementation. Beam provides coders for the main Java types (long, string, double...) and also offers a universal serializable coder that applies to all serializable objects. However, the coder doesn't interact with the pipeline directly. It does that through a CoderRegistry object. The registry stores all available coders, which are resolved during the transform hierarchy construction. We should keep in mind, though, that inefficient serialization can contribute significantly to pipeline performance problems, so instead of writing custom coders we should first try the provided ones, which are very often optimized for massive data processing.

