Simplified Delta Lake operations with Mack

I like writing code, and I'm always happy when there is a data processing job with some business logic to write. However, over time I've learned to appreciate the Open Source contributions that enhance my daily work. The Mack library, the topic of this blog post, is one of those projects I discovered recently.


To put it simply, Mack is a Python library with helper methods for Delta Lake, a little bit like what Apache DataFu, another recent discovery of mine, is for Apache Spark. It's designed around an interesting concept of pure functions to favor code reusability without importing the whole library every time:

...
  • ...
  • We avoid classes whenever possible. Classes make it harder to copy / paste little chunks of code into notebooks. It's good to Stop Writing Classes.

Even though the project contains a nice README with plenty of examples, I'll try to complement it in this blog post with some extra explanation of the implementation details of the Slowly Changing Dimension update and duplicates removal.

Slowly Changing Dimension Type 2 update

To recall, an SCD Type 2 update consists of closing the validity period of the current row and inserting a new active row. Mack helps manage this requirement with the following API:

def type_2_scd_generic_upsert(
    path,
    updates_df,
    primary_key,
    attr_col_names,
    is_current_col_name,
    effective_time_col_name,
    end_time_col_name,
):

The implementation consists of 2 main steps: expiring the matched current rows, i.e. flipping their is_current flag and setting their end time column to the effective time of the update, and inserting the incoming rows as the new current versions.
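To make these two steps concrete, here is a minimal pure-Python sketch of the SCD2 upsert semantics over lists of dicts. It's my own illustration under simplified assumptions (hard-coded is_current, effective_time and end_time column names), not Mack's implementation, which does all of this in a single Delta Lake MERGE:

```python
def scd2_upsert(table, updates, primary_key, attr_cols, effective_time):
    """Illustrative SCD Type 2 upsert over lists of dicts (not Mack's code)."""
    current_by_key = {r[primary_key]: r for r in table if r["is_current"]}
    for update in updates:
        current = current_by_key.get(update[primary_key])
        # Only act when the key is new or the tracked attributes actually changed
        if current is None or any(current[a] != update[a] for a in attr_cols):
            if current is not None:
                # Step 1: expire the matched current row
                current["is_current"] = False
                current["end_time"] = effective_time
            # Step 2: insert the incoming row as the new current version
            table.append({
                **update,
                "is_current": True,
                "effective_time": effective_time,
                "end_time": None,
            })
    return table
```

Running it with one existing row and two updates (one changed attribute, one new key) produces three rows: the expired historical version plus two current ones.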

Duplicates removal

Besides the SCD, Mack has several helper methods to remove all duplicates or only a part of them. You may be thinking, "PySpark does it already with drop_duplicates". Indeed, it does, and Mack uses that function in the scenario that removes all duplicates from a DataFrame. But there are 2 other deduplication functions that rely on the MERGE operation.

The first of these deduplication methods is kill_duplicates. It removes all duplicated rows from the table, so in the end the table contains only rows that were never duplicated. The logic first creates a DataFrame with the duplicated rows only. The dataset is built with a window function and later joined with the current table, so that any match on the defined columns triggers row deletion:

duplicate_records = (
    data_frame.withColumn(
        "amount_of_records",
        count("*").over(Window.partitionBy(duplication_columns)),
    )
    .filter(col("amount_of_records") > 1)
    .drop("amount_of_records")
    .distinct()
)

# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()
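The end result can be illustrated with a pure-Python equivalent (my own sketch, not Mack's code): any row whose duplication-column values appear more than once disappears entirely, mirroring the window count followed by the deleting MERGE:

```python
from collections import Counter

def kill_duplicates(rows, duplication_columns):
    """Keep only rows whose duplication-column values appear exactly once."""
    key = lambda row: tuple(row[c] for c in duplication_columns)
    # Equivalent of count("*").over(Window.partitionBy(duplication_columns))
    counts = Counter(key(row) for row in rows)
    # Rows with a count > 1 are deleted, not deduplicated
    return [row for row in rows if counts[key(row)] == 1]
```

Note the difference with drop_duplicates: here both copies of a duplicated row are removed, not just the extra ones.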

The second method, drop_duplicates_pkey, relies on the same principle but builds the duplicate_records DataFrame differently. It extends the partitionBy with an orderBy call on a unique primary key column passed as a parameter of the function:

duplicate_records = (
    data_frame.withColumn(
        "row_number",
        row_number().over(
            Window().partitionBy(duplication_columns).orderBy(primary_key)
        ),
    )
    .filter(col("row_number") > 1)
    .drop("row_number")
    .distinct()
)
# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()

That way only one row per group of duplicates survives: the one with the lowest primary key value, which, for a monotonically increasing key, is the oldest row.
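Again, a pure-Python sketch (my own illustration, not Mack's code) captures the semantics: within each group sharing the same duplication-column values, only the row with the smallest primary key is kept, like the row_number filter above:

```python
def drop_duplicates_pkey(rows, primary_key, duplication_columns):
    """Within each duplicate group, keep only the row with the lowest primary key."""
    kept = {}
    for row in rows:
        group = tuple(row[c] for c in duplication_columns)
        # Equivalent of keeping row_number == 1 after orderBy(primary_key)
        if group not in kept or row[primary_key] < kept[group][primary_key]:
            kept[group] = row
    return list(kept.values())
```

Unlike kill_duplicates, this function keeps one representative per group instead of deleting the whole group.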

Even though I only discussed 2 features, Mack does more. It can print table information in a more human-friendly format, copy a table, or even add a schema validation to the append action. But I'll let you discover them yourself!
