Data storage in Apache Parquet

Versions: Parquet 1.9.0

Previously we focused on types available in Parquet. This time we can move forward and analyze how the framework stores the data in the files.

This post concentrates on the organization of Parquet files. It's divided in 2 theoretical sections. The first one present the structure of the file with a special zoom on metadata. The second section talks about row pages and column chunks, namely the parts storing data physically. The very last part of this post gives some code examples about Parquet files internals.

File organization

Parquet file is composed of several different parts. It begins by magic bytes number. This marker is mainly used to check if the file is really the file written in Parquet format. We can see plenty of checks using this information in ParquetFileReader class:

int FOOTER_LENGTH_SIZE = 4;
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
  throw new RuntimeException(filePath + " is not a Parquet file (too small)");
}

The next layer affects row groups, column chunks and pages data that will be described more in details in subsequent sections. For now simply remember that each row group includes column chunks and each column chunk includes data pages.

At the end of the file we retrieve footer. It contains plenty of useful information to read and process Parquet files efficiently called file metadata, such as:

Data storage in the file

The row group is a structure with the data. To see its content, let's go to the org.apache.parquet.hadoop.InternalParquetRecordWriter#flushRowGroupToStore invoked every time when the threshold of bufferized data is reached:

 if (recordCount > 0) {
  parquetFileWriter.startBlock(recordCount);
  columnStore.flush();
  pageStore.flushToFileWriter(parquetFileWriter);
  recordCount = 0;
  parquetFileWriter.endBlock();
  this.nextRowGroupSize = Math.min(
      parquetFileWriter.getNextRowGroupSize(),
      rowGroupSizeThreshold);
}

As you already know, the row group contains the column chunks that in their turn contain the pages. The pages are written with org.apache.parquet.column.page.PageWriter#writePageV2 method (or without V2 if it's the 1st version):

parquetMetadataConverter.writeDataPageV2Header(
  uncompressedSize, compressedSize,
  valueCount, nullCount, rowCount,
  statistics,
  dataEncoding,
  rlByteLength,
  dlByteLength,
  tempOutputStream);

buf.collect(
    BytesInput.concat(
      BytesInput.from(tempOutputStream),
      repetitionLevels,
      definitionLevels,
      compressedData)
);

By analyzing what this method does we can find that the following information is written to the pages:

In brief we could summarize Parquet (simplified) file format as an image:

Parquet file internals by example

Below tests show some storage details explained in previous sections:

 
private static final String TEST_FILE = "/tmp/file_organization";

private static final Path TEST_FILE_PATH = new Path(TEST_FILE);

private static final Schema AVRO_SCHEMA = new Schema.Parser().parse("{\"type\":\"record\", \"name\":\"WorkingCitizen\"," +
  "\"namespace\":\"com.waitingforcode.model\", \"fields\":[" +
      "{\"name\":\"professionalSkills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}," +
      "{\"name\":\"professionsPerYear\",\"type\":{\"type\":\"map\",\"values\":\"string\"}}," +
      "{\"name\":\"civility\",\"type\":{\"type\":\"enum\",\"name\":\"Civilities\"," +
          "\"symbols\":[\"MR\",\"MS\",\"MISS\",\"MRS\"]}}," +
      "{\"name\":\"firstName\",\"type\":\"string\"}," +
      "{\"name\":\"lastName\",\"type\":\"string\"}," +
      "{\"name\":\"creditRating\",\"type\":\"double\"}," +
      "{\"name\":\"isParent\",\"type\":\"boolean\"}]" +
  "}");

@BeforeClass
public static void createContext() throws IOException {
  new File(TEST_FILE).delete();
  WorkingCitizen workingCitizen1 = getSampleWorkingCitizen(Civilities.MISS);
  WorkingCitizen workingCitizen2 = getSampleWorkingCitizen(Civilities.MR);
  ParquetWriter<WorkingCitizen> writer = AvroParquetWriter.<WorkingCitizen>builder(TEST_FILE_PATH)
    .enableDictionaryEncoding()
    .withSchema(AVRO_SCHEMA)
    .withDataModel(ReflectData.get())
    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
    .build();
  writer.write(workingCitizen1);
  writer.write(workingCitizen2);
  writer.close();
}

@AfterClass
public static void deleteFile() {
  new File(TEST_FILE).delete();
}

@Test
public void should_get_correct_row_group_information() throws IOException {
  ParquetFileReader fileReader = ParquetFileReader.open(new Configuration(), TEST_FILE_PATH);

  List<BlockMetaData> rowGroups = fileReader.getRowGroups();

  assertThat(rowGroups).hasSize(1);
  BlockMetaData rowGroup = rowGroups.get(0);
  // We test only against several fields
  ColumnChunkMetaData civility = getMetadataForColumn(rowGroup, "civility");
  // It varies, sometimes it's 352, 356 or 353 - so do not assert on it
  // Only show that the property exists
  long offset = civility.getFirstDataPageOffset();
  assertThat(civility.getFirstDataPageOffset()).isEqualTo(offset);
  assertThat(civility.getStartingPos()).isEqualTo(offset);
  assertThat(civility.getCodec()).isEqualTo(CompressionCodecName.UNCOMPRESSED);
  assertThat(civility.getEncodings()).contains(Encoding.DELTA_BYTE_ARRAY);
  assertThat(civility.getValueCount()).isEqualTo(2);
  assertThat(civility.getTotalSize()).isEqualTo(60L);
  assertThat(civility.getType()).isEqualTo(PrimitiveType.PrimitiveTypeName.BINARY);
  // CHeck credit rating to see stats
  ColumnChunkMetaData creditRating = getMetadataForColumn(rowGroup, "creditRating");
  assertThat(creditRating.getStatistics()).isNotNull();
  // Both have random values, so no to assert on exact values
  assertThat(creditRating.getStatistics().genericGetMax()).isNotNull();
  assertThat(creditRating.getStatistics().genericGetMin()).isNotNull();
  assertThat(creditRating.getStatistics().hasNonNullValue()).isTrue();
  assertThat(creditRating.getStatistics().getNumNulls()).isEqualTo(0);
}

private ColumnChunkMetaData getMetadataForColumn(BlockMetaData rowGroup, String columnName) {
  return rowGroup.getColumns().stream()
    .filter(columnChunkMetaData -> columnChunkMetaData.getPath().toDotString().contains(columnName))
    .findFirst().get();
}

@Test
public void should_read_footer_of_correctly_written_file() throws IOException, URISyntaxException {
  ParquetFileReader fileReader = ParquetFileReader.open(new Configuration(), TEST_FILE_PATH);
  ParquetMetadata footer = fileReader.getFooter();

  org.apache.parquet.hadoop.metadata.FileMetaData footerMetadata = footer.getFileMetaData();

  assertThat(footerMetadata.getKeyValueMetaData()).containsKeys("parquet.avro.schema", "writer.model.name");
  assertThat(footerMetadata.getCreatedBy())
    .isEqualTo("parquet-mr version 1.9.0 (build 38262e2c80015d0935dad20f8e18f2d6f9fbd03c)");
  StringBuilder schemaStringifier = new StringBuilder();
  footerMetadata.getSchema().writeToStringBuilder(schemaStringifier, "");
  assertThat(schemaStringifier.toString().replaceAll("\n", "").replaceAll("  ", "")).isEqualTo(
    "message com.waitingforcode.model.WorkingCitizen {" +
        "required group professionalSkills (LIST) {"+
            "repeated binary array (UTF8);"+
        "}"+
        "required group professionsPerYear (MAP) {"+
            "repeated group map (MAP_KEY_VALUE) {"+
                "required binary key (UTF8);"+
                "required binary value (UTF8);"+
            "}"+
        "}"+
        "required binary civility (ENUM);"+
        "required binary firstName (UTF8);"+
        "required binary lastName (UTF8);"+
        "required double creditRating;"+
        "required boolean isParent;"+
    "}");
}

The post explained data organization in the Parquet files. The first section focused on file metadata contained in the footer. As we could see, this part groups all information describing columns, such as offsets, encodings or used compression. The next part detailed better how the data is stored in pages that are included in column chunks that in their turn are included in row pages. The last section shown some tests proving what is stored in Parquet footer.

If you liked it, you should read: