I like writing code, and each time there is a data processing job with some business logic to implement, I'm very happy. However, with time I've learned to appreciate the Open Source projects that enhance my daily work. Mack, the topic of this blog post, is one of the libraries I discovered recently.

To put it simply, Mack is a Python library with helper methods for Delta Lake, a bit like what my other recent discovery, Apache DataFu, is for Apache Spark. It's designed around an interesting concept of pure functions to favor code reusability without importing the whole library every time:
...
- ...
- We avoid classes whenever possible. Classes make it harder to copy / paste little chunks of code into notebooks. It's good to Stop Writing Classes.
Even though the project comes with a nice README full of examples, I'll try to complement it in this blog post with some extra explanation of the implementation details behind the Slowly Changing Dimension update and the duplicates removal.
Slowly Changing Dimension Type 2 update
As a reminder, an SCD Type 2 update consists of closing the validity period of the current row and inserting a new active row alongside it. Mack helps manage this requirement with the following API:
def type_2_scd_generic_upsert(
    path,
    updates_df,
    primary_key,
    attr_col_names,
    is_current_col_name,
    effective_time_col_name,
    end_time_col_name,
):
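For the record, a call could look like the snippet below. It's only a sketch: it assumes a Spark session with Delta Lake configured and an existing SCD table at the given path, and the path, schema and column names describe my made-up example dataset, not anything imposed by the library.

import mack
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical updates for an imaginary customers table.
updates_df = spark.createDataFrame(
    [(1, "bob@example.com", "FR", "2023-02-01")],
    ["customer_id", "email", "country", "effective_time"],
)

mack.type_2_scd_generic_upsert(
    "/tmp/delta/customers",       # path of the Delta table to upsert into
    updates_df,                   # new versions of the records
    "customer_id",                # primary key identifying a record
    ["email", "country"],         # attributes whose change opens a new version
    "is_current",                 # flag column marking the active row
    "effective_time",             # start of the validity period
    "end_time",                   # end of the validity period
)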
The implementation consists of 2 main steps:
- Creating a staging dataset. It results from a union between the rows to insert (the new versions of the changed records) and all the update rows keyed by the primary key, used to expire the currently active versions (= set their end validity date):
staged_part_1 = (
    updates_df.alias("updates")
    .join(base_table.toDF().alias("base"), primary_key)
    .where(f"base.{is_current_col_name} = true AND ({updates_attrs})")
    .selectExpr("NULL as mergeKey", "updates.*")
)
staged_part_2 = updates_df.selectExpr(f"{primary_key} as mergeKey", "*")
staged_updates = staged_part_1.union(staged_part_2)
- Running a MERGE operation between the base table and the staging dataset. Any match sets the current flag column to false and adds a validity end date, whereas any unmatched row gets inserted. As you can see from the snippet above, staged_part_1 guarantees the insert of the new version for the expired records thanks to its NULL mergeKey attribute, which can never satisfy the merge condition (a toy-data sketch after this list illustrates both parts):
s = (
    base_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(
            f"base.{primary_key} = mergeKey AND base.{is_current_col_name} = true AND ({staged_updates_attrs})"
        ),
    )
    .whenMatchedUpdate(
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        }
    )
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)
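To make the two staging parts more tangible, here is a minimal, self-contained sketch on toy data. The schema (pkey, attr, is_current, effective_time, end_time) and the values are invented for the illustration; only the staging logic mirrors the snippets above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical base table: a single active row for pkey = 1.
base = spark.createDataFrame(
    [(1, "A", True, "2020-01-01", None)],
    "pkey INT, attr STRING, is_current BOOLEAN, effective_time STRING, end_time STRING",
)
# Hypothetical update: the attribute of pkey = 1 changes from A to B.
updates_df = spark.createDataFrame(
    [(1, "B", "2023-01-01")],
    "pkey INT, attr STRING, effective_time STRING",
)

# Changed rows get a NULL mergeKey, so they never match the MERGE condition
# and are inserted as the new current versions.
staged_part_1 = (
    updates_df.alias("updates")
    .join(base.alias("base"), "pkey")
    .where("base.is_current = true AND updates.attr <> base.attr")
    .selectExpr("NULL as mergeKey", "updates.*")
)
# Every update keyed by the primary key; it matches the existing current row
# and expires it in the whenMatchedUpdate branch.
staged_part_2 = updates_df.selectExpr("pkey as mergeKey", "*")

staged_part_1.union(staged_part_2).show()
# The union contains 2 rows:
#   (mergeKey=NULL, pkey=1, attr=B, effective_time=2023-01-01) -> never matches, inserted as the new active row
#   (mergeKey=1,    pkey=1, attr=B, effective_time=2023-01-01) -> matches and expires the currently active row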
Duplicates removal
Besides the SCD, Mack has several helper methods to remove either all duplicates or only a part of them. You may be thinking, "PySpark does it already with drop_duplicates". Indeed, it does, and that function is what backs the scenario of removing all duplicates from a DataFrame. But there are 2 other deduplication functions that rely on the MERGE operation.
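Before looking at them, a quick reminder of what the plain drop_duplicates path does; the DataFrame and its column names below are only illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative DataFrame with an exact duplicate of the first row.
df = spark.createDataFrame(
    [(1, "A"), (1, "A"), (2, "B")],
    ["id", "value"],
)

# Keeps a single occurrence of each fully identical row...
df.drop_duplicates().show()
# ...or of each combination of the listed columns when a subset is passed.
df.drop_duplicates(["value"]).show()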
The first of these deduplication methods is kill_duplicates. It removes all duplicated rows from the table, so in the end the table only contains rows that were never duplicated. The logic first creates a DataFrame with the duplicated rows only. This dataset comes from a window-based count and is later merged with the current table, so that any match on the defined columns triggers the row deletion:
duplicate_records = (
    data_frame.withColumn(
        "amount_of_records",
        count("*").over(Window.partitionBy(duplication_columns)),
    )
    .filter(col("amount_of_records") > 1)
    .drop("amount_of_records")
    .distinct()
)
# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()
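On a toy dataset the window part behaves as follows; the column names and values are mine, only the logic copies the snippet above:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count

spark = SparkSession.builder.getOrCreate()

# Hypothetical table content: the value "A" appears in 2 different rows.
data_frame = spark.createDataFrame(
    [(1, "A"), (2, "A"), (3, "B")],
    ["pkey", "value"],
)

duplicate_records = (
    data_frame.withColumn(
        "amount_of_records",
        count("*").over(Window.partitionBy("value")),
    )
    .filter(col("amount_of_records") > 1)
    .drop("amount_of_records")
    .distinct()
)
duplicate_records.show()
# Both (1, A) and (2, A) are returned, so the MERGE deletes them
# and only (3, B) survives in the table.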
The second method, drop_duplicates_pkey, relies on the same principle but builds the duplicate_records DataFrame with a different semantic. It completes the partitionBy with an orderBy call on a unique primary key column passed as one of the function's parameters:
duplicate_records = (
    data_frame.withColumn(
        "row_number",
        row_number().over(
            Window().partitionBy(duplication_columns).orderBy(primary_key)
        ),
    )
    .filter(col("row_number") > 1)
    .drop("row_number")
    .distinct()
)
# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()
That way, only the first row of each duplication group (the one with the smallest primary key, typically the oldest) is kept in the table.
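To see which rows end up flagged for deletion, here is a toy run of the window part; again, the column names and values are made up, only the logic mirrors the snippet above:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.getOrCreate()

# Hypothetical table content: pkey 1 and 2 share the same duplicated value.
data_frame = spark.createDataFrame(
    [(1, "A"), (2, "A"), (3, "B")],
    ["pkey", "value"],
)

duplicate_records = (
    data_frame.withColumn(
        "row_number",
        row_number().over(Window.partitionBy("value").orderBy("pkey")),
    )
    .filter(col("row_number") > 1)
    .drop("row_number")
    .distinct()
)
duplicate_records.show()
# Only (2, A) ends up in duplicate_records, so after the MERGE the row with the
# smallest primary key, (1, A), stays in the table together with (3, B).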
Even though I only discussed 2 features, Mack does more. It can print table information in a more human-friendly format, copy a table, or even add schema validation to the append action. But I'll let you discover the rest by yourself!