Often in batch processing we give late data some time to arrive before the pipeline runs, i.e. the pipeline for 9:00 will be executed only at 11:00. One of the methods to do so in Airflow is to compute the delta inside the tasks, but there is a more "native" way with TimeDeltaSensor.
In the first part of this post I will show the "old" way of doing that. In the next one, I will introduce TimeDeltaSensor as its better alternative.
XCom delta
The first approach to dealing with a time delta in Apache Airflow consists of computing the delta in a PythonOperator and passing it to other tasks as an XCom variable:
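import datetime

# Airflow 1.10-style import, matching the provide_context=True usage
from airflow.operators.python_operator import PythonOperator


def generate_dates(**context):
    execution_date = context['execution_date']
    timestamped_partition_to_load = execution_date - datetime.timedelta(hours=3)
    # the returned value is pushed to XCom for the downstream tasks
    return timestamped_partition_to_load


timeshift_date = PythonOperator(
    task_id='timeshift_date',
    provide_context=True,
    python_callable=generate_dates
)

# print_execution_date is defined in the TimeDeltaSensor section
printer = PythonOperator(
    task_id='printer',
    python_callable=print_execution_date,
    provide_context=True
)

timeshift_date >> printer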
A drawback of this approach is of course the overhead of computing and storing the XCom. What if we could simply take one of the available macros ({{ ds }}, {{ ts_nodash }}, ...) and use it in our time-sensitive pipelines? We can do it with TimeDeltaSensor, which acts like a "guard" and does not let the pipeline continue as long as the time barrier is not reached.
TimeDeltaSensor
The idea behind TimeDeltaSensor is to add some delay on top of the execution date, more precisely on top of the end of its schedule interval. It's quite important that the barrier is computed from the execution date and not from the moment the task starts: for backfilled runs the barrier is already in the past, so no delay will be applied. You can see that pretty clearly in the poke method of the sensor:
target_dttm = dag.following_schedule(context['execution_date'])
target_dttm += self.delta
self.log.info('Checking if the time (%s) has come', target_dttm)
return timezone.utcnow() > target_dttm
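To make this comparison more concrete, here is a minimal standalone sketch of the same arithmetic in plain Python, without Airflow. It assumes a fixed hourly schedule, so that `dag.following_schedule` can be approximated by adding the interval to the execution date. The helper names `time_barrier` and `barrier_reached` are hypothetical and are not part of Airflow.

```python
import datetime


def time_barrier(execution_date, schedule_interval, delta):
    """Return the moment after which the guard lets the run continue.

    Assumption: the schedule is a fixed interval, so the following schedule
    is simply execution_date + schedule_interval.
    """
    following_schedule = execution_date + schedule_interval
    return following_schedule + delta


def barrier_reached(now, execution_date, schedule_interval, delta):
    """Mimic the sensor's check: True once 'now' is past the barrier."""
    return now > time_barrier(execution_date, schedule_interval, delta)


hourly = datetime.timedelta(hours=1)
two_hours = datetime.timedelta(hours=2)
run_for_9 = datetime.datetime(2020, 1, 1, 9, 0)

# Barrier for the 09:00 run: 09:00 + 1h (end of the interval) + 2h (delta)
barrier = time_barrier(run_for_9, hourly, two_hours)

# Regular run: at 11:30 the guard still blocks, just after 12:00 it passes
blocked = barrier_reached(
    datetime.datetime(2020, 1, 1, 11, 30), run_for_9, hourly, two_hours)
passed = barrier_reached(
    datetime.datetime(2020, 1, 1, 12, 1), run_for_9, hourly, two_hours)

# Backfill: the same run replayed days later passes on the first check
backfill_passed = barrier_reached(
    datetime.datetime(2020, 1, 5, 0, 0), run_for_9, hourly, two_hours)

print(barrier, blocked, passed, backfill_passed)
```

The backfill case passes immediately because the barrier depends only on the execution date, never on the time at which the task instance was started.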
Let's now write a simple DAG and see what it looks like:
import datetime

# Airflow 1.10-style imports
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.time_delta_sensor import TimeDeltaSensor


def print_execution_date(**context):
    execution_date = context['execution_date']
    # returns nothing when the timeshift_date task is not part of the DAG
    timeshift_date = context['ti'].xcom_pull(key=None, task_ids='timeshift_date')
    if timeshift_date:
        print('Got execution date {}'.format(timeshift_date))
    else:
        print('Got execution date {}'.format(execution_date))


guard_sensor = TimeDeltaSensor(
    task_id='guard_sensor',
    # leave 3 hours in total for late data to arrive
    # (2h + 1h since the DAG executes at the end of the schedule interval)
    delta=datetime.timedelta(hours=2),
    poke_interval=60 * 10,  # 10 minutes
    mode='reschedule'
)

printer = PythonOperator(
    task_id='printer',
    python_callable=print_execution_date,
    provide_context=True
)

guard_sensor >> printer
And in the following video you can see both approaches in action:
As you can see, we can achieve the same thing, namely reading time-partitioned data that does not correspond to the current time, in 2 different ways. The former, "old" one, runs now and looks into the past, whereas the latter stays in the time period of the processed data. Apart from the XCom storage, an important difference between them is readability. With the "old" approach you have to know the time shift duration, and you will often need to go to the DAG itself and compute on your own which data is processed. With the "new" approach this problem doesn't exist since the execution date represents the data that is processed.
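The readability difference can be illustrated with a few lines of plain Python. This is only a sketch: it assumes partitions named in the same shape as the {{ ts_nodash }} macro (`%Y%m%dT%H%M%S`) and the 3-hour shift from the XCom example, and the function names are hypothetical.

```python
import datetime

SHIFT = datetime.timedelta(hours=3)  # same shift as in generate_dates
PARTITION_FORMAT = '%Y%m%dT%H%M%S'   # assumed partition naming


def partition_with_xcom_shift(execution_date):
    """'Old' approach: the partition is the execution date minus the shift."""
    return (execution_date - SHIFT).strftime(PARTITION_FORMAT)


def partition_with_sensor(execution_date):
    """'New' approach: the partition is the execution date itself."""
    return execution_date.strftime(PARTITION_FORMAT)


execution_date = datetime.datetime(2020, 1, 1, 12, 0)
old_partition = partition_with_xcom_shift(execution_date)
new_partition = partition_with_sensor(execution_date)

print(old_partition)  # 20200101T090000
print(new_partition)  # 20200101T120000
```

With the shift, somebody looking at the 12:00 run has to know about `SHIFT` to find out that the 09:00 partition was processed. With the sensor, the run and the partition carry the same timestamp.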