Clean architecture for PySpark

Versions: Apache Spark 4.0.1, Delta Lake 4.0.1. Source code: https://github.com/bartosz25/spark-playground/tree/master/pyspark-clean-architecture-example

This year, I'll be exploring various software engineering principles and their application to data engineering. Clean Architecture is a prime example, and it serves as the focus of this post.


In a nutshell, clean architecture promises a clear separation between layers, from business rules implemented as domain objects to communication with the external world via adapters. This separation keeps the business logic independent from the technical world of databases or external APIs. One of the most important rules is isolation: the inner circles should never know anything about their outer circles. Code organized that way is more flexible and easier to test locally, since the outermost circle is no longer required for the code to execute. The following picture summarizes the clean architecture principle:

If you have done some web development at least once in your life, you can clearly see where this is going. The entities are domain objects, so everything that represents your business. If you run an e-commerce store, you will find here classes like Order, Buyer, or Seller.
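
As a minimal sketch (the class and attribute names are illustrative, not taken from any real codebase), such a domain entity could look like this:

    from dataclasses import dataclass


    @dataclass
    class Order:
        # Pure domain object: no database, framework, or Spark dependency here
        order_id: str
        buyer_id: str
        amount: float
        status: str = 'created'

        def mark_ready(self) -> None:
            # The business rule lives with the entity, independently of any storage concern
            self.status = 'ready'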

When it comes to the Use cases, this circle will contain code that apply the business rules implemented in the domain entities directly. Think about it as methods that operate on domains and eventually orchestrate abstract definitions from the Controllers layer. In our e-commerce example we would implement here a method changing order status to ready and persisting this change...remark, I'm not typing where this state should be persisted as the Use case layer shouldn't know this detail.
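
A hedged sketch of that idea, reusing the hypothetical Order entity from the previous snippet and an OrderWriter abstraction that the use case only knows as a Protocol:

    from typing import Protocol


    class OrderWriter(Protocol):
        # Abstract port: the concrete implementation lives in an outer circle
        def save(self, order: Order) -> None: ...


    def mark_order_as_ready(order: Order, writer: OrderWriter) -> None:
        # The use case applies the business rule and delegates persistence without
        # knowing whether the writer targets a database, a queue, or a test double
        order.mark_ready()
        writer.save(order)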

The next circle is the Controllers one, aka interface adapters. They're the concrete implementations of the persistence abstractions invoked in the Use cases circle. Typically, they facilitate data exchange between the inner circles and real databases, messaging queues, external APIs, or HTTP requests. For example, this is where we could convert our domain Order into a row that can be persisted in the database.
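
Sticking to the same hypothetical example, an adapter could implement the OrderWriter port on top of a relational database; the orders table is assumed to already exist:

    import sqlite3


    class SqliteOrderWriter:
        # Interface adapter: satisfies the OrderWriter port and translates
        # the domain Order into a row understood by the database
        def __init__(self, connection: sqlite3.Connection):
            self.connection = connection

        def save(self, order: Order) -> None:
            self.connection.execute(
                'INSERT OR REPLACE INTO orders (order_id, buyer_id, amount, status) VALUES (?, ?, ?, ?)',
                (order.order_id, order.buyer_id, order.amount, order.status))
            self.connection.commit()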

The final circle stands for Devices and interfaces. That's where all the external services or databases expected by the inner circles live. You'll find here the infrastructure, such as databases, but also the frameworks used to build the application.
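
Translated into code organization, one way to respect the dependency rule is a package layout where imports only point inward; the tree below is a hypothetical illustration rather than the structure of a specific project:

    project/
      entities/      # domain objects only; no framework or I/O imports
      use_cases/     # business operations; import entities and abstract ports
      adapters/      # concrete implementations of the ports (databases, APIs, queues)
      jobs/          # the outermost circle: wiring, configuration, frameworks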

Clean architecture for data engineers

Applying clean architecture as-is to data engineering projects is risky. It might lead to many unnecessary abstractions that complicate the code for no good reason. Therefore, instead of trying to translate clean architecture literally to data engineering workloads, let's try to adapt it.

As I have been specializing in Apache Spark over the years, I'll use it as an example and, hopefully, share one way of organizing the code for PySpark jobs running on Databricks (another tool I've been working with extensively over the years 😉).

Our workflow consists of many CSV files that should ultimately be combined into a single DataFrame and written to a Delta Lake table. We can illustrate it as follows:

Because it's hard to translate this workflow to circle-based clean architecture, let's see how to represent it with layered boxes:

Let's see how this organization fits into the onion-based organization of clean architecture:

Even though I could introduce a real data repositories abstraction to generate DataFrames, I won't do that. Repositories are an additional abstraction that - subjectively speaking - brings an extra indirection layer and makes the whole thing harder to understand. That's why the DataFrames from the Controllers circle will be, for me... simple DataFrames that might be generated by a Use case.
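
For the record, the rejected repository abstraction could have looked like the hypothetical sketch below (names and paths are mine, not the project's), with each job depending on the port rather than receiving DataFrames directly:

    from typing import Protocol

    from pyspark.sql import DataFrame, SparkSession


    class OrdersRepository(Protocol):
        # The extra indirection layer this post deliberately skips
        def load_orders(self, country: str) -> DataFrame: ...


    class CsvOrdersRepository:
        def __init__(self, spark: SparkSession, base_path: str):
            self.spark = spark
            self.base_path = base_path

        def load_orders(self, country: str) -> DataFrame:
            return self.spark.read.csv(f'{self.base_path}/{country}_orders', header=True, inferSchema=True)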

To see this clean architecture in the PySpark context, let's take a look at the code snippets:

  1. Orders entity (showing only one country, as the others are very similar):
    from dataclasses import dataclass

    from pyspark.sql import DataFrame

    # EnrichedOrder is the enriched orders entity defined in snippet 3 below
    @dataclass
    class OrderPoland:
        CUSTOMER_ID = 'customer_id'
        ITEM_ID = 'item_id'
        MANUFACTURER = 'manufacturer'
        DATE = 'date'
        LABEL = 'label'
        VALUE = 'value'
        UNITS = 'units'
        UNITS_TYPE_ENTITY = 'units_type_entity'
    
        @classmethod
        def prepare_dataframe(clz, order_to_process: DataFrame) -> DataFrame:
            prepared_order = order_to_process.withColumnsRenamed({
                clz.CUSTOMER_ID: EnrichedOrder.CUSTOMER,
                clz.DATE: EnrichedOrder.ORDER_DATE,
                clz.UNITS_TYPE_ENTITY: EnrichedOrder.UNITS_TYPE
            })
    
            return prepared_order.drop(clz.LABEL)
    
  2. Product referential entity:
    @dataclass
    class ProductReferential:
        ITEM_ID = 'item_id'
        MANUFACTURER = 'manufacturer'
        MODEL = 'model'
    
        ALL_COLUMNS = [ITEM_ID, MANUFACTURER, MODEL]
    
        @classmethod
        def prepare_dataframe(clz, product_referential: DataFrame) -> DataFrame:
            return product_referential.select(clz.ALL_COLUMNS)
    
  3. Enriched orders entity:
    @dataclass
    class EnrichedOrder:
        CUSTOMER = 'customer'
        ITEM_ID = ProductReferential.ITEM_ID
        MANUFACTURER = ProductReferential.MANUFACTURER
        ORDER_DATE = 'order_date'
        VALUE = 'value'
        UNITS = 'units'
        UNITS_TYPE = 'units_type'
    
        ALL_COLUMNS = [CUSTOMER, ITEM_ID, MANUFACTURER, ORDER_DATE, VALUE, UNITS, UNITS_TYPE]
    
        @classmethod
        def prepare_enriched_order(clz, orders_poland: DataFrame, orders_france: DataFrame, orders_germany: DataFrame,
                                   referential_product: DataFrame) -> DataFrame:
            unified_orders = (orders_poland.unionByName(orders_germany, allowMissingColumns=False)
                              .unionByName(orders_france, allowMissingColumns=False))
    
            enriched_orders = unified_orders.join(referential_product,
                                                  on=[ProductReferential.ITEM_ID, ProductReferential.MANUFACTURER],
                                                  how='left')
            return enriched_orders.select(clz.ALL_COLUMNS)
    
  4. The main job, which I consider here as the ultimate Devices and interfaces layer connecting everything:
    from delta import configure_spark_with_delta_pip
    from pyspark.sql import DataFrame, SparkSession

    # OrderPoland, OrderFrance, OrderGermany, ProductReferential, and EnrichedOrder
    # are the entity classes from the snippets above
    def generate_enriched_orders(raw_poland: DataFrame, raw_france: DataFrame,
                                 raw_germany: DataFrame, raw_referential_data: DataFrame) -> DataFrame:
        orders_poland = OrderPoland.prepare_dataframe(raw_poland)
        orders_france = OrderFrance.prepare_dataframe(raw_france)
        orders_germany = OrderGermany.prepare_dataframe(raw_germany)
        referential_data = ProductReferential.prepare_dataframe(raw_referential_data)
    
        enriched_orders = EnrichedOrder.prepare_enriched_order(
            orders_poland=orders_poland, orders_france=orders_france, orders_germany=orders_germany,
            referential_product=referential_data
        )
    
        return enriched_orders
    
    if __name__ == '__main__':
        spark = (configure_spark_with_delta_pip(SparkSession.builder.master("local[*]")
                                                .config("spark.sql.extensions",
                                                        "io.delta.sql.DeltaSparkSessionExtension")
                                                .config("spark.sql.catalog.spark_catalog",
                                                        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                                                .config("spark.sql.warehouse.dir", "../data/catalog")
                                                ).getOrCreate())
        enriched_orders = generate_enriched_orders(
            raw_poland=spark.read.csv('../data/poland_orders', header=True, inferSchema=True),
            raw_france=spark.read.csv('../data/france_orders', header=True, inferSchema=True),
            raw_germany=spark.read.csv('../data/germany_orders', header=True, inferSchema=True),
            raw_referential_data=spark.read.table('product_referential')
        )
        enriched_orders.write.format('delta').mode('overwrite').saveAsTable('enriched_orders')
    

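To close the loop on the local testability argument from the beginning of the post, here is a minimal pytest sketch; it assumes the entity classes above are importable (the module paths are omitted) and exercises the enrichment step with in-memory DataFrames, so neither the CSV files nor the Delta table are required:

    import pytest
    from pyspark.sql import SparkSession

    # EnrichedOrder and ProductReferential come from the entity modules shown above


    @pytest.fixture(scope='session')
    def spark():
        # A plain local session is enough: no Delta Lake, no Databricks, no warehouse
        return SparkSession.builder.master('local[1]').getOrCreate()


    def test_should_enrich_orders_with_the_product_referential(spark):
        def orders(customer: str, item_id: str, order_date: str):
            return spark.createDataFrame([(customer, item_id, 'acme', order_date, 10.0, 2, 'box')],
                                         EnrichedOrder.ALL_COLUMNS)

        referential = spark.createDataFrame([('item_1', 'acme', 'model_a'), ('item_2', 'acme', 'model_b')],
                                            ProductReferential.ALL_COLUMNS)

        enriched_orders = EnrichedOrder.prepare_enriched_order(
            orders_poland=orders('customer_pl', 'item_1', '2025-01-01'),
            orders_france=orders('customer_fr', 'item_2', '2025-01-02'),
            orders_germany=orders('customer_de', 'item_1', '2025-01-03'),
            referential_product=referential)

        assert enriched_orders.count() == 3
        assert set(enriched_orders.columns) == set(EnrichedOrder.ALL_COLUMNS)
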
Clean Architecture provides a robust framework for decoupling the layers of your codebase. As demonstrated in this post, despite its origins in pure software engineering, its concepts are highly applicable to the data engineering context.


