Data quality on Databricks - Spark Expectations

Versions: Spark Expectations 2.3.0 https://github.com/bartosz25/spark-playground/tree/master/spark-expectations-demo

Previously we learned how to control data quality with Delta Live Tables. Now, it's time to see an open source library in action, Spark Expectations.

Data Engineering Design Patterns

Looking for a book that defines and solves most common data engineering problems? I wrote one on that topic! You can read it online on the O'Reilly platform, or get a print copy on Amazon.

I also help solve your data engineering problems 👉 contact@waitingforcode.com 📩

Spark Expectations vs. Delta Live Tables expectations

After the previous blog post introducing Delta Live Tables expectations, a good starting point here is to compare what we learned last time with what we're going to learn now. Let's begin with the same entry point for both solutions, the decorator. As with DLT, Spark Expectations relies on a Python decorator to annotate the function that builds the validated DataFrame. You'll find an example below with the with_expectations decorator exposed by a SparkExpectations instance:

from pyspark.sql import DataFrame

from spark_expectations.core.expectations import SparkExpectations

# rules_dataframe and writer are defined elsewhere (see the complete example later in the post)
spark_expectations = SparkExpectations(
    product_id='user_account_staging_validation',
    rules_df=rules_dataframe,
    stats_table='dq_stats_table',
    stats_table_writer=writer,
    target_and_error_table_writer=writer
)

@spark_expectations.with_expectations(
    target_table='user_account_staging',
    write_to_table=True
)
def build_account_poland_gold_table_staging() -> DataFrame:
    # ... build and return the DataFrame to validate
    pass

After reading this snippet, and without entering into the implementation details, you should already notice some differences. First, there are table-related parameters: a stats table and a target table. Second, the validation runs on top of an Apache Spark DataFrame, and the job can run without writing the data to a table (write_to_table=True/False).

But this is only a high-level view; Spark Expectations offers much more. The next picture comes from the Spark Expectations repository and nicely illustrates all the capabilities of the library:

In a nutshell - because we're going to delve into those points later - the involved steps are: reading the validation rules, applying the aggregation- and query-based rules on the input DataFrame, applying the row-based rules, re-applying the aggregation- and query-based rules on the cleaned DataFrame, writing the valid rows to the target table and the failed ones to the error table, and finally recording the run statistics and sending the configured notifications.

Configuring Spark Expectations

As you saw in the minimalistic example from the previous section, there are several configuration flags for the with_expectations decorator. To get a better idea of what you can do, let's analyze the full signature of the function:

def with_expectations(
    self,
    target_table: str,
    write_to_table: bool = False,
    write_to_temp_table: bool = False,
    user_conf: Optional[Dict[str, Union[str, int, bool]]] = None,
    target_table_view: Optional[str] = None,
    target_and_error_table_writer: Optional["WrappedDataFrameWriter"] = None,
) -> Any:
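Before going through the attributes, here is a minimal sketch of how they combine in practice. The table and view names are invented for the illustration, and se_user_conf refers to the user configuration covered in the next section:

@spark_expectations.with_expectations(
    target_table='catalog.schema.users',  # table receiving the validated rows
    write_to_table=True,                  # False runs the checks without persisting the result
    user_conf=se_user_conf,               # notification and behavior flags, covered below
    target_table_view='users_staging'     # view name referenced by the query-based rules
)
def build_users_table():
    df = spark_session.table('catalog.schema.users_raw')
    # the query-based rules expect this view to exist
    df.createOrReplaceTempView('users_staging')
    return df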

Even though the attributes are self-explanatory, let's explain each of them:

Before we get to the user configuration, let's look at the data validation workflow. From the properties explained above, you can see that Spark Expectations supports both types of data quality checks: the ones we called row-based last time apply to each row of the input DataFrame, while the ones we considered complex apply in two places. The first is the input DataFrame, so potentially a dataset with data quality issues. The second is the DataFrame created after applying the row-level rules, hence free of those issues.

💬 Why these different rules?

Ashok Singamaneni, principal data engineer at Nike and one of the core contributors to Spark Expectations, answered a few of my questions. Thank you, Ashok, for your time and interesting insights!

Why does Spark Expectations support the same data quality controls (aggregation and query) at different levels, once for the input DataFrame, once for the output DataFrame?

So, when I started SE - I wanted to have row_dq, agg_dq and query_dq. But when I thought about the order, and discussed with various teams - there could be usecases where [agg_dq and query_dq] can be run on the input dataframe or after row_dq where there might be drop in the records because of which [agg_dq and query_dq] can have varied results.

So the flow is like this:

[agg_dq, query_dq] -> row_dq -> [agg_dq, query_dq]

It is not mandatory to run [agg_dq, query_dq] two times, and can be configured in the rules definition.

That's what the full workflow may look like. But you, as a user, can configure it and disable some of the steps with the following settings:

Put differently, the following will happen:

Enabled flags and resulting behavior:

source_agg_dq, source_query_dq, row_dq, target_agg_dq, target_query_dq, write_to_table
  1. Spark Expectations applies the aggregation- and query-based rules on top of the input DataFrame.
  2. Spark Expectations applies the row-based rules on top of the input DataFrame and returns a final DataFrame without the rows having data quality issues (if the drop action is configured for the row-based rules).
  3. Spark Expectations applies the aggregation- and query-based rules on top of the final DataFrame.
  4. Spark Expectations writes the final DataFrame to the configured target table.

source_agg_dq, source_query_dq, write_to_table
  1. Spark Expectations applies the aggregation- and query-based rules on top of the input DataFrame.
  2. Spark Expectations writes the input DataFrame to the configured target table.

row_dq, target_agg_dq, target_query_dq, write_to_table
  1. Spark Expectations applies the row-based rules on top of the input DataFrame and returns a final DataFrame without the rows having data quality issues (if the drop action is configured for the row-based rules).
  2. Spark Expectations applies the aggregation- and query-based rules on top of the final DataFrame.
  3. Spark Expectations writes the final DataFrame to the configured target table.

target_agg_dq, target_query_dq, write_to_table
  1. Spark Expectations writes the input DataFrame to the configured target table. The validation doesn't happen, as the target-level rules depend on the row-based ones.
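In practice, whether an aggregation- or query-based rule runs on the source side, on the target side, or on both is controlled per rule rather than globally. Below is a minimal sketch, assuming the enable_for_source_dq_validation and enable_for_target_dq_validation columns from the rules schema documented by the project (the remaining columns are omitted for brevity):

from pyspark.sql import Row

# Hypothetical aggregation rule evaluated only on the final (post row_dq) DataFrame:
# the source-side run is skipped because enable_for_source_dq_validation is False.
target_only_agg_rule = Row(
    rule_type='agg_dq',
    rule='correct_count',
    expectation='COUNT(id) > 4',
    enable_for_source_dq_validation=False,
    enable_for_target_dq_validation=True
)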

User configuration

You should know now how to configure the validation workflow, but a few questions remain to be answered. The first one is: how can you be aware of the data validation results? Remember the user configuration from the previous section? Yes, that's where you can plug Spark Expectations into your alerting system, for example:

from spark_expectations.config.user_config import Constants as user_config

se_user_conf = {
  user_config.se_notifications_enable_email: True,
  user_config.se_notifications_email_smtp_host: "waitingforcode.com",
  user_config.se_notifications_email_smtp_port: 25,
  user_config.se_notifications_smtp_password: "pass",
  # ...
}

You'll find all supported properties in the spark_expectations.config.user_config module.

Besides this email-related configuration, you can also set the following flags that impact the behavior of the validation process:
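For instance, the notification triggers can be tuned with flags like the ones below. The names reflect my reading of the documentation, so treat them as assumptions and double-check them against the user_config module of your version:

se_user_conf.update({
  # notify when the job starts, completes, or fails
  user_config.se_notifications_on_start: False,
  user_config.se_notifications_on_completion: False,
  user_config.se_notifications_on_fail: True,
  # alert when the percentage of dropped rows exceeds the configured threshold
  user_config.se_notifications_on_error_drop_exceeds_threshold_breach: True,
  user_config.se_notifications_on_error_drop_threshold: 15,
})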

Validation rules

The last aspect to discover before we start playing with Spark Expectations is the validation rules. So far, we have identified three types of validation rules that we called row-based, query-based, and aggregation-based.

The row-based rules work at the row level and can check an expectation against one or multiple columns. The documentation provides pretty clear examples for them:

In addition to these simple rules, you can define aggregation-based rules that, as the name indicates, rely on aggregation expressions. Again, the documentation gives pretty good examples for them:

And finally, if you need a rule which is not covered by the two previous categories, you can define a query-based rule that simply runs a query on top of one or multiple tables, as per documentation:

In addition to the rule definition itself, each rule has other properties:
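To make those properties concrete, here is a sketch of a single row-based rule expressed as a Row following the rules table layout documented by the project; consider the exact column set an assumption, as it may change between versions:

from pyspark.sql import Row

email_format_rule = Row(
    product_id='user_account_staging_validation',  # groups the rules of a given pipeline
    table_name='user_account_staging',             # table the rule applies to
    rule_type='row_dq',                            # row_dq, agg_dq or query_dq
    rule='real_email',                             # rule name, reported in the stats table
    column_name='email',
    expectation='email LIKE "%@%"',                # SQL expression evaluated for each row
    action_if_failed='drop',                       # ignore, drop or fail
    tag='validity',
    description='the email must contain an @',
    enable_for_source_dq_validation=True,
    enable_for_target_dq_validation=True,
    is_active=True,
    enable_error_drop_alert=False,
    error_drop_threshold=0
)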

User account validation example

The goal of this part is to use a staging table to validate the dataset before overwriting the one currently exposed to the consumers, by running data quality rules similar to the ones you saw in the aforementioned Delta Live Tables data quality blog post:

Let's begin with the data quality rules. Even though you can store them in a dedicated table or a file, we're going to keep this part simple and rely on the validation rules defined as an in-memory DataFrame:

# The full code is available on Github: https://github.com/bartosz25/spark-playground/tree/master/spark-expectations-demo
# DataQualityRulesBuilder and get_rules_schema are small helpers defined in that repository
def get_rules_for_account() -> List[Row]:
  rules_builder = DataQualityRulesBuilder('accounts')
  return [
    rules_builder.new_row_rule(column='id', failed_action='fail', expectation='id IS NOT NULL', name='id_defined'),
    rules_builder.new_row_rule(column='creation_date', failed_action='fail', expectation='creation_date < NOW()', name='date_from_past'),
    rules_builder.new_row_rule(column='email', failed_action='fail', expectation='email LIKE "%@%"', name='real_email'),
    rules_builder.new_agg_rule(expectation='COUNT(id) > 4', failed_action='fail', name='correct_count'),
    rules_builder.new_agg_rule(expectation='COUNT(DISTINCT(id)) > 3', failed_action='fail', name='correct_distinct_count'),
    rules_builder.new_query_rule(expectation='''(SELECT COUNT(*) FROM accounts_staging a
        LEFT ANTI JOIN account_types at ON at.name = a.account_type) = 0''', failed_action='fail', name='referential_check'),
    rules_builder.new_query_rule(expectation='''(SELECT
        SUM(rows_number_new) - SUM(rows_number_old) AS rows_difference
        FROM (
          SELECT COUNT(*) AS rows_number_new, 0 AS rows_number_old FROM accounts_staging
          UNION ALL
          SELECT 0 AS rows_number_new, COUNT(*) AS rows_number_old FROM accounts
        )) >= 0
        ''', failed_action='fail', name='seasonality_check')
  ]

rules_dataframe = spark_session.createDataFrame(data=get_rules_for_account(), schema=get_rules_schema())

As you can see, there is validation on the individual rows, data integrity and seasonality checks, as well as an overall volume verification with an aggregation rule.
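The DataQualityRulesBuilder and get_rules_schema helpers come from the demo repository linked at the top of the post. If you don't want to open it, a plausible get_rules_schema, based on the rules table columns documented by the project, could look like the sketch below (not the exact code from the repository):

def get_rules_schema() -> str:
    # DDL-style schema accepted by spark_session.createDataFrame
    return ('product_id STRING, table_name STRING, rule_type STRING, rule STRING, '
            'column_name STRING, expectation STRING, action_if_failed STRING, tag STRING, '
            'description STRING, enable_for_source_dq_validation BOOLEAN, '
            'enable_for_target_dq_validation BOOLEAN, is_active BOOLEAN, '
            'enable_error_drop_alert BOOLEAN, error_drop_threshold INT')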

The next step consists of initializing SparkExpectations within our Databricks job. The configuration references the rules DataFrame created before and a stats table that Spark Expectations will create to store some interesting details about the validation step:

from spark_expectations.config.user_config import Constants as user_config
from spark_expectations.core.expectations import SparkExpectations, WrappedDataFrameWriter

delta_writer = WrappedDataFrameWriter().mode("overwrite").format("delta")
spark_expectations = SparkExpectations(
    product_id=product_id_from_table_name('accounts'),  # small helper from the demo repository
    rules_df=rules_dataframe,
    stats_table='data_validation_stats',
    stats_table_writer=delta_writer,
    target_and_error_table_writer=delta_writer,
    stats_streaming_options={user_config.se_enable_streaming: False}
)
# notifications disabled on purpose
user_conf = {
    user_config.se_notifications_enable_email: False,
    user_config.se_notifications_enable_slack: False,
}

After the SparkExpectations initialization, it's time to decorate the task that is going to run as part of Databricks Workflows:

import datetime

from pyspark.sql import Row


def load_accounts_if_valid():
    # setup part
    @spark_expectations.with_expectations(
        target_table='wfc_catalog.test_se.accounts',
        write_to_table=True,
        user_conf=user_conf,
        target_table_view='accounts_staging'
    )
    def build_account_table():
        spark_session.createDataFrame(data=[
            Row(id=1, name='type a'), Row(id=2, name='type b'), Row(id=3, name='type c')
        ]).createOrReplaceTempView('account_types')
        spark_session.createDataFrame(data=[
            Row(id=1, name='account 1', email='email1@email', account_type='type a',
                creation_date=datetime.datetime(2025, 1, 10, 14, 50), meta_dq_run_id=None, meta_dq_run_datetime=None),
            Row(id=1, name='account 1', email='email1@email', account_type='type a',
                creation_date=datetime.datetime(2025, 1, 10, 14, 50), meta_dq_run_id=None, meta_dq_run_datetime=None),
            Row(id=2, name='account 2', email='email2@email', account_type='type b',
                creation_date=datetime.datetime(2025, 1, 10, 11, 50), meta_dq_run_id=None, meta_dq_run_datetime=None),
            Row(id=3, name='account 3', email='email3@email', account_type='type c',
                creation_date=datetime.datetime(2025, 1, 10, 8, 50), meta_dq_run_id=None, meta_dq_run_datetime=None),
            Row(id=4, name='account 4', email='email4@email', account_type='type XXXX',
                creation_date=datetime.datetime(2025, 1, 9, 14, 50), meta_dq_run_id=None, meta_dq_run_datetime=None)
        ], schema='id INT, name STRING, email STRING, account_type STRING, '
                  'creation_date TIMESTAMP, meta_dq_run_id STRING, '
                  'meta_dq_run_datetime TIMESTAMP').createOrReplaceTempView('accounts_old')

        # dataset to validate before exposing it to the consumers
        df = spark_session.createDataFrame(data=[
            Row(id=1, name='account 1', email='email1@email', account_type='type a',
                creation_date=datetime.datetime(2025, 1, 10, 14, 50)),
            Row(id=1, name='account 1', email='email1@email', account_type='type a',
                creation_date=datetime.datetime(2025, 1, 10, 14, 50)),
            Row(id=2, name='account 5', email='email2@email', account_type='type b',
                creation_date=datetime.datetime(2025, 1, 10, 11, 50)),
            Row(id=3, name='account 6', email='email3@email', account_type='type c',
                creation_date=datetime.datetime(2025, 1, 10, 8, 50)),
            Row(id=4, name='account 7', email='email3@email', account_type='type c',
                creation_date=datetime.datetime(2050, 1, 10, 8, 50)),
            Row(id=None, name='account None', email='email3@email', account_type='type c',
                creation_date=datetime.datetime(2025, 1, 10, 8, 50)),
            Row(id=111, name='account 111', email='email3@email', account_type='type XXXXXXXc',
                creation_date=datetime.datetime(2025, 1, 10, 8, 50)),
        ], schema='id INT, name STRING, email STRING, account_type STRING, creation_date TIMESTAMP')

        # the query-based rules reference this view (target_table_view)
        df.createOrReplaceTempView('accounts_staging')

        return df

    return build_account_table()

As you can see, the job defines some raw data breaking the data quality rules. As all the rules are configured with the fail action, the validation job will fail and all the failure details will be written to the stats table:
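If you prefer inspecting those details outside the Databricks UI, you can simply query the stats table configured earlier; the exact columns depend on the Spark Expectations version, so take this as a sketch:

# the table name matches the stats_table parameter passed to SparkExpectations
spark_session.table('data_validation_stats').show(truncate=False, vertical=True)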

If instead of failing the data loading process you would like to see the data annotated with the data quality issues, you can change the action_if_failed to 'ignore'. After executing the job, you should see the records correctly loaded into the output table with some extra columns referencing the data quality run:

Besides, the rows not respecting the data quality rules will land in the errors table:

Finally, if we change the action_if_failed to 'drop', you'll still find the two invalid records in the errors table. On the other hand, you will not see them in the output table since this configuration removes bad rows from the destination:

As you can notice, only the rows that failed the row-based data quality rules are excluded from the target table. It's not the case for the id 111 from our screenshot, which breaks the referential integrity. However, as that check is defined as a query-based data quality rule, Spark Expectations doesn't know which particular row breaks it. Therefore, the library can't exclude it from the final output.
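To see which rows were quarantined and why, you can query the errors table. Spark Expectations creates it next to the target table with an _error suffix, so the name below is an assumption based on that convention:

# errors table derived from the target table name with the _error suffix
# (verify the exact name in your catalog)
spark_session.table('wfc_catalog.test_se.accounts_error').show(truncate=False)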

⚠ Databricks Serverless and Spark Expectations

Spark Expectations defines pyspark as a required dependency. If you try to install the framework on Databricks Serverless, your runtime environment will try to install open source PySpark, which is incompatible with the version running on Databricks. Consequently, your job won't start and will fail with the following exception instead:

run failed with error message
 Library installation failed: Library installation attempted on serverless compute and failed due to installing an incompatible package with the core packages. Error code: ERROR_CORE_PACKAGE_VERSION_CHANGE, error message: Python restart failed:

For that reason, you need to - at least for Spark Expectations 2.3.1 - use a classic cluster.

Pros and cons

Pros:
- Flexibility that simplifies the definition of both simple and complex rules, without requiring any extra structures to declare.
- Open source, so if you are in trouble, you can reach out to the maintainers or look directly at the code.
- Native support for storing invalid records in a dead-letter table without you having to do any work.
- Native support for storing data validation results and data validation rules as tables, which makes exposing them to the monitoring layer possible without a lot of effort.

Cons:
- It can be tricky, especially the dependence between source, row, and final table validation rules. Some issues illustrate this, for example [BUG] query rule is skipped #44. Thankfully, the documentation gets better and better!
- Even though the project is amazing from the end user perspective, it seems to lack some interest from the data community to collaborate on data quality best practices with a wider audience.
- Incompatibility with Databricks Serverless. The serverless environment simplifies a lot of the work while setting Databricks jobs up. Unfortunately, Spark Expectations is not yet compatible with it. Knowing that you can use the classic clusters, it shouldn't be a blocker, though!

💬 What might you find in the next releases?

Ashok, could you share a few words on the planned evolution of Spark Expectations, like upcoming features or things you are secretly working on at the moment?

1) We are working on an internal UI project to manage rules for SE, so even product, business and governance teams can collaborate and manage rules. We are envisioning Github as a backend to support CI/CD and ultimately engineers can test/validate in lower envs before moving to prod. I hope this goes good and ultimately becomes open source (secret :) )

2) Thinking of native support for YAML based rules configuration

3) Enhancing and optimizing SE for better streaming support

4) The community has been active over the last one and half years and improved SE to a great extent from its inception, with more community support - we can work on the long lasting important issues that will be beneficial for better reporting. Example: https://github.com/Nike-Inc/spark-expectations/issues/38

5) Lastly thanks to the community for contributing to new features and fixing many issues. Here is more to be done for improving the code base - https://github.com/Nike-Inc/spark-expectations/issues

I would be more than happy to tell you that Spark Expectations is the last data quality choice on Databricks. It's not the case, because Databricks Labs recently released DQX, which will be the topic of my next - and hopefully the last - blog post of the series!

Consulting

With nearly 16 years of experience, including 8 as data engineer, I offer expert consulting to design and optimize scalable data solutions. As an O’Reilly author, Data+AI Summit speaker, and blogger, I bring cutting-edge insights to modernize infrastructure, build robust pipelines, and drive data-driven decision-making. Let's transform your data challenges into opportunities—reach out to elevate your data engineering game today!

👉 contact@waitingforcode.com
🔗 past projects