Data Engineering Design Patterns: Dynamic Data Overwriter

In the Data Engineering Design Patterns book, I present Data Overwrite as one of the idempotency design patterns. With time, I realized I should have split it into two finer-grained patterns: Static Overwriter and Dynamic Data Overwriter. The difference between them? The dynamic one can replace only a subset of the data thanks to data layout properties, such as partitions or clustering columns.

Bartosz Konieczny

But now that I've started the Data Engineering Design Patterns category on the blog, I have some extra room to clear my backlog!

Context

You optimized the layout of your table by using a clustering or partitioning technique on a few columns. Now, you are looking for a simple way to manage the refreshed data. The refresh dataset always contains the complete data for a given partition or cluster key. The schema below illustrates a refresh of the partition/cluster, represented as squares. In the example the refresh job only replaces the data stored for the grey key:

Implementation

The key part of the implementation is scope identification. The refresh job has to detect which partitions/clusters are impacted by the new data. Once they are identified, the data overwrite can happen.
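To make the two steps concrete, here is a minimal in-memory sketch in plain Python. It is not how any engine implements the feature; the `dynamic_overwrite` function, the `region` partition column and the sample rows are all hypothetical and only illustrate the idea of "identify the scope, then overwrite".

```python
def dynamic_overwrite(table, new_rows, partition_key):
    """Replace only the partitions of `table` that appear in `new_rows`."""
    # Step 1, scope identification: partition values present in the new data.
    impacted = {row[partition_key] for row in new_rows}
    # Step 2, overwrite: keep untouched partitions, swap in the fresh rows.
    kept = [row for row in table if row[partition_key] not in impacted]
    return kept + list(new_rows), impacted


# Hypothetical table partitioned by region.
table = [
    {"region": "eu", "amount": 10},
    {"region": "eu", "amount": 20},
    {"region": "us", "amount": 30},
]
# The refresh only carries data for the "eu" partition.
refresh = [{"region": "eu", "amount": 15}]

refreshed_table, impacted_partitions = dynamic_overwrite(table, refresh, "region")
```

Running the same refresh twice gives the same table, which is the idempotency property the pattern is after.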

In the real world, the identification step is often delegated to the underlying compute environment. Historically, Apache Hive brought this feature to the data engineering world with the hive.exec.dynamic.partition setting. As the data toolbox modernized, the feature was added to Apache Spark as the dynamic value of the spark.sql.sources.partitionOverwriteMode property.

Another framework implementing the Dynamic Data Overwriter pattern is dbt, which lets you set the incremental_strategy parameter to insert_overwrite and replace only the partitions impacted by the written dataset. Whenever the partition/cluster-based overwrite is supported by the underlying database, dbt uses the data store's native capabilities. On the other hand, when the support is not there, dbt implements the feature itself. For example, for BigQuery it starts by building a temporary table from which it gets the partition values. Next, dbt uses these partition values to identify the partitions to replace as part of the operation.
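The "temporary table first, partition values next, replacement last" flow can be sketched with SQLite from the Python standard library. This is only an illustration of the idea under my own assumptions, not the SQL that dbt generates: the `events` and `events_tmp` tables, their columns and the `replace_impacted_partitions` helper are hypothetical, and SQLite merely stands in for the warehouse.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE events (event_date TEXT, payload TEXT);
    INSERT INTO events VALUES
        ('2024-01-01', 'a'), ('2024-01-01', 'b'), ('2024-01-02', 'c');
    -- Staging table with the complete refreshed partition(s).
    CREATE TABLE events_tmp (event_date TEXT, payload TEXT);
    INSERT INTO events_tmp VALUES ('2024-01-01', 'a2');
""")


def replace_impacted_partitions(conn):
    # Step 1: read the partition values from the staging table.
    partitions = [
        row[0]
        for row in conn.execute("SELECT DISTINCT event_date FROM events_tmp")
    ]
    if not partitions:
        return partitions
    # Steps 2 and 3 share one transaction: the delete and the insert
    # succeed or fail together.
    with conn:
        placeholders = ",".join("?" for _ in partitions)
        conn.execute(
            f"DELETE FROM events WHERE event_date IN ({placeholders})",
            partitions,
        )
        conn.execute("INSERT INTO events SELECT event_date, payload FROM events_tmp")
    return partitions


replaced = replace_impacted_partitions(conn)
rows = conn.execute(
    "SELECT event_date, payload FROM events ORDER BY event_date, payload"
).fetchall()
```

Only the 2024-01-01 partition is rewritten here; the 2024-01-02 rows are never touched because that date does not appear in the staging table.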

The second approach to implementing the Dynamic Data Overwriter pattern is the MERGE/UPSERT statement. The requirement here is to have a way to uniquely identify each row, for example with a unique id column or a composite key. If that's your case, the merge operation that involves the partitioning column and replaces the whole partition(s) present in the input dataset looks like the next query. The partitions missing from the loaded dataset but present in the target table are kept thanks to the WHEN NOT MATCHED BY SOURCE AND target.type IN (SELECT DISTINCT type FROM devices_output_temp) clause:

-- type is the partition
-- type and version are columns composing the unique identifier
MERGE INTO output_table AS target
USING devices_output_temp AS source
    ON target.type = source.type AND target.version = source.version
WHEN MATCHED THEN
    UPDATE SET full_name = source.full_name
WHEN NOT MATCHED BY TARGET THEN
    INSERT (type, full_name, version)
    VALUES (source.type, source.full_name, source.version)
WHEN NOT MATCHED BY SOURCE AND target.type IN (SELECT DISTINCT type FROM devices_output_temp) THEN
    DELETE;
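If the interplay of the three clauses is hard to picture, the plain-Python emulation below may help. It is a sketch of the MERGE semantics on lists of dictionaries, not a replacement for the statement; the `merge_replace_partitions` function and the sample devices are hypothetical.

```python
def merge_replace_partitions(target, source):
    """Emulate the three MERGE clauses; rows are keyed by (type, version)."""
    source_by_key = {(row["type"], row["version"]): row for row in source}
    # Equivalent of SELECT DISTINCT type FROM the source table.
    impacted_types = {row["type"] for row in source}
    result = []
    for row in target:
        key = (row["type"], row["version"])
        if key in source_by_key:
            # WHEN MATCHED: update full_name from the source row.
            matched = source_by_key.pop(key)
            result.append({**row, "full_name": matched["full_name"]})
        elif row["type"] in impacted_types:
            # WHEN NOT MATCHED BY SOURCE, inside an impacted partition: delete.
            continue
        else:
            # Partition absent from the source: keep the row as it is.
            result.append(row)
    # WHEN NOT MATCHED BY TARGET: insert the remaining source rows.
    result.extend(source_by_key.values())
    return result


target = [
    {"type": "phone", "version": "v1", "full_name": "Phone 1"},
    {"type": "phone", "version": "v2", "full_name": "Phone 2"},
    {"type": "tablet", "version": "v1", "full_name": "Tablet 1"},
]
source = [
    {"type": "phone", "version": "v1", "full_name": "Phone 1 (renamed)"},
    {"type": "phone", "version": "v3", "full_name": "Phone 3"},
]
merged = merge_replace_partitions(target, source)
```

In this example the phone partition ends up with exactly the source rows (v1 updated, v2 deleted, v3 inserted), while the tablet partition survives because no tablet row is present in the source.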

Consequences

The first challenge you may face is late data. The dynamic data overwrite is great because you can simply trust your data provider and apply a change to the modified partition/cluster of your dataset. However, you should ask yourself how reliable your data provider is. If, whenever a single late record arrives, the provider exposes it alongside all previously present rows, you are safe. If that's not the case, you should take extra care and, for example, load the complete partition/cluster data before overwriting it, as long as losing data is not an option (I assume it's not). Alternatively, you can simply use the Merger design pattern from Chapter 4.
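As an illustration of the "load the complete partition before overwriting it" safeguard, here is a small sketch in plain Python. It assumes each row carries a unique id column; the `complete_partition` function and the sample rows are hypothetical.

```python
def complete_partition(existing_rows, late_rows, key):
    """Combine stored rows with late arrivals so the overwrite input is complete."""
    combined = {row[key]: row for row in existing_rows}
    # Late rows win over stored rows sharing the same key.
    combined.update({row[key]: row for row in late_rows})
    return list(combined.values())


# Rows currently stored in the partition to refresh.
stored_partition = [{"id": 1, "value": "a"}, {"id": 2, "value": "b"}]
# The provider delivered only the late record, not the full partition.
late_delivery = [{"id": 3, "value": "c"}]

# Overwriting with late_delivery alone would drop ids 1 and 2.
partition_to_write = complete_partition(stored_partition, late_delivery, "id")
```

The overwrite then receives three rows instead of one, so the previously stored records are not lost.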

The second point is visibility. Any data operation is more obvious when you can see exactly which parameters are modified. That would be the case for an explicit partition overwrite with a WHERE condition, but not for a dynamic overwrite where only the partitioning/clustering columns are present in the query. That being said, visibility shouldn't be a big deal as long as you have another way to check data freshness, e.g. a dashboard.
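One cheap way to get some of that visibility back is to compute the impacted scope yourself and log it before the overwrite runs. The sketch below is only one possible approach; the `overwrite_scope` function and the sample rows are hypothetical.

```python
from collections import Counter


def overwrite_scope(table, new_rows, partition_key):
    """Return {partition: (rows_before, rows_after)} for the partitions to replace."""
    before = Counter(row[partition_key] for row in table)
    after = Counter(row[partition_key] for row in new_rows)
    return {
        partition: (before.get(partition, 0), count)
        for partition, count in sorted(after.items())
    }


table = [{"region": "eu"}, {"region": "eu"}, {"region": "us"}]
new_rows = [{"region": "eu"}, {"region": "apac"}]

scope = overwrite_scope(table, new_rows, "region")
```

Here the summary tells you that the eu partition shrinks from two rows to one, that apac is created, and, by omission, that us is left alone.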

The final challenge may be performance. Although the operation only replaces a part of the dataset, it might be faster to limit the update volume by running a MERGE statement that inserts only the missing rows, or by simply appending all rows that have been added since the last run (cf. Incremental Loader pattern from Chapter 2).

Examples

Let's first see how the Dynamic Data Overwriter works on Databricks for liquid clustered tables. If we assume a table clustered by the region and sales_date columns, the query relevant to the dynamic data replacement would look like this:

INSERT INTO sales REPLACE USING (region, sales_date) SELECT * FROM sales_tmp; 

When it comes to Apache Spark, you need to set the dynamic strategy before executing the overwrite. But some data stores, like Delta Lake, support the overwrite mode parameter directly on the writer, as in the snippet below:

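import org.apache.spark.sql.SaveMode

// Writer-level option: only the partitions present in df_to_write are replaced
df_to_write.write.format("delta")
  .option("partitionOverwriteMode", "dynamic")
  .mode(SaveMode.Overwrite).insertInto("test_table")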
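For data sources without the writer-level option, the session-level variant could look like the PySpark sketch below. It assumes you already have a SparkSession and a DataFrame whose columns follow the order of the target table; the `overwrite_impacted_partitions` helper is a hypothetical name of mine, not a Spark API.

```python
def overwrite_impacted_partitions(spark, df, table_name):
    """Overwrite only the partitions of `table_name` that are present in `df`."""
    # Session-level switch; with the default "static" mode the overwrite
    # is not limited to the partitions present in df.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    # insertInto resolves columns by position, so df must follow the
    # column order of the target table.
    df.write.mode("overwrite").insertInto(table_name)
```

A call would then be `overwrite_impacted_partitions(spark, df_to_write, "test_table")`. Keep in mind the setting stays active for the rest of the session, so later overwrites in the same job are dynamic too.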

Similarly to Apache Spark, dbt also requires configuring the pipeline before triggering the data overwrite. Here is how it works for BigQuery:

{{
  config(
    materialized = 'incremental',
    partition_by = {'field': 'event_time', 'data_type': 'timestamp'},
    incremental_strategy = 'insert_overwrite'
  )
}}
# ... rest of the model here

The Dynamic Data Overwriter pattern is quite useful if you can guarantee the completeness of the written dataset and you have a partitioned/clustered table to replace with this dataset. It lets you replace a part of your dataset with a fresher data chunk without having to specify the overwriting conditions or load all previously present data. Unfortunately, if your table is not partitioned, or if the refresh dataset is not complete, you'll have to make an extra effort to keep the output consistent.
