Good to know if you merge - reprocessing challenges

MERGE, aka UPSERT, is a useful operation to combine two datasets when record identity is preserved. It thus appears as a natural candidate for idempotent operations. While that's true, challenges arise when things go wrong and you need to reprocess the data.

Bartosz Konieczny

The MERGE operation is a universal data combination action that you can use to add, update, or even remove rows. The next snippet shows a query that, depending on the MATCHED outcome, does one of these three things:

MERGE INTO devices_output AS target
USING devices_input AS input
ON target.type = input.type AND target.version = input.version
WHEN MATCHED AND input.is_deleted = true THEN
	DELETE
WHEN MATCHED AND input.is_deleted = false THEN
	UPDATE SET full_name = input.full_name
WHEN NOT MATCHED AND input.is_deleted = false THEN
	INSERT (full_name, version, type) VALUES (input.full_name, input.version, input.type);
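To make the three branches concrete, here is a minimal Python sketch of the same semantics, where the target table is modeled as a dict keyed by the join columns (type, version). The data values are purely illustrative:

```python
# Simulate the MERGE: matched + deleted => DELETE, matched + not deleted =>
# UPDATE, not matched + not deleted => INSERT; deletes of absent rows are no-ops.
def merge(target, incoming_rows):
    for row in incoming_rows:
        key = (row["type"], row["version"])
        if key in target:                      # WHEN MATCHED
            if row["is_deleted"]:
                del target[key]                # ... THEN DELETE
            else:
                target[key]["full_name"] = row["full_name"]  # ... THEN UPDATE
        elif not row["is_deleted"]:            # WHEN NOT MATCHED AND not deleted
            target[key] = {"full_name": row["full_name"],
                           "type": row["type"], "version": row["version"]}
    return target

target = {("phone", 1): {"full_name": "Phone v1", "type": "phone", "version": 1}}
incoming = [
    {"type": "phone", "version": 1, "full_name": "Phone v1 fixed", "is_deleted": False},
    {"type": "tablet", "version": 2, "full_name": "Tablet v2", "is_deleted": False},
    {"type": "watch", "version": 3, "full_name": "Watch v3", "is_deleted": True},
]
merge(target, incoming)
```

After the run, the phone row is updated, the tablet row is inserted, and the deleted watch row never appears in the target.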

However, behind this apparent simplicity, MERGE hides some operational challenges. At the end of the day, you won't run a MERGE alone. Instead, the operation will be part of a more or less complex data processing environment, where things can go wrong. And what should you do when they do? That requires more explanation.

Data looks bad

The first bad thing can come from your data provider. If you get a wrong or incomplete dataset, you'll expose these inconsistencies to your downstream consumers. But lucky for you, the solution to this problem doesn't require any action on your side. In that case, you should ask your data provider to generate a new dataset that fixes the mistakes introduced by the previous one. Generating a new dataset covers two error categories:

The only requirement for your provider is to keep the same ids between the buggy and fixed dataset versions. If that's the case, you can simply relaunch the previous job execution. But unfortunately, things are not always that simple, and sometimes the issue comes from your side.
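Why do stable ids make a simple relaunch safe? Because an upsert keyed on those ids converges to the same state no matter how many times you replay the corrected batch. A sketch under the same simplified dict-based model (names are hypothetical):

```python
# With stable keys, replaying the corrected batch through the same upsert
# is idempotent: the fix overwrites the buggy row instead of duplicating it.
def upsert(target, batch):
    for row in batch:
        target[row["id"]] = row["payload"]  # same id => overwrite, never duplicate
    return target

buggy = [{"id": 1, "payload": "wrong name"}]
fixed = [{"id": 1, "payload": "correct name"}]

state = upsert({}, buggy)        # faulty first run
state = upsert(state, fixed)     # provider re-sends with the SAME id
after_first_fix = dict(state)
state = upsert(state, fixed)     # replaying the fix changes nothing
```

If the provider had generated new ids instead, the buggy rows would survive next to the fixed ones, and the relaunch would no longer be enough.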

Code looks bad

If the problem doesn't come from your provider, you may be in trouble. Why "may be"? It depends on whether you care about dataset consistency while reprocessing or not. Let me explain it with the picture below, where the green box is the last correctly written version and all the red ones are the buggy ones:

When you decide to fix the bug by reprocessing the dataset for the past wrong executions (2-9), you can approach the problem with two different points of view:

As the first use case doesn't require any action on your side, in the rest of this blog post let's see how to address the second one with data stores that support versioning, like Delta Lake.

Version mapping

Even though from our example you could think that each pipeline run increments the version by 1, it rarely happens that way in real life, because between two pipeline runs you might have some housekeeping actions, like compaction in Delta Lake. For that reason, the solution to the second scenario requires some extra implementation effort.

Without this consistency requirement, the pipeline could simply run a job executing the MERGE statement. However, if consistency matters, a single MERGE won't be enough, as you won't be able to figure out the version to restore. For that reason, the MERGE operation needs a mapping table between the pipeline runs and the version generated by each run. The following schema introduces the state table and extra steps in the pipeline:
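A minimal sketch of that run-to-version mapping, assuming a state table with hypothetical `run_id` and `commit_version` columns. The point is that housekeeping commits make versions non-contiguous, so the pipeline must record which version each run produced in order to know what to restore before reprocessing:

```python
# Hypothetical state table: one row per pipeline run, recording the table
# version that run committed. Versions 5-6 were written by housekeeping
# (e.g. compaction), so they don't map to any run.
state_table = [
    {"run_id": 1, "commit_version": 4},  # last good run
    {"run_id": 2, "commit_version": 7},  # first buggy run
    {"run_id": 3, "commit_version": 8},  # another buggy run
]

def version_to_restore(state_table, first_buggy_run):
    # Restore to the version produced by the last run BEFORE the buggy ones.
    good_versions = [row["commit_version"] for row in state_table
                     if row["run_id"] < first_buggy_run]
    return max(good_versions)

restore_target = version_to_restore(state_table, first_buggy_run=2)
```

With the version in hand, a versioned store such as Delta Lake can roll the table back to it (e.g. via its table restore feature) before the corrected runs replay their MERGEs.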

These extra steps control the behavior of the workflow and have the following responsibilities: