Nested data representation in Parquet

Versions: Parquet 1.9.0

Working with nested structures appears as a problem in column-oriented storage. However, thanks to Google's Dremel solution, this task can be solved efficiently.

This post explains the role of Dremel in Apache Parquet. The first part defines two important concepts in nested structures: repetition and definition levels. The second part shows some Parquet's internals about the storage of this type of data. The last part contains some learning tests showing how Parquet deals with nested structures.

Definition and repetition levels

To begin with simple statements, definition levels are used to define the number of optional fields in the path for given column. Repetition level identifies the repeated field in the path having repeated values. Both are used to reconstruct nested structures. Sounds unclear ? Let's go to the deeper explanation.

Both levels are added only for the columns having nested structures, i.e. the columns with the path longer than 1. For not nested columns the value for both levels is always 1 and there is no need to store this obvious information. To see what happens in definition level, let's take an example of below message representing a player that maybe has been playing some games last years:

message Player {
  optional group years {
    optional group game {
      optional string name
    }
  }
}

Now we identify the definition levels, that means the levels where the value is defined or not:

Message Definition level
"years": null 0
"years": {"game": null} 1
"years": {"game": {"name": null} } 2
"years": {"game": {"name": "some game"} } 3 (called actually defined - other values tell at which level the values are missing)

In the same structure, but having game as required field, the definition levels will be different:

Message Definition level
"years": null 0
"years": {"game": null} "game" can't be null, so the definition level is not needed
"years": {"game": {"name": null} } 1
"years": {"game": {"name": "some game"} } 2 (actually defined)

The goal of repetition levels is to store flags telling when the elements belonging to a new list start. For instance let's take similar schema as previously applied for message:

# schema
message Player {
  repeated group games {
    repeated string name
  }
}

# message
{
  {
    games: {
      name: "Fifa 99", name: "Fifa 2000"
    },
    games: {
      name: "Championship Manager 01/02"
    }
  },
  {
    games: {
      name: "Age of Empires 2"
    },
    games: {
      name: "Red Alert 2", name: "Fifa 2002"
    }
  }
}

We can easily notice that for above example the number of repetition level is 2: 0 defines every new record and creates new games and name list; 1 marks the creation of name list while 2 is used to define every new element in the list of name. The summary table for so defined repetition level could look like:

Value Repetition level
Fifa 99 0 (0 implies automatically the creation of 1 and 2)
Fifa 2000 2
Championship Manager 01/02 1 (1 implies automatically the creation of 2)
Age of Empires 2 0
Red Alert 2 1
Fifa 2002 2

The overhead of storing additional level information is low. For the levels up to 1 it's sufficient to use 1 bit (0 - for 0 level, 1 - the 1st level). For the levels up to 3, only 2 bits can be used (00 - 0, 01 - 1, 10 - 2, 11 - 3). The 7-length levels can be stored with only 3 bits (000 - 0, 001 - 1, 010 - 2, 011 - 3, 100 - 4, 101 - 5, 110 - 6 and 111 - 7).

Definition and repetition levels use

Fine, we've defined some levels in a table but how it's used by Parquet ? Let's dive into the code to understand when and how they're written. The information about the maximal levels is stored in org.apache.parquet.column.ColumnDescriptor class describing column with its type, path, repetition and definition levels.

The values of both levels are encoded with RLE (you can learn more about this method in the post about Encoding in Apache Parquet) and are written by 2 methods from org.apache.parquet.column.impl.ColumnWriterV2 class (repetitionLevel(int repetitionLevel) and definitionLevel(int definitionLevel)). Both methods are called before writing the value. However, they're not saved all together. Both levels and value are stored in 3 separate buffers concatenated at the end of writing inside org.apache.parquet.hadoop.ColumnChunkPageWriteStore.ColumnChunkPageWriter#writePageV2(int rowCount, int nullCount, int valueCount, BytesInput repetitionLevels, BytesInput definitionLevels, Encoding dataEncoding, BytesInput data, Statistics<?> statistics):

buf.collect(
    BytesInput.concat(
      BytesInput.from(tempOutputStream),
      repetitionLevels,
      definitionLevels,
      compressedData)
);

The information about repetition and definition levels is also stored in data page header. The header stores the information about the lengths of repetition and definition levels.

Nested objects example

We'll iterate over examples of nested objects representations with the help of the following classes representing gamers from the first section's examples:

public class Player {

  private List<Game> games;

  public List<Game> getGames() {
    return games;
  }

  public void setGames(List<Game> games) {
    this.games = games;
  }

  public static Player valueOf(Game...games) {
    Player player = new Player();
    List<Game> gamesList = Arrays.asList(games);
    player.setGames(gamesList);
    return player;
  }

}

public class PlayerNestedGames {
  private List<List<Game>> games;

  public List<List<Game>> getGames() {
    return games;
  }

  public void setGames(List<List<Game>> games) {
    this.games = games;
  }

  public static PlayerNestedGames valueOf(Game...games) {
    PlayerNestedGames player = new PlayerNestedGames();
    List<List<Game>> nestedGames = new ArrayList<>();
    List<Game> gamesList = Arrays.asList(games);
    nestedGames.add(gamesList);
    player.setGames(nestedGames);
    return player;
  }

}

public class Game {

  private String name;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public static Game ofName(String name) {
    Game game = new Game();
    game.name = name;
    return game;
  }
}

The schemas defined between subsequent tests are changed to better illustrate the influence of fields definition on definition and repetition levels (respectively DL and RL in the asserted output):

private static final String TEST_FILE_REQUIRED_FIELDS = "/tmp/nested_organization_d1_r1";

private static final Path TEST_FILE_REQUIRED_FIELDS_PATH = new Path(TEST_FILE_REQUIRED_FIELDS);

private static final String TEST_FILE_1_OPTIONAL_FIELD = "/tmp/nested_organization_d2_r1";

private static final Path TEST_FILE_1_OPTIONAL_FIELD_PATH = new Path(TEST_FILE_1_OPTIONAL_FIELD);

private static final String TEST_FILE_2_OPTIONAL_FIELDS = "/tmp/nested_organization_d3_r1";

private static final Path TEST_FILE_2_OPTIONAL_FIELDS_PATH = new Path(TEST_FILE_2_OPTIONAL_FIELDS);

private static final String TEST_FILE_2_LEVEL_NESTED_FIELDS = "/tmp/nested_organization_d1_r2";

private static final Path TEST_FILE_2_LEVEL_NESTED_FIELDS_PATH = new Path(TEST_FILE_2_LEVEL_NESTED_FIELDS);

@BeforeClass
@AfterClass
public static void prepareTests() {
  new File(TEST_FILE_REQUIRED_FIELDS).delete();
  new File(TEST_FILE_1_OPTIONAL_FIELD).delete();
  new File(TEST_FILE_2_OPTIONAL_FIELDS).delete();
  new File(TEST_FILE_2_LEVEL_NESTED_FIELDS).delete();
}

@Test
public void should_detect_1_repetition_level_and_1_definition_level_for_required_nested_fields() throws Exception {
  Schema schema = SchemaBuilder.builder().record("Player").namespace("com.waitingforcode.model").fields()
    .name("games").type().array().items()
    .record("Game").fields()
    .name("name").type().stringType().noDefault()
    .endRecord()
    .noDefault()
    .endRecord();
  List<Player> players = getPlayers(4);
  ParquetWriter<Player> writer = getWriter(schema, TEST_FILE_REQUIRED_FIELDS_PATH);
  writePlayers(players, writer);
  writer.close();

  String details = getDetailsForFile(TEST_FILE_REQUIRED_FIELDS);

  assertThat(details).contains("games.array.name TV=12 RL=1 DL=1");
}

@Test
public void should_detect_1_repetition_level_and_2_definition_levels_for_1_optional_nested_field() throws Exception {
  Schema schema = SchemaBuilder.builder().record("Player").namespace("com.waitingforcode.model").fields()
    .name("games").type().nullable().array().items()
    .record("Game").fields()
    .name("name").type().stringType().noDefault()
    .endRecord()
    .noDefault()
    .endRecord();
  List<Player> players = getPlayers(4);
  ParquetWriter<Player> writer = getWriter(schema, TEST_FILE_1_OPTIONAL_FIELD_PATH);
  writePlayers(players, writer);
  writer.close();

  String details = getDetailsForFile(TEST_FILE_1_OPTIONAL_FIELD);

  assertThat(details).contains("games.array.name TV=12 RL=1 DL=2");
}


@Test
public void should_detect_1_repetition_level_and_2_definition_levels_for_2_optional_nested_fields() throws Exception {
  Schema schema = SchemaBuilder.builder().record("Player").namespace("com.waitingforcode.model").fields()
    .name("games").type().nullable().array().items()
    .record("Game").fields()
    .name("name").type().nullable().stringType().noDefault()
    .endRecord()
    .noDefault()
    .endRecord();
  List<Player> players = getPlayers(4);
  ParquetWriter<Player> writer = getWriter(schema, TEST_FILE_2_OPTIONAL_FIELDS_PATH);
  writePlayers(players, writer);
  writer.close();

  String details = getDetailsForFile(TEST_FILE_2_OPTIONAL_FIELDS);

  assertThat(details).contains("games.array.name TV=12 RL=1 DL=3");
}

@Test
public void should_detect_2_repetition_levels_and_2_definition_levels_for_required_twice_nested_fields() throws Exception {
  List<PlayerNestedGames> players = getPlayersForNestedGame(4);
  ParquetWriter<PlayerNestedGames> writer = getWriter(ReflectData.get().getSchema(PlayerNestedGames.class),
    TEST_FILE_2_LEVEL_NESTED_FIELDS_PATH);
  writePlayers(players, writer);
  writer.close();

  String details = getDetailsForFile(TEST_FILE_2_LEVEL_NESTED_FIELDS);

  assertThat(details).contains("games.array.array.name TV=12 RL=2 DL=2");
}

private static <T> ParquetWriter<T> getWriter(Schema schema, Path filePath) throws IOException {
  return AvroParquetWriter.<T>builder(filePath)
    .enableDictionaryEncoding()
    .withSchema(schema)
    .withDataModel(ReflectData.get())
    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
    .build();
}

private static String getDetailsForFile(String fileName) throws Exception {
  GnuParser gnuParser = new GnuParser();
  ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
  PrintStream printStream = new PrintStream(byteOutputStream);
  Main.out = printStream;
  DumpCommand dumpCommand = new DumpCommand();
  CommandLine commandLineJsonOutput = gnuParser.parse(new Options(), new String[]{fileName});
  dumpCommand.execute(commandLineJsonOutput);
  return new String(byteOutputStream.toByteArray());
}

private static <T> void writePlayers(List<T> players, ParquetWriter<T> writer) throws IOException {
  for (int i = 0; i < players.size(); i++) {
    writer.write(players.get(i));
  }
}

private static List<Player> getPlayers(int playersNumber) {
  List<Player> players = new ArrayList<>();
  for (int i = 0; i < playersNumber; i++) {
    Player player = Player.valueOf(
      Game.ofName("Game#1_"+i), Game.ofName("Game#2_"+i), Game.ofName("Game#3_"+i)
    );
    players.add(player);
  }
  return players;
}

private static List<PlayerNestedGames> getPlayersForNestedGame(int playersNumber) {
  List<PlayerNestedGames> players = new ArrayList<>();
  for (int i = 0; i < playersNumber; i++) {
    PlayerNestedGames player = PlayerNestedGames.valueOf(
      Game.ofName("Game#1_"+i), Game.ofName("Game#2_"+i), Game.ofName("Game#3_"+i)
    );
    players.add(player);
  }
  return players;
}

Parquet stores nested structures thanks to structures called repetition and definition levels. The first one is used to determine when a new sequence of repeated data begins. The second type is used to determine how many empty values can be stored in the column. As we could see in the section section, the levels are saved in separate buffers concatenated with data when the data page is written.