Dealing with time delta in Apache Airflow

Versions: Apache Airflow 1.10.7

Often in batch processing we give the pipeline some time to catch up on late data, i.e. the pipeline for 9 o'clock will be executed only at 11. One way to do this in Airflow is to compute a time delta in the tasks, but there is a more "native" way with TimeDeltaSensor.

In the first part of this post I will show the "old" way of doing that. In the second part, I will introduce TimeDeltaSensor as a better alternative.

XCom delta

The first approach to dealing with time delta in Apache Airflow consists of computing the delta in a PythonOperator and passing it to other tasks as an XCom variable:

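    import datetime

    from airflow.operators.python_operator import PythonOperator


    def generate_dates(**context):
        # Shift the execution date back by 3 hours; the returned value is pushed as an XCom
        execution_date = context['execution_date']
        timestamped_partition_to_load = execution_date - datetime.timedelta(hours=3)
        return timestamped_partition_to_load

    # The tasks are assumed to be declared inside a DAG context (with DAG(...) as dag)
    timeshift_date = PythonOperator(
        task_id='timeshift_date',
        provide_context=True,
        python_callable=generate_dates
    )
    # print_execution_date is defined later in this post
    printer = PythonOperator(
        task_id='printer',
        python_callable=print_execution_date,
        provide_context=True
    )
    timeshift_date >> printer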
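To make the shift concrete, here is a small standalone sketch (plain Python, no Airflow needed) of what generate_dates computes. The sample execution date is made up, and the strftime patterns are my assumption of how the {{ ds }} and {{ ts_nodash }} macros are rendered. The point is that the built-in macros keep following the execution date, not the shifted one:

```python
import datetime


def shift_execution_date(execution_date, hours=3):
    """Mirror generate_dates: move the execution date back by a fixed delay."""
    return execution_date - datetime.timedelta(hours=hours)


# Hypothetical execution date, chosen only for illustration
execution_date = datetime.datetime(2020, 1, 15, 9, 0, 0)
partition_to_load = shift_execution_date(execution_date)

# Patterns assumed to mimic the {{ ds }} and {{ ts_nodash }} macros
macro_ds = execution_date.strftime('%Y-%m-%d')
macro_ts_nodash = execution_date.strftime('%Y%m%dT%H%M%S')
shifted_ts_nodash = partition_to_load.strftime('%Y%m%dT%H%M%S')

print(macro_ts_nodash)    # 20200115T090000
print(shifted_ts_nodash)  # 20200115T060000
```

Every task that needs the shifted value has to pull it from the XCom, because the macros alone still point at 09:00.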

A drawback of this approach is, of course, the overhead of computing and storing the XCom. What if we could simply take one of the available macros ({{ ds }}, {{ ts_nodash }}, ...) and use it in our time-sensitive pipelines? We can, if we use TimeDeltaSensor, which acts like a "guard" and does not let the pipeline continue until the time barrier is reached.

TimeDeltaSensor

The idea behind TimeDeltaSensor is to add some delay to the execution date. It's important to stress that the delay is anchored to the execution date (more precisely, to the end of its schedule interval), so for backfilled tasks, whose target time is already in the past, the delay won't be applied. You can see that pretty clearly in the poke method of the sensor:

        target_dttm = dag.following_schedule(context['execution_date'])
        target_dttm += self.delta
        self.log.info('Checking if the time (%s) has come', target_dttm)
        return timezone.utcnow() > target_dttm
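The same check can be reproduced outside Airflow to see why backfills pass straight through. The sketch below assumes an hourly schedule, so following_schedule is approximated by adding one hour to the execution date; time_has_come and its parameters are hypothetical names and not part of the Airflow API:

```python
import datetime

SCHEDULE_INTERVAL = datetime.timedelta(hours=1)  # assumption: hourly DAG


def time_has_come(execution_date, delta, now):
    """Simplified stand-in for TimeDeltaSensor.poke (not the Airflow API)."""
    # End of the schedule interval, i.e. following_schedule for an hourly DAG
    target_dttm = execution_date + SCHEDULE_INTERVAL
    target_dttm += delta
    return now > target_dttm


delta = datetime.timedelta(hours=2)
now = datetime.datetime(2020, 1, 15, 10, 5)

# Regular run: the 09:00-10:00 interval just ended, the barrier is 12:00, keep waiting
regular_run = time_has_come(datetime.datetime(2020, 1, 15, 9, 0), delta, now)
# Backfill of a run from a week ago: the barrier is long gone, the sensor passes at once
backfill_run = time_has_come(datetime.datetime(2020, 1, 8, 9, 0), delta, now)

print(regular_run, backfill_run)  # False True
```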

Let's now write a simple DAG and see how it looks:

    import datetime

    from airflow.operators.python_operator import PythonOperator
    from airflow.sensors.time_delta_sensor import TimeDeltaSensor


    def print_execution_date(**context):
        execution_date = context['execution_date']
        # None when the DAG has no timeshift_date task, as in the sensor-based DAG
        timeshift_date = context['ti'].xcom_pull(key=None, task_ids='timeshift_date')
        if timeshift_date:
            print('Got execution date {}'.format(timeshift_date))
        else:
            print('Got execution date {}'.format(execution_date))


    # Give late data 3 more hours to arrive: 2h of delta + 1h since the DAG
    # executes at the end of the schedule interval
    guard_sensor = TimeDeltaSensor(
        task_id='guard_sensor',
        delta=datetime.timedelta(hours=2),
        poke_interval=60*10,  # 10 minutes
        mode='reschedule'
    )
    printer = PythonOperator(
        task_id='printer',
        python_callable=print_execution_date,
        provide_context=True
    )
    guard_sensor >> printer

And in the following video you can see both approaches in action:

As you can see, we can achieve the same thing, that is, read time-partitioned data that does not correspond to the current time, in 2 different ways. The former, "old" one works in a "now in the past" mode, whereas the latter stays in the same time period. Apart from the XCom storage, an important difference between them is readability. With the "old" approach you have to know the timeshift duration, and you will often need to open the DAG itself and work out on your own which data is processed. With the "new" approach this problem doesn't exist, since the execution date represents the data that is processed.
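The difference can also be put in numbers. The following sketch assumes an hourly DAG and the delays used in this post (3 hours for the XCom shift, 2 hours for the sensor); the helper names and the sample date are made up for the example:

```python
import datetime

HOUR = datetime.timedelta(hours=1)  # assumption: hourly schedule interval


def xcom_approach(execution_date):
    """'Old' way: the run starts when its interval ends and reads an older partition."""
    starts_at = execution_date + HOUR        # end of the hourly interval
    partition = execution_date - 3 * HOUR    # shift computed in generate_dates
    return starts_at, partition


def sensor_approach(execution_date):
    """'New' way: the run waits behind the sensor and reads its own partition."""
    starts_at = execution_date + HOUR + 2 * HOUR  # following schedule + delta
    partition = execution_date
    return starts_at, partition


execution_date = datetime.datetime(2020, 1, 15, 9, 0)
for name, approach in (('xcom', xcom_approach), ('sensor', sensor_approach)):
    starts_at, partition = approach(execution_date)
    print('{}: starts at {}, loads partition {}, same as execution date: {}'.format(
        name, starts_at.time(), partition.time(), partition == execution_date))
```

Only in the sensor variant does the partition match the execution date, which is why the run can be read directly from the Airflow UI without looking up the shift.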