Table file formats - reading path: Apache Hudi

Versions: Apache Hudi 0.12.0

After Delta Lake and Apache Iceberg, it's time to look at the reading path of Apache Hudi. Despite an apparent similarity with these table formats, Apache Hudi has an interesting reading specificity related to its different table types.

Read types

Read types are the first thing you'll notice when analyzing the reading logic. Besides different table types (Merge-On-Read and Copy-On-Write), Apache Hudi also has different query types. Combined, they generate a pretty interesting data reading capability matrix where each scenario has a dedicated BaseRelation implementation to communicate with Apache Spark.


The MergeOnReadSnapshotRelation is the implementation used for consistent reads of Merge-On-Read tables, i.e., it's the one reading the base (data) Parquet files and applying all the changes made to the records in the Avro log files (more on that in ACID file formats - writing: Apache Hudi). You'll find this resolution complexity in the code reading the data, and more specifically in the HoodieMergeOnReadRDD.

But before Hudi creates this RDD, it performs multiple metadata-based operations to prepare the reading context.

After doing all this heavy work, Apache Hudi is finally ready to initialize the HoodieMergeOnReadRDD. In addition to the attributes resolved before, the constructor takes a merge type parameter from the hoodie.datasource.merge.type option. Is it important? Very much so! It's one of the properties that determine how the physical files are read. Take a look at the HoodieMergeOnReadRDD's compute method:

class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                           @transient config: Configuration,
                           fileReaders: HoodieMergeOnReadBaseFileReaders,
                           tableSchema: HoodieTableSchema,
                           requiredSchema: HoodieTableSchema,
                           tableState: HoodieTableState,
                           mergeType: String,
                           @transient fileSplits: Seq[HoodieMergeOnReadFileSplit])
  extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
  // ...
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
    val iter = mergeOnReadPartition.split match {
      // The split contains only base (data) files
      case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
        // ...

      // The split contains only log files
      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
        // ...

      // Base and log files are present and the merge is skipped
      case split if mergeType.equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
        // ...

      // Base and log files are present and the records are merged
      case split if mergeType.equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
        // ...
      // ...
    }
    // ...
  }
}

As you can see, the way the data is read depends on the input files associated with the partition and on the merge type.
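
Both the query type and the merge type are plain reader options. The sketch below only assembles the option maps. It assumes that a Spark session and a Hudi table path exist elsewhere and that the map is passed to the DataFrameReader. The class and method names are hypothetical; the keys and values are the ones exposed by Hudi's DataSourceReadOptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Hudi: it only assembles reader options.
class HudiReadOptionsSketch {

    // Option keys as exposed by Hudi's DataSourceReadOptions.
    static final String QUERY_TYPE = "hoodie.datasource.query.type";
    static final String MERGE_TYPE = "hoodie.datasource.merge.type";

    // Snapshot query merging base and log files (payload_combine is the default merge type).
    static Map<String, String> snapshotWithMerge() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(QUERY_TYPE, "snapshot");
        options.put(MERGE_TYPE, "payload_combine");
        return options;
    }

    // Snapshot query returning base and log records without merging them.
    static Map<String, String> snapshotSkipMerge() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(QUERY_TYPE, "snapshot");
        options.put(MERGE_TYPE, "skip_merge");
        return options;
    }

    // Read-optimized query: only the base files are read, so no merge type is set.
    static Map<String, String> readOptimized() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put(QUERY_TYPE, "read_optimized");
        return options;
    }

    public static void main(String[] args) {
        // With Spark on the classpath, the map would be used as:
        // spark.read().format("hudi").options(snapshotSkipMerge()).load(tablePath)
        System.out.println(snapshotSkipMerge());
    }
}
```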

The SkipMergeIterator is basically a LogFileIterator except that it defines an extra iterator for the data files that is invoked before the delta files, potentially leading to duplicates:

  private class SkipMergeIterator(split: HoodieMergeOnReadFileSplit,
                                  baseFileReader: BaseFileReader,
                                  config: Configuration)
    extends LogFileIterator(split, config) {
    // ...
    private val baseFileIterator = baseFileReader(split.dataFile.get)

    override def hasNext: Boolean = {
      if (baseFileIterator.hasNext) {
        // No merge is required, simply load current row and project into required schema
        recordToLoad = requiredSchemaUnsafeProjection(baseFileIterator.next())
        true
      } else {
        // Base file exhausted: continue with the records of the log files
        super[LogFileIterator].hasNext
      }
    }
  }
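
To see where the duplicates come from, here is a simplified model of the skip-merge read. It is not Hudi code: the records are plain "key=value" strings and the class name is hypothetical. It only reproduces the ordering described above, base rows first, log rows next, with no reconciliation between them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model, not Hudi code: records are "key=value" strings.
class SkipMergeSketch {

    // Returns all base file rows first, then all log file rows, without merging.
    static Iterator<String> skipMerge(Iterator<String> baseFile, Iterator<String> logFiles) {
        return new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return baseFile.hasNext() || logFiles.hasNext();
            }

            @Override
            public String next() {
                // Base rows are drained before any log row is returned.
                return baseFile.hasNext() ? baseFile.next() : logFiles.next();
            }
        };
    }

    static List<String> readAll(List<String> base, List<String> logs) {
        List<String> rows = new ArrayList<>();
        skipMerge(base.iterator(), logs.iterator()).forEachRemaining(rows::add);
        return rows;
    }

    public static void main(String[] args) {
        List<String> base = Arrays.asList("id1=A", "id2=B");
        List<String> logs = Arrays.asList("id1=A-updated");
        // Prints [id1=A, id2=B, id1=A-updated]: id1 is returned twice.
        System.out.println(readAll(base, logs));
    }
}
```

The key id1 shows up once with its base value and once with its updated value, which is the price to pay for skipping the merge.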

Even though the RecordMergingFileIterator also extends the LogFileIterator, it works differently. The LogFileIterator creates an instance of HoodieMergedLogRecordScanner that iterates over all the delta log files and builds a map storing the most recent entry for each key:

public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
    implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
  // ...
  protected final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
  // ...
  protected void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) throws IOException {
    String key = hoodieRecord.getRecordKey();
    if (records.containsKey(key)) {
      // Merge and store the merged record. The HoodieRecordPayload implementation is free to decide what should be
      // done when a DELETE (empty payload) is encountered before or after an insert/update.

      HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
      HoodieRecordPayload oldValue = oldRecord.getData();
      HoodieRecordPayload combinedValue = hoodieRecord.getData().preCombine(oldValue);
      // If combinedValue is oldValue, no need rePut oldRecord
      if (combinedValue != oldValue) {
        HoodieOperation operation = hoodieRecord.getOperation();
        records.put(key, new HoodieAvroRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue, operation));
      }
    } else {
      // Put the record as is
      records.put(key, hoodieRecord);
    }
  }
  // ...
}
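
The map-building step can be pictured with a small, self-contained model. It is not Hudi code: LogRecord and its preCombine rule (keep the record with the higher ordering value) are assumptions made for illustration. The real outcome depends on the HoodieRecordPayload implementation used by the table.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model, not Hudi code.
class LogRecordMapSketch {

    // Hypothetical record: a key, a value and an ordering field (e.g. an event timestamp).
    static final class LogRecord {
        final String key;
        final String value;
        final long ordering;

        LogRecord(String key, String value, long ordering) {
            this.key = key;
            this.value = value;
            this.ordering = ordering;
        }

        // Assumed combine rule: the old record wins only with a strictly higher ordering value.
        LogRecord preCombine(LogRecord old) {
            return old.ordering > this.ordering ? old : this;
        }
    }

    // Scans the log records in order and keeps a single entry per key.
    static Map<String, LogRecord> scan(Iterable<LogRecord> logRecords) {
        Map<String, LogRecord> records = new LinkedHashMap<>();
        for (LogRecord incoming : logRecords) {
            LogRecord old = records.get(incoming.key);
            if (old == null) {
                // First time the key is seen: put the record as is.
                records.put(incoming.key, incoming);
            } else {
                LogRecord combined = incoming.preCombine(old);
                // Re-put only when the combined record differs from the stored one.
                if (combined != old) {
                    records.put(incoming.key, combined);
                }
            }
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, LogRecord> records = scan(Arrays.asList(
            new LogRecord("id1", "v1", 1L),
            new LogRecord("id1", "v2", 2L),
            new LogRecord("id2", "x", 5L),
            new LogRecord("id2", "stale", 3L)));
        // Prints "id1 -> v2" and "id2 -> x".
        records.forEach((key, record) -> System.out.println(key + " -> " + record.value));
    }
}
```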

During the merge, the RecordMergingFileIterator iterates over all records from the data files and calls the remove(...) method of the records map for each of them. If the call returns something, there is a merge action to do: skip the row if the record was deleted, or take the value from the delta file. Otherwise, Hudi returns the version from the data file.
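
The remove-based lookup can be sketched as follows. Again, this is a simplified model and not Hudi code: rows are key/value strings, an empty Optional stands for a delete, and the class name is hypothetical. The last step, returning the log records that matched no base row, is an assumption about the iterator that the description above doesn't cover.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Simplified model, not Hudi code.
class RecordMergingSketch {

    // baseRows: key -> value read from the base file.
    // logRecords: key -> latest value from the log files, Optional.empty() models a delete.
    static List<String> merge(Map<String, String> baseRows,
                              Map<String, Optional<String>> logRecords) {
        Map<String, Optional<String>> remaining = new LinkedHashMap<>(logRecords);
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, String> baseRow : baseRows.entrySet()) {
            // remove(...) doubles as the lookup: null means "no change in the logs".
            Optional<String> fromLogs = remaining.remove(baseRow.getKey());
            if (fromLogs == null) {
                output.add(baseRow.getKey() + "=" + baseRow.getValue());
            } else if (fromLogs.isPresent()) {
                output.add(baseRow.getKey() + "=" + fromLogs.get());
            }
            // Otherwise the record was deleted and the row is skipped.
        }
        // Assumption: log records without a matching base row are returned at the end.
        remaining.forEach((key, value) ->
            value.ifPresent(v -> output.add(key + "=" + v)));
        return output;
    }

    public static void main(String[] args) {
        Map<String, String> base = new LinkedHashMap<>();
        base.put("id1", "A");
        base.put("id2", "B");
        base.put("id3", "C");
        Map<String, Optional<String>> logs = new LinkedHashMap<>();
        logs.put("id1", Optional.of("A2"));   // update
        logs.put("id2", Optional.empty());    // delete
        logs.put("id4", Optional.of("D"));    // insert present only in the logs
        // Prints [id1=A2, id3=C, id4=D]
        System.out.println(merge(base, logs));
    }
}
```

Removing the entries while iterating means that whatever is left in the map at the end was never seen in the base file, so no second lookup structure is needed.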


The BaseFileOnlyRelation ignores the changes from delta files. Logically, its reading logic should be simpler than for the MergeOnReadSnapshotRelation. It is indeed, but there are also some gotchas!

First, the results. Since the BaseFileOnlyRelation also serves the read-optimized queries of Merge-On-Read tables, it might return different results than a snapshot query on the same table. That's expected since the reader doesn't perform any merge step with the log data.

Second, the BaseFileOnlyRelation might not be the final class communicating with Apache Spark! Why? The answer comes from HUDI-3896:

After migrating to Hudi's own Relation impls, we unfortunately broke off some of the optimizations that Spark apply exclusively for `HadoopFsRelation`.

While these optimizations could be perfectly implemented for any `FileRelation`, Spark is unfortunately predicating them on usage of HadoopFsRelation, therefore making them non-applicable to any of the Hudi's relations.

Proper longterm solutions would be fixing this in Spark and could be either of:

1. Generalizing such optimizations to any `FileRelation`
2. Making `HadoopFsRelation` extensible (making it non-case class)

One example of this is Spark's `SchemaPrunning` optimization rule (HUDI-3891): Spark 3.2.x is able to effectively reduce amount of data read via schema pruning (projecting read data) even for nested structs, however this optimization is predicated on the usage of `HadoopFsRelation`:

That's the reason why for all but one scenario, the reading falls back to the HadoopFsRelation. What is this particular scenario? When this method returns true:

abstract class HoodieBaseRelation(val sqlContext: SQLContext,
                                  val metaClient: HoodieTableMetaClient,
                                  val optParams: Map[String, String],
                                  schemaSpec: Option[StructType]) {
  // ...
  def hasSchemaOnRead: Boolean = internalSchemaOpt.isDefined
  // ...
}

This schema-on-read flag is the schema evolution flag of the table, represented by a dedicated property. Although schema evolution will be the topic of another blog post, it's good to know why only the BaseFileOnlyRelation can handle it. This Hudi-specific relation creates a reader capable of adapting the schema of the files to the schema required by the reader:

object HoodieBaseRelation extends SparkAdapterSupport {
  // ...
  def projectReader(reader: BaseFileReader, requiredSchema: StructType): BaseFileReader = {
    // All required fields must be present in the schema of the reader
    checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size == requiredSchema.size)

    if (reader.schema == requiredSchema) {
      // Nothing to project
      reader
    } else {
      val read = reader.apply(_)
      val projectedRead: PartitionedFile => Iterator[InternalRow] = (file: PartitionedFile) => {
        // NOTE: Projection is not a serializable object, hence it creation should only happen w/in
        //       the executor process
        val unsafeProjection = generateUnsafeProjection(reader.schema, requiredSchema)
        read(file).map(unsafeProjection)
      }

      BaseFileReader(projectedRead, requiredSchema)
    }
  }
  // ...
}

class BaseFileOnlyRelation(sqlContext: SQLContext,
                           metaClient: HoodieTableMetaClient,
                           optParams: Map[String, String],
                           userSchema: Option[StructType],
                           globPaths: Seq[Path])
  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
  // ...
  protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
                                    tableSchema: HoodieTableSchema,
                                    requiredSchema: HoodieTableSchema,
                                    requestedColumns: Array[String],
                                    filters: Array[Filter]): RDD[InternalRow] = {
    val (partitionSchema, dataSchema, requiredDataSchema) =
      tryPrunePartitionColumns(tableSchema, requiredSchema)
    // ...
    val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)

    // SPARK-37273 FileScanRDD constructor changed in SPARK 3.3
    sparkAdapter.createHoodieFileScanRDD(
      sparkSession,
      projectedReader.apply,
      // ...
      requiredSchema.structTypeSchema)
    // ...
  }
  // ...
}
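
The idea behind projectReader can be illustrated with a minimal sketch. It is a simplified model under stated assumptions, not Hudi code: schemas are reduced to lists of field names (types are ignored), rows are plain Object arrays, and the class and method names are hypothetical. Hudi's version works on Spark's StructType and InternalRow instead.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Simplified model, not Hudi code: a schema is a list of field names.
class ProjectionSketch {

    // Builds a function that prunes and reorders a row read with readerSchema
    // so that it matches requiredSchema.
    static Function<Object[], Object[]> projection(List<String> readerSchema,
                                                   List<String> requiredSchema) {
        // Same precondition as the checkState(...) call: every required field
        // must exist in the schema of the reader.
        if (!readerSchema.containsAll(requiredSchema)) {
            throw new IllegalStateException("Required fields missing from the reader schema");
        }
        // Position of each required field in the rows produced by the reader.
        int[] ordinals = requiredSchema.stream().mapToInt(readerSchema::indexOf).toArray();
        return row -> {
            Object[] projected = new Object[ordinals.length];
            for (int i = 0; i < ordinals.length; i++) {
                projected[i] = row[ordinals[i]];
            }
            return projected;
        };
    }

    public static void main(String[] args) {
        Function<Object[], Object[]> project = projection(
            Arrays.asList("id", "name", "city"),
            Arrays.asList("city", "id"));
        // Prints [Paris, 1]
        System.out.println(Arrays.toString(project.apply(new Object[] {1, "Alice", "Paris"})));
    }
}
```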

That was the last part of the series about table file formats readers. But as you saw, it introduced several other interesting features, such as schema evolution. I haven't picked the theme for the next part of the series, though!
