Serialization and deserialization with schemas in Apache Avro

on waitingforcode.com

After the theoretical introduction to Apache Avro, we can see how it can be used.

This time, the article focuses on the practical use of Apache Avro. But before starting to write code with the Java API, we should explain some points about schema definition.

Schema definition in Apache Avro

The schema is at the center of everything in Apache Avro. An example of a schema in JSON can look like this:

{"namespace": "com.waitingforcode.model", "type": "record", "name": "Club",
    "fields": [
        {"name": "full_name", "type": "string", "aliases": ["fullName"]},
        {"name": "foundation_year", "type": "int", "aliases": ["foundationYear"]},
        {"name": "disappearance_year", "type": ["null", "int"], "aliases": ["disappearanceYear"]}
    ]
}

As you can deduce, it represents a football club, identified by its name, foundation year and disappearance year. Because a club may have disappeared or may still exist, we need a flexible structure to handle this information. In Avro, this structure is called a union, and in our case the field can hold either null (no value) or an integer value. Unions are one of the supported complex types, which also include records, enums, arrays, maps and fixed. Our example contains another complex type as well: the record. This type describes the stored structure and can be compared to a Java class.
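
To make the union more concrete, here is a minimal sketch of how a value of the union ["null", "int"] is laid out in Avro's binary encoding, following the rule given in the Avro specification: the zero-based position of the selected branch is written first, then the value itself, both as zig-zag variable-length integers. The class and method names (UnionEncodingSketch, encodeNullableInt) are made up for this illustration; this is not the code used by the Avro library.

```java
import java.io.ByteArrayOutputStream;

final class UnionEncodingSketch {

  // Writes n as a zig-zag variable-length integer, the encoding the Avro
  // specification describes for int and long values.
  static void writeZigZagVarint(long n, ByteArrayOutputStream out) {
    long zigZag = (n << 1) ^ (n >> 63);
    while ((zigZag & ~0x7FL) != 0) {
      out.write((int) ((zigZag & 0x7F) | 0x80));
      zigZag >>>= 7;
    }
    out.write((int) zigZag);
  }

  // Encodes a value of the union ["null", "int"]: branch index first, then the value.
  static byte[] encodeNullableInt(Integer value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    if (value == null) {
      writeZigZagVarint(0, out); // branch 0 = "null", which carries no payload
    } else {
      writeZigZagVarint(1, out); // branch 1 = "int"
      writeZigZagVarint(value, out);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) {
    System.out.println(encodeNullableInt(null).length); // club that still exists
    System.out.println(encodeNullableInt(1989).length); // club that disappeared in 1989
  }
}
```

With this layout a missing disappearance year costs a single byte (0x00), while 1989 takes three bytes (0x02 for the branch, then 0x8A 0x1F for the value).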

Besides complex types, schemas can also define fields of primitive types. Our schema contains two of them: string and int. Like Java, Avro supports other primitive types too: boolean, long, float, double and bytes, plus null.

Aliases, also defined in the schema, are arrays providing alternate names for a field (or for a named type). So the field "full_name" can also be referred to as "fullName". Aliases come into play when a reader schema is resolved against the writer schema, for example after fields have been renamed. The schema described previously is the one used to write data. The schema used for reading (assuming that the Java classes aren't generated by Avro) looks like this (only the fields part is different):

"fields": [
        {"name": "fullName", "type": "string", "aliases": ["full_name"]},
        {"name": "foundationYear", "type": "int", "aliases": ["foundation_year"]},
        {"name": "disappearanceYear", "type": ["int", "null"], "aliases": ["disappearance_year"]}
    ]

It corresponds to an object that looks like this:

public class Club {

  private String fullName;
  private int foundationYear;
  private Integer disappearanceYear;
  // getters and setters omitted here
}
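
The alias mechanism described above can be pictured as a lookup from the names found in the writer schema to the field names of the reader schema. The sketch below is a deliberately simplified illustration of that idea, not Avro's actual resolution code; the class AliasResolutionSketch and its buildLookup method are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AliasResolutionSketch {

  // Maps every name a writer may have used (the reader field's own name or
  // one of its aliases) to the reader's field name.
  static Map<String, String> buildLookup(Map<String, List<String>> readerFieldsWithAliases) {
    Map<String, String> lookup = new LinkedHashMap<>();
    readerFieldsWithAliases.forEach((fieldName, aliases) -> {
      lookup.put(fieldName, fieldName);
      aliases.forEach(alias -> lookup.put(alias, fieldName));
    });
    return lookup;
  }

  public static void main(String[] args) {
    // Reader fields and their aliases, copied from the reading schema
    Map<String, List<String>> readerFields = new LinkedHashMap<>();
    readerFields.put("fullName", Arrays.asList("full_name"));
    readerFields.put("foundationYear", Arrays.asList("foundation_year"));
    readerFields.put("disappearanceYear", Arrays.asList("disappearance_year"));

    Map<String, String> lookup = buildLookup(readerFields);
    // The writer stored "full_name"; the reader exposes it as "fullName"
    System.out.println(lookup.get("full_name"));
  }
}
```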

The schema also contains information about the type. The type is identified by the namespace, type and name attributes. In our Java case, namespace and name correspond to the package and class name of the object represented by the schema.
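
As a small illustration, the full name of a record can be derived from these attributes in the way the Avro specification describes for named types. The helper below (SchemaFullNameSketch.fullName) is a hypothetical name used only for this sketch.

```java
final class SchemaFullNameSketch {

  // Builds the full name of a named type from its "namespace" and "name" attributes.
  // Per the Avro specification, a name that already contains a dot is treated
  // as a full name and the namespace is ignored.
  static String fullName(String namespace, String name) {
    if (name.contains(".") || namespace == null || namespace.isEmpty()) {
      return name;
    }
    return namespace + "." + name;
  }

  public static void main(String[] args) {
    // Matches the package and class name of the Club object
    System.out.println(fullName("com.waitingforcode.model", "Club"));
  }
}
```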

Serialization and deserialization example in Apache Avro

After this short introduction to schemas (the Apache Avro documentation is a better source for a deep understanding), we can see serialization and deserialization in action. All tests are based on the 2 schemas presented previously. One additional schema appears: it's the same as the second schema, but its namespace refers to a nonexistent Java package (com.waitingforcode.bean). Below you can find some simple serialization cases:

public class ClubSerializationTest {

  private static Schema CLUB_SCHEMA_WRITE;
  private static final String RC_LENS_AVRO = "rc_lens.avro";
  private static final String MATRA_AVRO = "matra_racing.avro";

  @BeforeClass
  public static void initClubSchema() throws IOException {
    CLUB_SCHEMA_WRITE = new Schema.Parser().parse(
      new File(ClubSerializationTest.class.getClassLoader().getResource("schemas/club.json").getFile())
    );
  }

  @Test
  public void should_correctly_serialize_disappeared_club() throws IOException {
    GenericRecord rcLens = new GenericData.Record(CLUB_SCHEMA_WRITE);
    rcLens.put("full_name", "RC Lens");
    rcLens.put("foundation_year", 1906);
    // disappearance_year is not set, so it's serialized as null

    // Serialize first and deserialize after
    File outputClubsFile = File.createTempFile(RC_LENS_AVRO, "");
    DatumWriter<GenericRecord> datumWriter =  
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =  
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, outputClubsFile);
    dataFileWriter.append(rcLens);
    dataFileWriter.close();
    DatumReader<GenericRecord> datumReader =  
      new GenericDatumReader<>(CLUB_SCHEMA_WRITE);
    DataFileReader<GenericRecord> dataFileReader =  
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    assertThat(dataFileReader.next(null).toString())
      .isEqualTo("{\"full_name\": \"RC Lens\", \"foundation_year\": 1906, \"disappearance_year\": null}");
  }

  @Test
  public void should_fail_on_flushing_serialization_without_close_call() throws IOException {
    GenericRecord racingMatra = new GenericData.Record(CLUB_SCHEMA_WRITE);
    racingMatra.put("full_name", "Matra Racing");
    racingMatra.put("foundation_year", 1987);
    racingMatra.put("disappearance_year", 1989);

    File outputClubsFile = File.createTempFile(MATRA_AVRO, "");
    DatumWriter<GenericRecord> datumWriter =  
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =  
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, outputClubsFile);
    dataFileWriter.append(racingMatra);
    // no flush() call
    DatumReader<GenericRecord> datumReader =  
      new GenericDatumReader<GenericRecord>(CLUB_SCHEMA_WRITE);
    DataFileReader<GenericRecord> dataFileReader =  
      new DataFileReader<GenericRecord>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isFalse();
    dataFileWriter.close();
  }

  @Test
  public void should_correctly_serialize_club_with_flush() throws IOException {
    GenericRecord racingMatra = new GenericData.Record(CLUB_SCHEMA_WRITE);
    racingMatra.put("full_name", "Matra Racing");
    racingMatra.put("foundation_year", 1987);
    racingMatra.put("disappearance_year", 1989);

    File outputClubsFile = File.createTempFile(MATRA_AVRO, "");
    DatumWriter<GenericRecord> datumWriter =  
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =  
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, outputClubsFile);
    dataFileWriter.append(racingMatra);
    dataFileWriter.flush();
    DatumReader<GenericRecord> datumReader =  
      new GenericDatumReader<>(CLUB_SCHEMA_WRITE);
    DataFileReader<GenericRecord> dataFileReader =  
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    dataFileWriter.close();
  }

  @Test
  public void should_detect_that_json_with_one_entry_is_smaller_than_avro_with_one_item() throws IOException {
    File jsonFile =
        new File(ClubSerializationTest.class.getClassLoader().getResource("output/single_rc_lens.json").getFile());

    GenericRecord rcLens = new GenericData.Record(CLUB_SCHEMA_WRITE);
    rcLens.put("full_name", "RC Lens");
    rcLens.put("foundation_year", 1906);

    // Serialize first and deserialize after
    File outputClubsFile = File.createTempFile(RC_LENS_AVRO, "");
    DatumWriter<GenericRecord> datumWriter =  
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =  
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, outputClubsFile);
    dataFileWriter.append(rcLens);
    dataFileWriter.close();

    System.out.println("Avro file length="+outputClubsFile.length());
    System.out.println("JSON file length="+jsonFile.length());
    // Because Avro stores the writer's schema in the file, it's logically bigger than
    // a plain JSON file storing only key-value pairs
    // Expected Avro size is 359 bytes while JSON takes only 77 bytes
    assertThat(outputClubsFile.length()).isGreaterThan(jsonFile.length());
  }

  @Test
  public void should_detect_that_json_with_6_lines_is_bigger_than_avro_with_the_same_number_of_items() throws IOException {
    // Since Avro doesn't repeat field names in the saved file, the serialized items
    // take less space than, for example, the original JSON file
    File jsonFile =
      new File(ClubSerializationTest.class.getClassLoader().getResource("output/6_teams.json").getFile());

    // Serialize first and deserialize after
    File outputClubsFile = File.createTempFile(RC_LENS_AVRO, "");
    DatumWriter<GenericRecord> datumWriter =  
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =  
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, outputClubsFile);
    createTeams(6).forEach(team -> append(dataFileWriter, team));
    dataFileWriter.close();

    System.out.println("Avro file length="+outputClubsFile.length());
    System.out.println("JSON file length="+jsonFile.length());
    // The difference should be noticeable, on the order of 57 bytes; 437 for Avro, 494 for JSON
    assertThat(jsonFile.length()).isGreaterThan(outputClubsFile.length());
  }

  @Test
  public void should_detect_that_5_lines_in_json_is_smaller_than_in_avro() throws IOException {
    File jsonFile =
        new File(ClubSerializationTest.class.getClassLoader().getResource("output/5_teams.json").getFile());
    // Serialize first and deserialize after
    File outputClubsFile = File.createTempFile(RC_LENS_AVRO, "");
    DatumWriter<GenericRecord> datumWriter = 
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =  
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, outputClubsFile);
    createTeams(5).forEach(team -> append(dataFileWriter, team));
    dataFileWriter.close();

    System.out.println("Avro file length="+outputClubsFile.length());
    System.out.println("JSON file length="+jsonFile.length());
    // The difference should be small, on the order of 11 bytes; 422 for Avro, 411 for JSON
    assertThat(outputClubsFile.length()).isGreaterThan(jsonFile.length());
  }

  private void append(DataFileWriter<GenericRecord> dataFileWriter, GenericRecord record) {
    try {
      dataFileWriter.append(record);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private List<GenericRecord> createTeams(int number) {
    List<GenericRecord> teams = new ArrayList<>(number);
    GenericRecord rcLens = new GenericData.Record(CLUB_SCHEMA_WRITE);
    rcLens.put("full_name", "RC Lens");
    rcLens.put("foundation_year", 1906);
    teams.add(rcLens);
    GenericRecord lilleOsc = new GenericData.Record(CLUB_SCHEMA_WRITE);
    lilleOsc.put("full_name", "Lille OSC");
    lilleOsc.put("foundation_year", 1944);
    teams.add(lilleOsc);
    GenericRecord matraRacing = new GenericData.Record(CLUB_SCHEMA_WRITE);
    matraRacing.put("full_name", "Matra Racing");
    matraRacing.put("foundation_year", 1987);
    matraRacing.put("disappearance_year", 1989);
    teams.add(matraRacing);
    GenericRecord usValenciennes = new GenericData.Record(CLUB_SCHEMA_WRITE);
    usValenciennes.put("full_name", "US Valenciennes");
    usValenciennes.put("foundation_year", 1913);
    teams.add(usValenciennes);
    GenericRecord parisSg = new GenericData.Record(CLUB_SCHEMA_WRITE);
    parisSg.put("full_name", "Paris-SG");
    parisSg.put("foundation_year", 1970);
    teams.add(parisSg);

    if (number == 6) {
      GenericRecord redStar = new GenericData.Record(CLUB_SCHEMA_WRITE);
      redStar.put("full_name", "Red Star 93");
      redStar.put("foundation_year", 1897);
      teams.add(redStar);
    }
    return teams;
  } 
}
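
The size comparisons in these tests follow from the file layout: the schema is stored once in the file header, and each record then holds only its values, written in schema order without field names. The sketch below estimates the number of bytes taken by one Club record body under the binary encoding rules of the Avro specification (a string is a length prefix plus its UTF-8 bytes, an int is a zig-zag variable-length integer, a union is a branch index plus the value). ClubRecordSizeSketch and its methods are hypothetical names, and the header, block metadata and sync markers of the container file are deliberately left out.

```java
import java.nio.charset.StandardCharsets;

final class ClubRecordSizeSketch {

  // Number of bytes taken by n once written as a zig-zag variable-length integer.
  static int varintSize(long n) {
    long zigZag = (n << 1) ^ (n >> 63);
    int size = 1;
    while ((zigZag & ~0x7FL) != 0) {
      size++;
      zigZag >>>= 7;
    }
    return size;
  }

  // Size of one Club record body: values only, in schema order, no field names.
  static int recordSize(String fullName, int foundationYear, Integer disappearanceYear) {
    int nameBytes = fullName.getBytes(StandardCharsets.UTF_8).length;
    int size = varintSize(nameBytes) + nameBytes; // string: length prefix + UTF-8 bytes
    size += varintSize(foundationYear);           // int
    size += 1;                                    // union branch index (0 or 1 fits in one byte)
    if (disappearanceYear != null) {
      size += varintSize(disappearanceYear);      // value of the "int" branch
    }
    return size;
  }

  public static void main(String[] args) {
    System.out.println(recordSize("RC Lens", 1906, null));
    System.out.println(recordSize("Red Star 93", 1897, null));
    System.out.println(recordSize("Matra Racing", 1987, 1989));
  }
}
```

Under these assumptions the "Red Star 93" record takes 15 bytes, which is consistent with the difference between the 437 and 422 bytes quoted in the test comments for 6 and 5 teams. The fixed cost of the header explains why a single record is bigger in Avro than in JSON.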

And the tests for Avro deserialization look like:

public class ClubDeserializationTest {

  private static Schema CLUB_SCHEMA_WRITE;
  private static Schema CLUB_SCHEMA_READ;
  private static Schema CLUB_BAD_SCHEMA_READ;
  private static final String RC_LENS_AVRO = "rc_lens.avro";
  private static final String TWO_TEAMS_AVRO = "2_teams.avro";

  @BeforeClass
  public static void setupContext() throws IOException {
    CLUB_SCHEMA_WRITE = new Schema.Parser().parse(
      new File(ClubSerializationTest.class.getClassLoader().getResource("schemas/club.json").getFile())
    );
    CLUB_SCHEMA_READ = new Schema.Parser().parse(
      new File(ClubSerializationTest.class.getClassLoader().getResource("schemas/club_read.json").getFile())
    );
    CLUB_BAD_SCHEMA_READ = new Schema.Parser().parse(
      new File(ClubSerializationTest.class.getClassLoader().getResource("schemas/club_bad_package.json").getFile())
    );
    GenericRecord rcLens = new GenericData.Record(CLUB_SCHEMA_WRITE);
    rcLens.put("full_name", "RC Lens");
    rcLens.put("foundation_year", 1906);
    GenericRecord lilleOsc = new GenericData.Record(CLUB_SCHEMA_WRITE);
    lilleOsc.put("full_name", "Lille OSC");
    lilleOsc.put("foundation_year", 1944);

    File rcLensFile = new File(RC_LENS_AVRO);
    DatumWriter<GenericRecord> datumWriter = 
      new GenericDatumWriter<>(CLUB_SCHEMA_WRITE);
    DataFileWriter<GenericRecord> dataFileWriter =
      new DataFileWriter<>(datumWriter);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, rcLensFile);
    dataFileWriter.append(rcLens);
    dataFileWriter.close();

    File twoTeamsFile = new File(TWO_TEAMS_AVRO);
    dataFileWriter.create(CLUB_SCHEMA_WRITE, twoTeamsFile);
    dataFileWriter.append(rcLens);
    dataFileWriter.append(lilleOsc);
    dataFileWriter.close();

    rcLensFile.deleteOnExit();
    twoTeamsFile.deleteOnExit();
  }

  @Test
  public void should_correctly_deserialize_club_with_reader_schema() throws IOException {
    File outputClubsFile = new File(RC_LENS_AVRO);

    DatumReader<Club> datumReader = 
      new ReflectDatumReader<>(CLUB_SCHEMA_READ);
    DataFileReader<Club> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    Club deserializedClub = dataFileReader.next(new Club());
    assertThat(deserializedClub.getFullName()).isEqualTo("RC Lens");
    assertThat(deserializedClub.getDisappearanceYear()).isNull();
    assertThat(deserializedClub.getFoundationYear()).isEqualTo(1906);
  }

  @Test
  public void should_correctly_deserialize_two_clubs() throws IOException {
    File outputClubsFile = new File(TWO_TEAMS_AVRO);

    DatumReader<Club> datumReader = 
      new ReflectDatumReader<>(CLUB_SCHEMA_READ);
    DataFileReader<Club> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    Club rcLens = dataFileReader.next(new Club());
    Club lilleOsc = dataFileReader.next(new Club());
    assertThat(rcLens.getFullName()).isEqualTo("RC Lens");
    assertThat(rcLens.getDisappearanceYear()).isNull();
    assertThat(rcLens.getFoundationYear()).isEqualTo(1906);
    assertThat(lilleOsc.getFullName()).isEqualTo("Lille OSC");
    assertThat(lilleOsc.getDisappearanceYear()).isNull();
    assertThat(lilleOsc.getFoundationYear()).isEqualTo(1944);
  }

  @Test(expected = NullPointerException.class)
  public void should_fail_deserializing_with_writer_schema() throws IOException {
    // The NPE is caused by the fact that the writer schema stores fields under different names
    // than the ones used by the class fields (full_name instead of fullName, etc.)
    // ReflectDatumReader will throw an exception similar to:
    /**
      * java.lang.NullPointerException
      *      at org.apache.avro.reflect.ReflectData.setField(ReflectData.java:138)
      *      at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
      *      at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:310)
      *      (...)
      */
    File outputClubsFile = new File(RC_LENS_AVRO);

    DatumReader<Club> datumReader = 
      new ReflectDatumReader<>(CLUB_SCHEMA_WRITE);
    DataFileReader<Club> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    dataFileReader.hasNext();
    dataFileReader.next();
  }

  @Test(expected = AvroTypeException.class)
  public void should_fail_on_deserializing_because_java_class_doesn_t_match_with_fields() throws IOException {
    // another way to get a Schema is:
    Schema schema = ReflectData.get().getSchema(Club.class);
    // However, it doesn't know the aliases, so the conversion will fail with an exception similar to
    /**
      * org.apache.avro.AvroTypeException: Found com.waitingforcode.model.Club,
      * expecting com.waitingforcode.model.Club, missing required field fullName
      *      at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
      *      at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
      *  at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
      */
    File outputClubsFile = new File(RC_LENS_AVRO);

    DatumReader<Club> datumReader = 
      new ReflectDatumReader<>(schema);
    DataFileReader<Club> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    Club deserializedClub = dataFileReader.next(new Club());
    assertThat(deserializedClub.getFullName()).isEqualTo("RC Lens");
    assertThat(deserializedClub.getDisappearanceYear()).isNull();
    assertThat(deserializedClub.getFoundationYear()).isEqualTo(1906);
  }

  @Test
  public void should_deserialize_schema_for_generic_data() throws IOException {
    File outputClubsFile = new File(RC_LENS_AVRO);

    DatumReader<GenericRecord> datumReader = 
      new GenericDatumReader<>(CLUB_SCHEMA_WRITE);
    DataFileReader<GenericRecord> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    GenericRecord record = dataFileReader.next();
    // Notice that textual data stored in serialized files are not Java's String but
    // Avro's Utf8 objects. The difference between them is that Utf8 is mutable
    // and it's more efficient on reading and writing
    assertThat(((Utf8)record.get("full_name")).toString()).isEqualTo("RC Lens");
    assertThat((int)record.get("foundation_year")).isEqualTo(1906);
    assertThat(record.get("disappearance_year")).isNull();
    // Aliases are useful in the case of reflection-based readers. But they cannot
    // be used as aliases for accessing fields through GenericRecord
    assertThat(record.get("fullName")).isNull();
  }

  @Test
  public void should_correctly_parse_manually_defined_json_with_specified_union_field() throws IOException {
    // Note that for fields of union type, the corresponding JSON is a nested object made of the type and the value
    String validJson = "{\"fullName\": \"Old club\", \"foundationYear\": 1900, \"disappearanceYear\": {\"int\": 1990}}";
    ByteArrayInputStream inputStream = new ByteArrayInputStream(validJson.getBytes());

    JsonDecoder decoder = 
      new DecoderFactory().jsonDecoder(CLUB_SCHEMA_READ, inputStream);
    DatumReader<Club> reader = 
      new ReflectDatumReader<>(CLUB_SCHEMA_READ);
    Club club = reader.read(null, decoder);

    assertThat(club.getFullName()).isEqualTo("Old club");
    assertThat(club.getFoundationYear()).isEqualTo(1900);
    assertThat(club.getDisappearanceYear()).isEqualTo(1990);
  }

  @Test
  public void should_correctly_parse_manually_defined_json_with_null_union_value() throws IOException {
    String validJson = "{\"fullName\": \"Old club\", \"foundationYear\": 1900, \"disappearanceYear\": null}";
    ByteArrayInputStream inputStream = new ByteArrayInputStream(validJson.getBytes());

    JsonDecoder decoder = 
      new DecoderFactory().jsonDecoder(CLUB_SCHEMA_READ, inputStream);
    DatumReader<Club> reader = 
      new ReflectDatumReader<>(CLUB_SCHEMA_READ);
    Club club = reader.read(null, decoder);

    assertThat(club.getFullName()).isEqualTo("Old club");
    assertThat(club.getFoundationYear()).isEqualTo(1900);
    assertThat(club.getDisappearanceYear()).isNull();
  }

  @Test(expected = AvroTypeException.class)
  public void should_fail_on_reading_badly_defined_union_field_of_json() throws IOException {
    // The union field is badly defined here, so parsing will fail - a nested object is expected
    String badJsonFormat = "{\"fullName\": \"Old club\", \"foundationYear\": 1900, \"disappearanceYear\": 1990}";
    ByteArrayInputStream inputStream = new ByteArrayInputStream(badJsonFormat.getBytes());

    JsonDecoder decoder = 
      new DecoderFactory().jsonDecoder(CLUB_SCHEMA_READ, inputStream);
    DatumReader<Club> reader = 
      new ReflectDatumReader<>(CLUB_SCHEMA_READ);
    reader.read(null, decoder);
  }

  @Test(expected = AvroTypeException.class)
  public void should_fail_on_converting_schema_with_bad_package() throws IOException {
    File outputClubsFile = new File(RC_LENS_AVRO);

    DatumReader<Club> datumReader = 
      new ReflectDatumReader<>(CLUB_BAD_SCHEMA_READ);
    DataFileReader<Club> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    dataFileReader.next(new Club());
  }

  @Test(expected = AvroTypeException.class)
  public void should_also_fail_on_converting_generic_record_from_bad_package() throws IOException {
    File outputClubsFile = new File(RC_LENS_AVRO);

    DatumReader<GenericRecord> datumReader = 
      new GenericDatumReader<>(CLUB_BAD_SCHEMA_READ);
    DataFileReader<GenericRecord> dataFileReader = 
      new DataFileReader<>(outputClubsFile, datumReader);

    assertThat(dataFileReader.hasNext()).isTrue();
    dataFileReader.next();
  }

}
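
The JSON-related tests rely on the way Avro's JSON encoding represents unions: a null value stays a plain JSON null, while a value of any other branch is wrapped in an object keyed by the type name. A minimal sketch of that convention, with the hypothetical helper names nullableIntToAvroJson and clubToAvroJson and assuming club names that need no JSON escaping, could look like this:

```java
final class JsonUnionSketch {

  // Renders a value of the union ["int", "null"] in Avro's JSON encoding:
  // null stays a plain JSON null, an int is wrapped as {"int": value}.
  static String nullableIntToAvroJson(Integer value) {
    return value == null ? "null" : "{\"int\": " + value + "}";
  }

  // Builds the JSON document for a club; fullName is assumed to need no escaping.
  static String clubToAvroJson(String fullName, int foundationYear, Integer disappearanceYear) {
    return "{\"fullName\": \"" + fullName + "\", \"foundationYear\": " + foundationYear
        + ", \"disappearanceYear\": " + nullableIntToAvroJson(disappearanceYear) + "}";
  }

  public static void main(String[] args) {
    System.out.println(clubToAvroJson("Old club", 1900, 1990));
    System.out.println(clubToAvroJson("Old club", 1900, null));
  }
}
```

The two printed documents have the same shape as the inputs accepted by the JsonDecoder tests, while a bare 1990 for disappearanceYear is the form rejected with AvroTypeException.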

The article introduces some key information about Apache Avro schemas. It presents a sample schema with two families of types: complex and primitive. In the second part, it uses the schema for serialization and deserialization in Java.