I like writing code, and each time there is a data processing job to write with some business logic, I'm very happy. However, with time I've learned to appreciate the Open Source contributions that enhance my daily work. The Mack library, the topic of this blog post, is one of those projects I discovered recently.
To put it simply, Mack is a Python library with helper methods for Delta Lake, a little bit like what my other recent discovery, Apache DataFu, is for Apache Spark. It's designed around an interesting concept of pure functions to favor code reusability without having to import the whole library every time:
...
- ...
- We avoid classes whenever possible. Classes make it harder to copy / paste little chunks of code into notebooks. It's good to Stop Writing Classes.
Even though the project has a nice README with plenty of examples, I'll try to complement it in this blog post with some extra explanations of the implementation details of the Slowly Changing Dimension update and the duplicates removal.
Slowly Changing Dimension Type 2 update
As a reminder, an SCD Type 2 update consists of closing the validity period of the current row and inserting a new active row alongside it. Mack helps manage this requirement with the following API:
def type_2_scd_generic_upsert(
    path,
    updates_df,
    primary_key,
    attr_col_names,
    is_current_col_name,
    effective_time_col_name,
    end_time_col_name,
):
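To make the parameters more concrete, here is a minimal call sketch following the signature above. The table location, the column names, and the data are invented for the example and don't come from Mack's documentation:

import mack
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Incoming changes: the primary key, the tracked attribute and the effective time
updates_df = spark.createDataFrame(
    [(1, "new@mail.com", "2023-06-01")],
    "customer_id INT, email STRING, effective_time STRING",
)

mack.type_2_scd_generic_upsert(
    path="/tmp/delta/customers",               # existing Delta table with the SCD2 columns
    updates_df=updates_df,
    primary_key="customer_id",
    attr_col_names=["email"],                  # attributes compared to detect a change
    is_current_col_name="is_current",
    effective_time_col_name="effective_time",
    end_time_col_name="end_time",
)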
The implementation consists of 2 main steps:
- Creating a staging table. The table results from a union between a DataFrame with the rows to insert and a DataFrame with the rows to update (i.e. the rows whose validity end date must be set):
staged_part_1 = (
    updates_df.alias("updates")
    .join(base_table.toDF().alias("base"), primary_key)
    .where(f"base.{is_current_col_name} = true AND ({updates_attrs})")
    .selectExpr("NULL as mergeKey", "updates.*")
)
staged_part_2 = updates_df.selectExpr(f"{primary_key} as mergeKey", "*")
staged_updates = staged_part_1.union(staged_part_2)
- Running a MERGE operation between the current and the staging tables. Any match sets the current flag column to false and adds a validity end date; any row without a match gets inserted as a new one. As you can see from the snippet above, staged_part_1 guarantees the insertion of the new version of the expired rows, thanks to the NULL mergeKey that can never satisfy the MERGE condition (a toy walk-through of this trick follows after the list):
s = (
    base_table.alias("base")
    .merge(
        source=staged_updates.alias("staged_updates"),
        condition=pyspark.sql.functions.expr(
            f"base.{primary_key} = mergeKey AND base.{is_current_col_name} = true AND ({staged_updates_attrs})"
        ),
    )
    .whenMatchedUpdate(
        set={
            is_current_col_name: "false",
            end_time_col_name: f"staged_updates.{effective_time_col_name}",
        }
    )
    .whenNotMatchedInsert(values=res_thing)
    .execute()
)
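Here is the promised toy walk-through of the staging trick, rebuilt with plain PySpark and invented data (no Delta table involved), to show why the NULL mergeKey always routes the new version of a changed row to the insert branch:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Currently active row of the dimension
base_df = spark.createDataFrame(
    [(1, "a@mail.com", True)], "customer_id INT, email STRING, is_current BOOLEAN"
)
# Incoming change for the same customer
updates_df = spark.createDataFrame(
    [(1, "new@mail.com")], "customer_id INT, email STRING"
)

# Changed rows get a NULL mergeKey, so they can never satisfy the MERGE
# condition and always end up in the insert branch
staged_part_1 = (
    updates_df.alias("updates")
    .join(base_df.alias("base"), "customer_id")
    .where("base.is_current = true AND updates.email <> base.email")
    .selectExpr("NULL as mergeKey", "updates.*")
)
# The same rows keyed by the primary key: they match the current row and expire it
staged_part_2 = updates_df.selectExpr("customer_id as mergeKey", "*")

staged_part_1.union(staged_part_2).show()
# mergeKey=null, customer_id=1, email=new@mail.com  -> inserted as the new current row
# mergeKey=1,    customer_id=1, email=new@mail.com  -> matches and expires the old row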
Duplicates removal
Besides the SCD support, Mack has several helper methods to remove either all duplicates or only a part of them. You may be thinking, "PySpark does it already with drop_duplicates". Indeed, it does, and that function is used in the scenario removing all duplicates from a DataFrame. But there are 2 other deduplication functions that rely on the MERGE operation.
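For comparison, the plain PySpark primitive works directly on a DataFrame and doesn't need any MERGE. A minimal sketch with invented data, before we move to the MERGE-based helpers:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], "id INT, letter STRING")

# Keeps a single row per distinct (id, letter) pair: 2 rows remain here
df.drop_duplicates(["id", "letter"]).show()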
The first of these deduplication methods is kill_duplicates. It removes all duplicated rows from the table, so in the end the table contains only the rows that were never duplicated. The logic first creates a DataFrame with the duplicated rows only. This dataset is built with a window operation and later joined with the current table, so that any match on the defined columns triggers the deletion of the row:
duplicate_records = (
    data_frame.withColumn(
        "amount_of_records",
        count("*").over(Window.partitionBy(duplication_columns)),
    )
    .filter(col("amount_of_records") > 1)
    .drop("amount_of_records")
    .distinct()
)
# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()
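As a usage sketch, the call below assumes the kill_duplicates(delta_table, duplication_columns) shape suggested by the snippet above; the table path and its content are invented:

import mack
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

# Delta-enabled Spark session
builder = (
    SparkSession.builder.appName("mack-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Hypothetical Delta table containing the rows (1, 'a'), (2, 'a'), (3, 'b')
delta_table = DeltaTable.forPath(spark, "/tmp/delta/letters")

# Deduplicating on "letter" deletes both 'a' rows; only (3, 'b') survives
mack.kill_duplicates(delta_table, ["letter"])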
The second method, drop_duplicates_pkey, relies on the same principle but builds the duplicate_records DataFrame differently. It completes the partitionBy with an orderBy call on a unique primary key column passed as a parameter of the function:
duplicate_records = (
    data_frame.withColumn(
        "row_number",
        row_number().over(
            Window().partitionBy(duplication_columns).orderBy(primary_key)
        ),
    )
    .filter(col("row_number") > 1)
    .drop("row_number")
    .distinct()
)
# ...
# Remove all the duplicate records
delta_table.alias("old").merge(
    duplicate_records.alias("new"), q
).whenMatchedDelete().execute()
That way, only the first row of each duplication group (the one with the lowest primary key value, typically the oldest) is kept in the table.
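A usage sketch, reusing the hypothetical table from the previous example and assuming the primary_key and duplication_columns parameters visible in the snippet:

import mack
from delta.tables import DeltaTable

# Reuses the Delta-enabled spark session from the previous sketch (assumption)
delta_table = DeltaTable.forPath(spark, "/tmp/delta/letters")

# For rows sharing the same "letter", keep only the one with the lowest "id"
# (row_number() == 1 in the window ordered by the primary key) and delete the others
mack.drop_duplicates_pkey(delta_table, primary_key="id", duplication_columns=["letter"])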
Even though I only discussed 2 features, Mack does more. It can print table information in a more human-friendly format, copy a table, or even add schema validation to the append operation. But I'll let you discover those by yourself!