The last slides of my Spark Summit 2019 talk were dedicated to the output invalidation pattern, which is very useful for building maintainable data pipelines. In this post I will dive deeper into it.
This post is organized in 3 parts. First, I will show how we can, but shouldn't, load data into data warehouse stores. Next, I will develop the idea of "table partitions as immutable objects", and in the last part I will give you a sample implementation of this pattern with Apache Airflow.
What's hard about loading big volumes of data?
The simplest pattern for loading data into data warehouse stores is presented in the following schema:
The schema is very simple. We generate the data on one side and load it directly into the final destination. Easy peasy, but it will work only if you are a perfect programmer who never makes mistakes. If that's not the case, you will need to take data reprocessing into account. Let's imagine that you're integrating data coming from your partners, but they had a problem in the data generation part. They resend you the data and you must reload it into your data store. The problem is that the new data is completely different. You need to roll back your previous load, and to do that you have several choices:
- delete all the rows loaded from the previous file
- reload everything from the beginning into a new table
- ...
I'm not a fan of either of these 2 approaches. Deleting rows one by one in modern data warehouse solutions is costly and can take time. Reloading everything from the beginning can also take time, and it's possible only if you archive your raw data somewhere. Fortunately, there is a data engineering pattern that can help you overcome that and reduce the maintenance burden.
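To see why the first option hurts, here is a minimal sketch of such a row-by-row rollback; the partner_events table, the file path and the psycopg2 connection settings are all made up for the illustration:

import csv

import psycopg2  # assumed client library, not used in the original pipeline

# Hypothetical rollback of a bad load: delete, one by one, every row that
# came from the previous version of the partner file before reloading it.
with psycopg2.connect(host='localhost', dbname='warehouse', user='etl') as connection:
    with connection.cursor() as cursor:
        with open('/tmp/partner_data_20191104.csv') as previous_file:
            for row in csv.reader(previous_file):
                # one DELETE per row - slow and costly on most data warehouses
                cursor.execute('DELETE FROM partner_events WHERE event_id = %s', (row[0],))

Each row requires its own lookup and rewrite, which is exactly the kind of workload most analytical stores handle poorly.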
"Table partitions as immutable objects"
The idea comes from Maxime Beauchemin's "Functional Data Engineering - a modern paradigm for batch data processing" post. The goal is to have an immutable data store and to use these immutable components to expose the final data, like this:
CREATE VIEW all_weeks AS
  SELECT a, b FROM week_1
  UNION ALL
  SELECT a, b FROM week_2
  UNION ALL
  SELECT a, b FROM week_3
The immutability in that context simply means that no in-place changes like updates or deletes are allowed. Ideally we shouldn't allow appends either, but that would mean creating one table per load, which could be highly inefficient due to the metadata limitations of data stores (e.g. the maximum number of tables). To sum up, we deny DELETE and UPDATE and only allow appending new data.
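If you want the data store itself to enforce that contract, one option is simply to not grant the destructive privileges to the loading role. A minimal PostgreSQL-flavored sketch, with a made-up loader role and weekly table:

import psycopg2  # assumed client library

# Hypothetical privilege setup: the loading role can only read and append,
# so any UPDATE or DELETE attempt is rejected by the database itself.
with psycopg2.connect(host='localhost', dbname='output_invalidation_pattern', user='admin') as connection:
    with connection.cursor() as cursor:
        cursor.execute('REVOKE ALL ON week_45 FROM loader')
        cursor.execute('GRANT SELECT, INSERT ON week_45 TO loader')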
In our example the data will therefore be loaded on a weekly basis. It also means that it will be reloaded on that same schedule: even if you have only one dataset to reload, you'll need to copy everything from the beginning of the week. Even if it seems suboptimal at first glance, you may end up with much better loading times, since the TRUNCATE or DROP operations applied to the weekly tables are metadata operations and tend to be much faster than random deletes.
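Put differently, reloading a given week becomes an idempotent drop-recreate-copy sequence rather than a set of targeted deletes. A rough sketch of that idea outside of any orchestrator (the table layout, connection settings and file path are invented; the Airflow version comes in the last part):

import psycopg2  # assumed client library

def reload_week(week_number, source_file):
    # Rebuilds a weekly table from scratch: DROP and CREATE are metadata
    # operations, so rerunning this function for the same week is cheap
    # and always produces the same result.
    table = 'week_{}'.format(week_number)
    with psycopg2.connect(host='localhost', dbname='output_invalidation_pattern', user='etl') as connection:
        with connection.cursor() as cursor:
            cursor.execute('DROP TABLE IF EXISTS {}'.format(table))
            cursor.execute('CREATE TABLE {} (a VARCHAR(30), b VARCHAR(30))'.format(table))
            with open(source_file) as data:
                cursor.copy_from(data, table, sep=',')

# reload_week(45, '/tmp/output_invalidation_pattern/20191104T000000.csv')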
You can use it with sharded weekly/hourly/monthly tables, as in the example above. You can also use a built-in mechanism of your data store. For instance, BigQuery exposes a special column called _PARTITIONTIME that can be used for that purpose.
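As an illustration of the BigQuery flavor (the project, dataset and table names below are made up), consumers address partitions of an ingestion-time partitioned table through that pseudo column, so invalidating a delivery only means rewriting the corresponding partition:

from google.cloud import bigquery  # assumed client library

client = bigquery.Client()

# Hypothetical read on an ingestion-time partitioned table: each load lands
# in the partition of its load time, so a reload replaces a single partition
# instead of deleting individual rows.
query = """
    SELECT a, b
    FROM `my_project.partner_dataset.events`
    WHERE _PARTITIONTIME = TIMESTAMP('2019-11-04')
"""
rows = client.query(query).result()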
To handle that specific model, the loading pattern looks like this:
Airflow implementation
Let's now move to practice and see how the output invalidation pattern can be implemented in Apache Airflow against a PostgreSQL database. First, I will implement the part responsible for creating the weekly table; for simplicity I suppose that the loading starts on a Monday at midnight:
with dag:
    schema = 'output_invalidation_pattern'
    view_name = 'output_invalidation_pattern_view'

    def generate_weekly_table_name(**context):
        execution_date = context['execution_date']
        return 'week_{week_number}'.format(week_number=execution_date.week_of_year)

    wait_for_data_to_add = FileSensor(
        fs_conn_id='fs_default',
        task_id='wait_for_data_to_add',
        poke_interval=20,
        filepath='/tmp/output_invalidation_pattern/{{ts_nodash}}.csv'
    )

    generate_table_name = PythonOperator(
        task_id='generate_table_name',
        provide_context=True,
        python_callable=generate_weekly_table_name
    )

    def retrieve_path_for_table_creation(**context):
        execution_date = context['execution_date']
        is_monday_midnight = execution_date.day_of_week == 1 and execution_date.hour == 0
        return 'remove_table_from_view' if is_monday_midnight else 'dummy_task'

    check_if_monday_midnight = BranchPythonOperator(
        task_id='check_if_monday_midnight',
        provide_context=True,
        python_callable=retrieve_path_for_table_creation
    )

    dummy_task = DummyOperator(
        task_id='dummy_task'
    )

    remove_table_from_view = PostgresViewManagerOperator(
        task_id='remove_table_from_view',
        postgres_conn_id='docker_postgresql',
        database=schema,
        view_name=view_name,
        params={'schema': schema},
        sql='create_view_without_table.sql'
    )

    drop_table = PostgresOperator(
        task_id='drop_table',
        postgres_conn_id='docker_postgresql',
        database=schema,
        sql="DROP TABLE IF EXISTS {{ti.xcom_pull(task_ids='generate_table_name')}}"
    )

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='docker_postgresql',
        database=schema,
        sql='create_table.sql'
    )

    add_table_to_view = PostgresViewManagerOperator(
        task_id='add_table_to_view',
        postgres_conn_id='docker_postgresql',
        database=schema,
        view_name=view_name,
        params={'schema': schema},
        sql='create_view_with_table.sql'
    )

    add_data_to_table = PostgresCopyOperator(
        task_id='add_data_to_table',
        postgres_conn_id='docker_postgresql',
        database=schema,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        source_file='/tmp/output_invalidation_pattern/{{ ts_nodash }}.csv',
        target_table="{{ ti.xcom_pull(task_ids='generate_table_name') }}"
    )

    wait_for_data_to_add >> generate_table_name >> check_if_monday_midnight >> [dummy_task, remove_table_from_view]
    remove_table_from_view >> drop_table >> create_table >> add_table_to_view
    [add_table_to_view, dummy_task] >> add_data_to_table
The rendered DAG gives the following sequence:
To facilitate my work, I use here 2 simple custom operators: one to create a view from a SQL query and another one to load data into the table:
class PostgresViewManagerOperator(PostgresOperator):

    @apply_defaults
    def __init__(self, view_name, *args, **kwargs):
        super(PostgresViewManagerOperator, self).__init__(*args, **kwargs)
        self.view_name = view_name

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        view_tables = hook.get_records(self.sql)
        if view_tables:
            view_table_names = map(lambda tuple: 'SELECT * FROM {}'.format(tuple[0]), view_tables)
            view_tables_union = ' UNION ALL '.join(view_table_names)
            view_query = 'CREATE OR REPLACE VIEW {view_name} AS ({tables})'.format(
                view_name=self.view_name, tables=view_tables_union)
            hook.run(view_query)
        else:
            hook.run('DROP VIEW {}'.format(self.view_name))


class PostgresCopyOperator(PostgresOperator):
    template_fields = ('source_file', 'target_table')

    @apply_defaults
    def __init__(self, source_file, target_table, *args, **kwargs):
        super(PostgresCopyOperator, self).__init__(sql='x', *args, **kwargs)
        self.source_file = source_file
        self.target_table = target_table

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        hook.bulk_load(table=self.target_table, tmp_file=self.source_file)
The 2 SQL queries are:
# create_view_without_table.sql
SELECT table_name FROM information_schema.tables
WHERE table_catalog = '{{ params.schema }}'
  AND table_schema = 'public'
  AND table_type = 'BASE TABLE'
  AND table_name != '{{ ti.xcom_pull(task_ids='generate_table_name') }}'
# create_view_with_table.sql
SELECT table_name FROM information_schema.tables
WHERE table_catalog = '{{ params.schema }}'
  AND table_schema = 'public'
  AND table_type = 'BASE TABLE'
  AND table_name LIKE 'week_%'
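To visualize what PostgresViewManagerOperator ends up executing, here is its view-building logic extracted into a plain function, fed with made-up weekly table names:

def build_view_query(view_name, table_names):
    # Mirrors the statement assembled in PostgresViewManagerOperator.execute:
    # one SELECT per weekly table, glued together with UNION ALL.
    selects = ['SELECT * FROM {}'.format(table) for table in table_names]
    return 'CREATE OR REPLACE VIEW {view_name} AS ({tables})'.format(
        view_name=view_name, tables=' UNION ALL '.join(selects))

print(build_view_query('output_invalidation_pattern_view', ['week_44', 'week_45', 'week_46']))
# CREATE OR REPLACE VIEW output_invalidation_pattern_view AS (SELECT * FROM week_44 UNION ALL SELECT * FROM week_45 UNION ALL SELECT * FROM week_46)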
The solution fully applies the output invalidation pattern. Once the input file is detected, a branching task checks the route to follow. If the execution time is Monday at midnight, the DAG follows the path that creates a new weekly table and updates the view; the data is loaded only at the end. Otherwise, the data is loaded directly into the existing weekly table through PostgresCopyOperator.
Even though this pattern is not a pure Apache Spark concept, it integrates pretty well with the data generated by the framework. Together with the overwrite save mode, the output invalidation pattern helps to reduce the maintenance burden and the need for manual interventions in case of data reprocessing. Thanks to the orchestration framework's scheduler, reprocessing the data is only a matter of one click: clearing the already computed state and letting the executions be regenerated automatically.