Output invalidation pattern

Versions: Apache Airflow 1.10.3

The last slides of my Spark Summit 2019 talk were dedicated to the output invalidation pattern, which is very useful for building maintainable data pipelines. In this post I will take a deeper look at it.

This post has 3 parts. First I will show how we could, but shouldn't, load data into data warehouse stores. Then I will develop the idea of "table partitions as immutable objects", and in the last part I will give you a sample implementation of this pattern with Apache Airflow.

What's hard with loading big volumes of data?

The simplest pattern for loading data into data warehouse stores is presented in the following diagram:

The diagram is very simple. We generate the data on one side and load it directly into the final destination. Easy peasy, but it will work only if you are a perfect programmer and never make mistakes. If that's not the case, you will need to take data reprocessing into account. Let's imagine that you're integrating data coming from your partners, but they had a problem in the data generation part. They resend you the data and you must reload it into your data store. The problem is that the new data is completely different. You need to roll back your previous load, and to do that you have 2 choices: either delete the previously loaded rows and insert the corrected dataset, or rebuild the store by reloading everything from the beginning.

I'm not a fan of either of these 2 approaches. Deleting rows one by one in modern data warehouse solutions is costly and can take time. Reloading everything from the beginning can also take time, and it's possible only if you archive your raw data somewhere. Fortunately, there is a data engineering pattern that can help you overcome this and reduce the maintenance burden.

"Table partitions as immutable objects"

The idea comes from Maxime Beauchemin's "Functional Data Engineering - a modern paradigm for batch data processing" post. The goal is to have an immutable data store and to use these immutable components to expose the final data, like this:

CREATE VIEW all_weeks AS
  SELECT a, b FROM week_1  
  UNION ALL
  SELECT a, b FROM week_2  
  UNION ALL  
  SELECT a, b FROM week_3

Immutability in this context simply means that no in-place changes like updates or deletes are allowed. Ideally we wouldn't allow appends either, but that would mean creating one table per load, which could be highly inefficient because of the metadata limitations of data stores (e.g. the maximum number of tables). To sum up, we deny DELETE and UPDATE and only allow adding new data.
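
As a quick SQL illustration of this contract (the a and b columns come from the view above, their types are an assumption):

-- allowed: creating a new immutable partition and filling it with data
CREATE TABLE week_4 (a INT, b INT);
INSERT INTO week_4 (a, b) VALUES (1, 2);

-- not allowed: mutating data that was already exposed
-- UPDATE week_2 SET b = 3 WHERE a = 1;
-- DELETE FROM week_2 WHERE a = 1;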

In our example the data will therefore be loaded on a weekly basis. It also means that it will be reloaded on the same schedule: even if you only have one dataset to fix, you'll need to copy everything from the beginning of the week. Even if that seems suboptimal at first glance, you may end up with a much better loading time, since the TRUNCATE or DROP operations applied to the weekly tables are metadata operations and tend to be much faster than random deletes.
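
Concretely, reprocessing a given week could then look like this PostgreSQL-flavoured sketch (the file path is hypothetical); only the affected weekly table is rebuilt and the view definition stays untouched:

-- dropping and recreating the weekly partition: metadata-only operations
DROP TABLE IF EXISTS week_3;
CREATE TABLE week_3 (a INT, b INT);
-- reload the whole week from the resent raw data
COPY week_3 FROM '/data/partners/week_3.csv' WITH (FORMAT csv);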

You can use this pattern with sharded weekly/hourly/monthly tables, as in the example above. You can also use a built-in mechanism of the data store. For instance, BigQuery exposes a special column called _PARTITIONTIME that can be used for that purpose.
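
For example, with a BigQuery ingestion-time partitioned table, exposing and reprocessing a single day could look like this sketch (the dataset and table names are made up):

-- read one immutable daily partition
SELECT a, b
FROM my_dataset.partner_data
WHERE _PARTITIONTIME = TIMESTAMP('2019-06-17');

-- reprocessing that day means overwriting only the corresponding partition,
-- e.g. by loading with WRITE_TRUNCATE into my_dataset.partner_data$20190617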

To handle that specific model, the pattern to load the data looks like this:

Airflow implementation

Let's now move on to practice and see how the output invalidation pattern can be implemented in Apache Airflow against a PostgreSQL database. First, I will implement the part responsible for creating the weekly table and, for simplicity, I assume that the loading period starts on a Monday at midnight:

with dag:
    schema = 'output_invalidation_pattern'
    view_name = 'output_invalidation_pattern_view'

    def generate_weekly_table_name(**context):
        execution_date = context['execution_date']
        return 'week_{week_number}'.format(week_number=execution_date.week_of_year)

    wait_for_data_to_add = FileSensor(
        fs_conn_id='fs_default',
        task_id='wait_for_data_to_add',
        poke_interval=20,
        filepath='/tmp/output_invalidation_pattern/{{ts_nodash}}.csv'
    )

    generate_table_name = PythonOperator(
        task_id='generate_table_name',
        provide_context=True,
        python_callable=generate_weekly_table_name
    )

    def retrieve_path_for_table_creation(**context):
        execution_date = context['execution_date']
        is_monday_midnight = execution_date.day_of_week == 1 and execution_date.hour == 0
        return 'remove_table_from_view' if is_monday_midnight else 'dummy_task'

    check_if_monday_midnight = BranchPythonOperator(
        task_id='check_if_monday_midnight',
        provide_context=True,
        python_callable=retrieve_path_for_table_creation
    )

    dummy_task = DummyOperator(
        task_id='dummy_task'
    )

    remove_table_from_view = PostgresViewManagerOperator(
        task_id='remove_table_from_view',
        postgres_conn_id='docker_postgresql',
        database=schema,
        view_name=view_name,
        params={'schema': schema},
        sql='create_view_without_table.sql'
    )

    drop_table = PostgresOperator(
        task_id='drop_table',
        postgres_conn_id='docker_postgresql',
        database=schema,
        sql="DROP TABLE IF EXISTS {{ti.xcom_pull(task_ids='generate_table_name')}}"
    )

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='docker_postgresql',
        database=schema,
        sql='create_table.sql'
    )

    add_table_to_view = PostgresViewManagerOperator(
        task_id='add_table_to_view',
        postgres_conn_id='docker_postgresql',
        database=schema,
        view_name=view_name,
        params={'schema': schema},
        sql='create_view_with_table.sql'
    )

    add_data_to_table = PostgresCopyOperator(
        task_id='add_data_to_table',
        postgres_conn_id='docker_postgresql',
        database=schema,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        source_file='/tmp/output_invalidation_pattern/{{ ts_nodash }}.csv',
        target_table="{{ ti.xcom_pull(task_ids='generate_table_name') }}"
    )

    wait_for_data_to_add >> generate_table_name >> check_if_monday_midnight >> [dummy_task, remove_table_from_view]
    remove_table_from_view >> drop_table >> create_table >> add_table_to_view
    [add_table_to_view, dummy_task] >> add_data_to_table
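
The snippet references a dag object declared outside of it, together with a few imports. A minimal declaration could look like this sketch, assuming an hourly schedule (any interval producing a run at Monday midnight works for the branch above) and an arbitrary start date:

from datetime import datetime

from airflow import DAG
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.utils.trigger_rule import TriggerRule
# PostgresViewManagerOperator and PostgresCopyOperator are the custom
# operators defined later in the post

dag = DAG(
    dag_id='output_invalidation_pattern',  # assumed name
    default_args={'start_date': datetime(2019, 6, 3)},  # assumed start date
    schedule_interval='@hourly'  # assumed schedule
)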

The rendered DAG gives the following sequence:

To facilitate my work, I use 2 simple custom operators here: one to recreate the view from a SQL query and another one to load the data into the weekly table:

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults


class PostgresViewManagerOperator(PostgresOperator):
    """Recreates the output view from the tables returned by the templated SQL query."""

    @apply_defaults
    def __init__(self, view_name, *args, **kwargs):
        super(PostgresViewManagerOperator, self).__init__(*args, **kwargs)
        self.view_name = view_name

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        # self.sql holds the already rendered query returning the names of the
        # tables that should compose the view
        view_tables = hook.get_records(self.sql)
        if view_tables:
            # build a UNION ALL over all returned tables and replace the view in one shot
            view_table_names = map(lambda row: 'SELECT * FROM {}'.format(row[0]), view_tables)
            view_tables_union = ' UNION ALL '.join(view_table_names)
            view_query = 'CREATE OR REPLACE VIEW {view_name} AS ({tables})'.format(
                view_name=self.view_name, tables=view_tables_union)
            hook.run(view_query)
        else:
            # no remaining tables: drop the view instead of keeping an empty definition
            hook.run('DROP VIEW {}'.format(self.view_name))



class PostgresCopyOperator(PostgresOperator):
    """Loads a local file into the target table through PostgreSQL's COPY."""

    # override PostgresOperator's template fields so that the file path and the
    # table name pulled from XCom are rendered by Jinja
    template_fields = ('source_file', 'target_table')

    @apply_defaults
    def __init__(self, source_file, target_table, *args, **kwargs):
        # PostgresOperator requires a sql argument; a placeholder is passed because
        # this operator only relies on the hook's bulk_load method
        super(PostgresCopyOperator, self).__init__(sql='x', *args, **kwargs)
        self.source_file = source_file
        self.target_table = target_table

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        # bulk_load executes a COPY ... FROM STDIN under the hood
        hook.bulk_load(table=self.target_table, tmp_file=self.source_file)

The 2 SQL queries used by the PostgresViewManagerOperator tasks, create_view_without_table.sql and create_view_with_table.sql, return the list of tables that should compose the view.
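
A possible shape for them, assuming the weekly tables live in PostgreSQL's public schema and follow the week_ prefix (both assumptions of this sketch, not necessarily the exact files), is:

-- create_view_with_table.sql: every weekly table, including the one just
-- created by the create_table task
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_name LIKE 'week_%';

-- create_view_without_table.sql: every weekly table except the one that is
-- about to be dropped and recreated
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'public' AND table_name LIKE 'week_%'
  AND table_name <> '{{ ti.xcom_pull(task_ids='generate_table_name') }}';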

The solution fully uses the output invalidation pattern. At the beginning, the check_if_monday_midnight task decides which route to follow. If the execution time is a Monday at midnight, the DAG follows the path that recreates the weekly table and updates the view, and the data is loaded only at the end. Otherwise, the data is loaded directly into the existing weekly table through PostgresCopyOperator.

Even though this pattern is not a pure Apache Spark concept, it integrates pretty well with the data generated by the framework. Together with the overwrite save mode, the output invalidation pattern helps to reduce the maintenance burden and the need for manual intervention in case of data reprocessing. Thanks to the orchestration framework's scheduler, reprocessing the data is only a matter of one click: clearing the already computed state and letting the executions regenerate automatically.
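
As a sketch of that integration, a weekly Spark batch could simply overwrite its output location on every run, including a reprocessing one (the paths and session setup below are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('weekly_partner_load').getOrCreate()

# each run fully rewrites the weekly output thanks to the overwrite save mode,
# so there is nothing to delete or patch manually afterwards
weekly_data = spark.read.json('/data/raw/partners/2019_week_25/')
weekly_data.select('a', 'b') \
    .write \
    .mode('overwrite') \
    .csv('/tmp/output_invalidation_pattern/2019_week_25')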