Dealing with time delta in Apache Airflow

In batch processing we often give the pipeline some time to catch up with late data, e.g. the pipeline for 9:00 will be executed only at 11:00. One way to do this in Airflow is to compute the time delta in the tasks, but there is a more "native" way with TimeDeltaSensor.

Bartosz Konieczny

In the first part of this post I will show the "old" way of doing that. In the next one, I will introduce TimeDeltaSensor as a better alternative.

XCom delta

The first approach to dealing with time delta in Apache Airflow consists of computing the delta in a PythonOperator and passing it to other tasks as an XCom variable:

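    import datetime

    from airflow.operators.python_operator import PythonOperator

    def generate_dates(**context):
        # Shift the execution date 3 hours back; the returned value is
        # pushed to XCom automatically.
        execution_date = context['execution_date']
        timestamped_partition_to_load = execution_date - datetime.timedelta(hours=3)
        return timestamped_partition_to_load

    timeshift_date = PythonOperator(
        task_id='timeshift_date',
        provide_context=True,
        python_callable=generate_dates
    )
    # print_execution_date is the callable defined in the DAG of the next section
    printer = PythonOperator(
        task_id='printer',
        python_callable=print_execution_date,
        provide_context=True
    )
    timeshift_date >> printer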
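
To see what value ends up in the XCom, the time shift can be run outside Airflow. The snippet below is only a sketch: the context dictionary is built by hand with a single `execution_date` key and an arbitrary date, whereas in a real run Airflow provides the full context.

```python
import datetime


def generate_dates(**context):
    # Same logic as the callable above: shift the execution date 3 hours back.
    execution_date = context['execution_date']
    return execution_date - datetime.timedelta(hours=3)


# Hypothetical context, built by hand for illustration only.
fake_context = {'execution_date': datetime.datetime(2019, 6, 1, 12, 0)}
partition_to_load = generate_dates(**fake_context)
print(partition_to_load.isoformat())  # 2019-06-01T09:00:00
```

The downstream task has to pull this value from XCom to know which partition it really processes; the execution date alone doesn't tell it.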

A drawback of this approach is, of course, the overhead of computing and storing the XCom. What if we could simply take one of the available macros ({{ ds }}, {{ ts_nodash }}, ...) and use it in our time-sensitive pipelines? We can do that with TimeDeltaSensor, which acts like a "guard" and does not let the pipeline continue until the time barrier is reached.

TimeDeltaSensor

The idea behind TimeDeltaSensor is to add some delay relative to the execution date. It's important that the delay is computed from the execution date and not from the moment the task starts: for backfilled runs the target time is already in the past, so the sensor succeeds immediately and no delay is applied. You can see that pretty clearly in the poke method of the sensor:

        target_dttm = dag.following_schedule(context['execution_date'])
        target_dttm += self.delta
        self.log.info('Checking if the time (%s) has come', target_dttm)
        return timezone.utcnow() > target_dttm
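
The same check can be reproduced without Airflow to see why backfilled runs are not delayed. This is a minimal sketch, not the sensor's code: `time_has_come` is a hypothetical helper, the schedule is assumed to be a fixed one-hour interval (so the end of the interval is simply the execution date plus one hour), and "now" is passed in explicitly instead of being read from the clock.

```python
import datetime


def time_has_come(execution_date, schedule_interval, delta, now):
    # End of the schedule interval; stands in for dag.following_schedule()
    # under the assumption of a fixed-interval schedule.
    target_dttm = execution_date + schedule_interval
    target_dttm += delta
    return now > target_dttm


hour = datetime.timedelta(hours=1)
delta = datetime.timedelta(hours=2)
now = datetime.datetime(2019, 6, 1, 11, 30)

# Current run: the 09:00 interval ends at 10:00, so the barrier is 12:00.
current_run_ready = time_has_come(datetime.datetime(2019, 6, 1, 9, 0), hour, delta, now)
# Backfilled run from the day before: the barrier passed long ago.
backfill_run_ready = time_has_come(datetime.datetime(2019, 5, 31, 9, 0), hour, delta, now)
print(current_run_ready, backfill_run_ready)  # False True
```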

Let's now write a simple DAG and see what it looks like:

def print_execution_date(**context):
    execution_date = context['execution_date']
    timeshift_date = context['ti'].xcom_pull(key=None, task_ids='timeshift_date')
    if timeshift_date:
        print('Got execution date {}'.format(timeshift_date))
    else:
        print('Got execution date {}'.format(execution_date))


    guard_sensor = TimeDeltaSensor(
        task_id='guard_sensor',
        delta=datetime.timedelta(hours=2),  # allow 3 hours in total for late data to arrive (2h + 1h, since the DAG executes at the end of the schedule interval)
        poke_interval=60*10,  # 10 minutes
        mode='reschedule'
    )
    printer = PythonOperator(
        task_id='printer',
        python_callable=print_execution_date,
        provide_context=True
    )
    guard_sensor >> printer

And in the following video you can see both approaches in action:

As you can see, we can achieve the same thing, namely reading time-partitioned data that doesn't correspond to the current time, in 2 different ways. The former, "old" one runs now and looks back into the past, whereas the latter stays in the same time period. Apart from the XCom storage, an important difference between them is readability. With the "old" approach you have to know the time shift duration, and you will often need to open the DAG itself and work out on your own which data is processed. With the "new" approach this problem doesn't exist, since the execution date represents the data that is processed.
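
The readability difference can be illustrated with a bit of date arithmetic. The sketch below assumes an hourly schedule and reuses the 3-hour shift and the 2-hour sensor delta from the examples; `xcom_approach` and `sensor_approach` are hypothetical helpers, and the start times are the earliest possible ones (the real start also depends on the scheduler and on the poke interval).

```python
import datetime

SCHEDULE_INTERVAL = datetime.timedelta(hours=1)  # assumption: hourly DAG


def xcom_approach(execution_date):
    # Starts as soon as the interval closes and loads a partition 3 hours back.
    earliest_start = execution_date + SCHEDULE_INTERVAL
    partition = execution_date - datetime.timedelta(hours=3)
    return earliest_start, partition


def sensor_approach(execution_date):
    # Waits 2 more hours after the interval closes, then loads the partition
    # matching the execution date.
    earliest_start = execution_date + SCHEDULE_INTERVAL + datetime.timedelta(hours=2)
    partition = execution_date
    return earliest_start, partition


execution_date = datetime.datetime(2019, 6, 1, 9, 0)
xcom_start, xcom_partition = xcom_approach(execution_date)
sensor_start, sensor_partition = sensor_approach(execution_date)
print(xcom_start.time(), xcom_partition.time())      # 10:00:00 06:00:00
print(sensor_start.time(), sensor_partition.time())  # 12:00:00 09:00:00
```

In the sensor variant the partition and the execution date are the same value, so a run can be matched with its data just by looking at the run's date.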
