ACID file formats - writing: Apache Hudi

Versions: Apache Hudi 0.10.0

It's only when I was preparing the 2nd blog post of the series that I realized how bad my initial plan was. The article you're currently reading had been initially planned as the 6th of the series. But indeed, how could we understand more advanced features without discovering the writing path first?

Apart from changing the plan, I'm also changing the formula. Instead of describing the 3 formats in a single article, I'll write one blog post per ACID file format from my list. The single-article approach forced me to skip details that would have made the learning curve gentler. To keep the sense of the series, I'll try to publish the related blog posts weekly.

Tables, types, and config

This section is a good way to introduce the different types of tables and operations in the framework. Let's start with the tables. Apache Hudi has two table types: Copy-on-Write (CoW) and Merge-on-Read (MoR), detailed in the next section.

Regarding the operations, the writing path exposes, among others, insert, upsert, bulk insert and delete, selected with the hoodie.datasource.write.operation property.

These operations don't live alone; some of them are tightly coupled to Hudi configuration properties. Let's start with the easy ones: the record key (hoodie.datasource.write.recordkey.field), the partition path (hoodie.datasource.write.partitionpath.field), and the pre-combine field (hoodie.datasource.write.precombine.field) used to combine records sharing the same key.

And to complete the list, some of the more advanced concepts: the combine-before-insert/upsert deduplication and the Row-based writer enabled with hoodie.datasource.write.row.writer.enable, both of which come back later in this article.

Unfortunately, the options above are not the only writing-related ones. They're the most important ones (subjectively speaking) I found in the DataSourceWriteOptions.
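To make these options more concrete, here is a minimal sketch of how they come together in a Spark write. It's my own illustration rather than something taken from the article or the Hudi docs: the table name, the columns (id, country, ts) and the paths are made up, but the option keys come from DataSourceWriteOptions:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("hudi-writing-demo").getOrCreate()
// hypothetical input dataset with id, country and ts columns
val visits = spark.read.json("/tmp/input/visits.json")

visits.write.format("hudi")
  // table identity and type
  .option("hoodie.table.name", "visits")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") // or MERGE_ON_READ
  // how the rows are identified, partitioned and deduplicated
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.partitionpath.field", "country")
  .option("hoodie.datasource.write.precombine.field", "ts")
  // the write operation itself: upsert, insert, bulk_insert, delete...
  .option("hoodie.datasource.write.operation", "upsert")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/visits")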

MoR vs. CoW tables

The difference between the CoW and MoR tables is where the updated rows go. CoW tables rewrite the base data files with the updated records merged in, whereas the MoR ones append them to a special delta log file.

That's the high-level picture. Regarding the implementation details, the MoR magic happens in the HoodieAppendHandle class:

public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieWriteHandle<T, I, K, O> {

  private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
      throws IOException, InterruptedException {
    Option<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();

    return HoodieLogFormat.newWriterBuilder()
        .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
        .withFileId(fileId).overBaseCommit(baseCommitTime)
        .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
        .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
        .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
        .withRolloverLogWriteToken(writeToken)
        .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
  }
  // ...
}

The class creates a log writer that appends a list of HoodieLogBlocks to the file. The updates are buffered in memory and materialized whenever their size exceeds the allowed block size specified in the hoodie.logfile.data.block.max.size configuration entry.
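As an illustration (my own sketch, not code from the article), the sizing knobs involved here can be set directly on the writer. I'm assuming an updatesDf DataFrame holding the updated rows, the same columns as before and the imports from the earlier snippet; the values are arbitrary:

// updatesDf: assumed DataFrame with the updated rows; imports as in the earlier sketch
updatesDf.write.format("hudi")
  .option("hoodie.table.name", "visits")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.operation", "upsert")
  // flush the buffered updates into a new HoodieLogBlock after ~4 MB
  .option("hoodie.logfile.data.block.max.size", (4 * 1024 * 1024).toString)
  // roll over to a new delta log file after ~64 MB (the withSizeThreshold call above)
  .option("hoodie.logfile.max.size", (64 * 1024 * 1024).toString)
  .mode(SaveMode.Append)
  .save("/tmp/hudi/visits")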

Row writer

The CoW vs. MoR difference is not the only Hudi writing specificity, though. Another one is the Row-based writer mentioned in the configuration section. When you perform a bulk insert with this optimization enabled, Apache Hudi bypasses the usual writing path and uses this shortcut:


object HoodieSparkSqlWriter {
  // ...
  def bulkInsertAsRow(// ...
  {
    val hoodieDF = if (populateMetaFields) {
      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace,
        bulkInsertPartitionerRows, isGlobalIndex, dropPartitionColumns)
    } else {
      HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsertWithoutMetaFields(df)
    }
    if (SPARK_VERSION.startsWith("2.")) {
      hoodieDF.write.format("org.apache.hudi.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .options(params)
        .mode(SaveMode.Append)
        .save()
    } else if (SPARK_VERSION.startsWith("3.")) {
      hoodieDF.write.format("org.apache.hudi.spark3.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
        .options(params)
        .mode(SaveMode.Append)
        .save()
    }
    // ...
  }

Wait, is it really a shortcut rather than the usual writing path? Yes, it is. Usually, Hudi writes the records with extra conversion steps:

  1. For each input row, create an Apache Avro GenericRecord. In the end, this step returns an RDD[GenericRecord].
  2. Map the GenericRecords to HoodieRecords. This step includes an optional deduplication (combination, hence the combine parameters in the config).
  3. Write the records to the data files with one of the supported HoodieFileWriters (Parquet, ORC, HFile).

As you can notice, going through these 3 steps for a bulk insert, which is meant for big-volume data ingestion, would be inefficient. According to the AWS benchmark, using the Row-based bulk insert is 3 times faster than the classical writing path.
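From the user's perspective, triggering the shortcut only requires the bulk_insert operation and the Row writer flag. The sketch below is mine, not the article's: initialLoadDf, the columns and the paths are assumptions, while the hoodie.datasource.write.row.writer.enable key lives in DataSourceWriteOptions:

// initialLoadDf: assumed DataFrame with the initial load; imports as in the earlier sketch
initialLoadDf.write.format("hudi")
  .option("hoodie.table.name", "visits")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.partitionpath.field", "country")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.operation", "bulk_insert")
  // keep the Dataset[Row] representation and skip the Avro/HoodieRecord conversions
  .option("hoodie.datasource.write.row.writer.enable", "true")
  .mode(SaveMode.Append)
  .save("/tmp/hudi/visits")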

Balanced writes

The last thing that caught my attention is the file sizing. Apache Hudi tries to create balanced files to avoid bad file layout situations like data skew or small files. How? With a special construct called WorkloadProfile.

A WorkloadProfile is a class storing the load for each partition (number of inserts and updates). It's created just before writing the input rows to the data blocks, here:

public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayload> // ...
{
  @Override
  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
    // ...
    WorkloadProfile profile = null;
    if (isWorkloadProfileNeeded()) {
      context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
      profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
      LOG.info("Workload profile :" + profile);
      saveWorkloadProfileMetadataToInflight(profile, instantTime);
    }
    // ...
  }
}

The isWorkloadProfileNeeded conditional is a bit misleading in the current (0.10.0) version because it always returns true. The buildProfile method runs an RDD-based computation counting global and per-partition stats. Below, the Spark computation used in the method:

private Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
  HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
  WorkloadStat globalStat = new WorkloadStat();

  // group the records by partitionPath + currentLocation combination, count the number of
  // records in each partition
  Map<Tuple2<String, Option<HoodieRecordLocation>>, Long> partitionLocationCounts = inputRecordsRDD
      .mapToPair(record -> new Tuple2<>(
          new Tuple2<>(record.getPartitionPath(), Option.ofNullable(record.getCurrentLocation())), record))
      .countByKey();
  // ...
}

The profiling is only one part of the file balancing feature. The other one is the partitioner. Just after creating the workload profile, Hudi gets a corresponding Apache Spark partitioner. As of this writing, the default partitioner for both insert and upsert actions is UpsertPartitioner (CoW table) or SparkUpsertDeltaCommitPartitioner (MoR table). This partitioner is used as part of the partition(...) method, where Hudi relies on one of Apache Spark's repartitioning methods to evenly balance the records across the partitions:

private JavaRDD<HoodieRecord<T>> partition(JavaRDD<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
  JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> mappedRDD = dedupedRecords.mapToPair(
      record -> new Tuple2<>(new Tuple2<>(record.getKey(), Option.ofNullable(record.getCurrentLocation())), record));

  JavaPairRDD<Tuple2<HoodieKey, Option<HoodieRecordLocation>>, HoodieRecord<T>> partitionedRDD;
  // ...
  if (table.requireSortedRecords()) {
    // ...
    partitionedRDD = mappedRDD.repartitionAndSortWithinPartitions(partitioner, comparator);
  } else {
    // Partition only
    partitionedRDD = mappedRDD.partitionBy(partitioner);
  }

  return partitionedRDD.map(Tuple2::_2);
}

The distribution logic isn't fully hidden from the end user, though, who can control the target file size with the hoodie.parquet.max.file.size property.
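For completeness, a last hedged sketch of mine showing the sizing properties on the writer. The values and the table are illustrative, and hoodie.parquet.small.file.limit is a related property I'm adding for context, not one discussed in the article:

// visits: the DataFrame from the first sketch
visits.write.format("hudi")
  .option("hoodie.table.name", "visits")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.operation", "upsert")
  // target upper bound for a base Parquet file (~128 MB here)
  .option("hoodie.parquet.max.file.size", (128 * 1024 * 1024).toString)
  // files below this size are considered "small" and receive new inserts first
  .option("hoodie.parquet.small.file.limit", (100 * 1024 * 1024).toString)
  .mode(SaveMode.Append)
  .save("/tmp/hudi/visits")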

Even though I hadn't planned to write about Apache Hudi (and Apache Iceberg, and Delta Lake) specifically, I'm satisfied with this change of direction. Hopefully, by the end of the year I'll be better placed to understand what's going on in these 3 ACID file formats and, hence, explain them better in blog posts and videos. See you in the next part of the series, this time about writes in Apache Iceberg!