Apache Airflow gotchas

Versions: Apache Airflow 1.10.3

From time to time I try to help other people on StackOverflow, and one of the tags I follow is Apache Airflow. In this blog post I'll show you some of the problems I've seen there over the last few months.

Each of the short sections below describes one problem that you may run into as a new user of Apache Airflow. Every section contains an explanation of the issue and a video to illustrate it.

Start date is now

The first gotcha is about the start_date attribute of the DAG. As its name indicates, this property defines when the DAG starts, and it can be in the past or in the future, depending on your use case. However, it cannot be set to the current time (for instance with datetime.now()), because your DAG won't run and you will get an error like "The execution date is 2020-01-23T05:18:41.189291+00:00 but this is before the task's start date 2020-01-23T05:19:54.986488+00:00". I think the message is pretty meaningful. Your start_date is set to now, so it changes every time it is evaluated. The only way to schedule such a DAG is to trigger it manually. But even then only the DAG run is created; its tasks are never executed. The start_date keeps moving forward and the scheduler compares it to an execution_date that is already in the past, which results in the error quoted above. The check comes from this dependency:

class ExecDateAfterStartDateDep(BaseTIDep):
    NAME = "Execution Date"
    IGNOREABLE = True

    @provide_session
    def _get_dep_statuses(self, ti, session, dep_context):
        if ti.task.start_date and ti.execution_date < ti.task.start_date:
            yield self._failing_status(
                reason="The execution date is {0} but this is before the task's start "
                "date {1}.".format(
                    ti.execution_date.isoformat(),
                    ti.task.start_date.isoformat()))
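
To make the failure mode concrete, here is a minimal sketch in plain Python that mirrors only the comparison performed by the dependency above. The helper execution_date_dep_passes, the fixed 73 second delay and the sample dates are assumptions made for illustration; they are not part of Airflow.

```python
from datetime import datetime, timedelta


def execution_date_dep_passes(execution_date, start_date):
    """Mirror the comparison from ExecDateAfterStartDateDep (simplified, hypothetical helper)."""
    # The real dependency fails when the execution date is before the start date.
    return not (start_date and execution_date < start_date)


# A manual trigger records its execution date once, at trigger time.
execution_date = datetime(2020, 1, 23, 5, 18, 41)

# With a start date set to "now", every evaluation of the DAG file yields a later value.
# Here the re-evaluation is simulated with a fixed 73 second delay (an assumption).
moving_start_date = execution_date + timedelta(seconds=73)

# A static start date in the past never overtakes the execution date.
static_start_date = datetime(2020, 1, 1)

moving_ok = execution_date_dep_passes(execution_date, moving_start_date)
static_ok = execution_date_dep_passes(execution_date, static_start_date)
print(moving_ok, static_ok)  # False True
```

The takeaway is that a fixed date in the past keeps the comparison stable, whereas a value recomputed at every evaluation always ends up later than the already recorded execution date.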

Let's see this error in this short video:

Changing start date for already running DAG

The second issue related to the start date attribute is the anti-pattern of "I deploy my DAG, I let it run, and tomorrow I change its start date to a date further in the past". If you do that, your new start date won't be taken into account. Why? Simply because Apache Airflow resolves the next execution date with the following algorithm:

- if the DAG has never been scheduled, the first execution date is computed from the start date (the earliest start date among the tasks)
- otherwise, the next execution date is computed from the last scheduled run and the schedule interval

So in our anti-pattern, since the DAG has already been executed, the scheduler uses the second bullet point to figure out the new execution date, and the start date is not consulted at all. The following snippet from airflow.jobs.scheduler_job.SchedulerJob#create_dag_run proves that:

if not last_scheduled_run:
    # First run
    task_start_dates = [t.start_date for t in dag.tasks]
    if task_start_dates:
        next_run_date = dag.normalize_schedule(min(task_start_dates))
        self.log.debug(
            "Next run date based on tasks %s",
            next_run_date
        )
else:
    next_run_date = dag.following_schedule(last_scheduled_run)


# airflow.models.dag.DAG#following_schedule
def following_schedule(self, dttm):
    # ...
    if isinstance(self._schedule_interval, six.string_types):
        # cron expression given as a string; omitted here for simplicity,
        # let's focus only on the timedelta-based schedule below
        pass
    elif self._schedule_interval is not None:
        return dttm + self._schedule_interval
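
The two branches can be replayed outside of Airflow. The sketch below is a simplified stand-in and not the scheduler's code: the function next_run_date and the sample dates are hypothetical, only timedelta-based schedules are covered, and the normalization applied to the first run is skipped.

```python
from datetime import datetime, timedelta


def next_run_date(last_scheduled_run, start_date, schedule_interval):
    """Simplified version of the branch from create_dag_run (hypothetical helper)."""
    if last_scheduled_run is None:
        # First run: the start date is the only available reference point.
        return start_date
    # Later runs: the start date is not consulted any more.
    return last_scheduled_run + schedule_interval


hourly = timedelta(hours=1)
last_run = datetime(2020, 1, 23, 5, 0)

# Original start date versus a start date moved further into the past.
with_original_start = next_run_date(last_run, datetime(2020, 1, 20), hourly)
with_earlier_start = next_run_date(last_run, datetime(2019, 12, 1), hourly)

print(with_original_start == with_earlier_start)  # True
print(with_earlier_start)  # 2020-01-23 06:00:00
```

Both calls return the same date because, once a previous run exists, only that run and the interval matter.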

As previously, let's see that in this short demo:

TriggerRule.ALL_DONE and incorrect DAG run result

Let's imagine that we're running a pipeline with cloud resources and in the last step we want to clean up everything, regardless of the outcome of the previous steps. One way to do that is to set the trigger_rule attribute to TriggerRule.ALL_DONE. However, it's a little bit dangerous because, if any parent task fails, the last task will still execute, which is what we want, but this last successful execution will set the DAG run status to SUCCESS! Let's see how that is possible with this code snippet:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='bad_dag_status',
    start_date=datetime(2020, 1, 3, 0, 0),
    schedule_interval='@hourly'
)

with dag:
    create_resource1 = DummyOperator(task_id='create_resource1')
    create_resource2 = DummyOperator(task_id='create_resource2')
    use_resources = DummyOperator(task_id='use_resources')
    # run dag & mark use_resources as failed
    clean_resources = DummyOperator(task_id='clean_resources', trigger_rule=TriggerRule.ALL_DONE)

    create_resource1 >> create_resource2 >> use_resources >> clean_resources

Why does the DAG run behave this way? How can we fix it? To find the answer, let's take a look at the DagRun class and its update_state method, called by the SchedulerJob#_process_task_instances function:

leaf_tis = [ti for ti in tis if ti.task_id in {t.task_id for t in dag.leaves}]

# if all roots finished and at least one failed, the run failed
if not unfinished_tasks and any(
    leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti in leaf_tis
):
    # ...
    self.set_state(State.FAILED)
# if all leafs succeeded and no unfinished tasks, the run succeeded
elif not unfinished_tasks and all(
    leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in leaf_tis
):
    # ...
    self.set_state(State.SUCCESS)
# if *all tasks* are deadlocked, the run failed
elif (unfinished_tasks and none_depends_on_past and
      none_task_concurrency and no_dependencies_met):
    # ...
    self.set_state(State.FAILED)
# finally, if the roots aren't done, the dag is still running
else:
    self.set_state(State.RUNNING)

If you look closely, you can see that the DAG run status depends on the last tasks, represented in the snippet by the leaf_tis variable. From the comments, and more exactly from "# if all leafs succeeded and no unfinished tasks, the run succeeded", you can deduce why the DAG state is set to SUCCESS: clean_resources is the only leaf of the DAG and it succeeded.

So, an easy way to fix this is to add a dummy task at the end of the DAG, next to the cleanup task, that keeps the default trigger rule. When use_resources fails, this new leaf task ends up in the UPSTREAM_FAILED state, and so the DAG run is marked as FAILED. The new DAG looks like this:

with dag:
    create_resource1 = DummyOperator(task_id='create_resource1')
    create_resource2 = DummyOperator(task_id='create_resource2')
    use_resources = DummyOperator(task_id='use_resources')
    # run dag & mark use_resources as failed
    clean_resources = DummyOperator(task_id='clean_resources', trigger_rule=TriggerRule.ALL_DONE)
    dag_status_setter = DummyOperator(task_id='dag_status_setter')

    create_resource1 >> create_resource2 >> use_resources >> [clean_resources, dag_status_setter]
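
The difference between the two DAG shapes can be checked with a small simulation of the leaf-based rules. This is only a sketch: resolve_run_state is a hypothetical helper, states are plain strings, and it assumes that no unfinished tasks remain, so the deadlock branch of update_state is left out.

```python
FAILED_STATES = {"failed", "upstream_failed"}
SUCCESS_STATES = {"success", "skipped"}


def resolve_run_state(leaf_states):
    """Simplified update_state: assumes no unfinished tasks remain (hypothetical helper)."""
    if any(state in FAILED_STATES for state in leaf_states):
        return "failed"
    if all(state in SUCCESS_STATES for state in leaf_states):
        return "success"
    return "running"


# First DAG: clean_resources is the only leaf and runs despite the failure upstream.
single_leaf = resolve_run_state(["success"])

# Fixed DAG: dag_status_setter is a second leaf with the default trigger rule,
# so the failure of use_resources marks it as upstream_failed.
two_leaves = resolve_run_state(["success", "upstream_failed"])

print(single_leaf, two_leaves)  # success failed
```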

Below you can find a quick demonstration of the issue and its possible fix:

Adding a task for already running DAG

Another interesting gotcha appears when you have a running DAG and you want to add a new task somewhere in the middle of the processing. Sometimes it can lead to unexpected (from your point of view) behavior. Let's take an example. Here, I have 4 tasks executed sequentially:

dag = DAG(
    dag_id='new_task_running_dag',
    start_date=datetime(2020, 1, 3, 0, 0),
    schedule_interval='@hourly'
)

with dag:
    task_1 = DummyOperator(task_id='task_1')
    task_2 = DummyOperator(task_id='task_2')
    task_3 = DummyOperator(task_id='task_3')
    task_4 = DummyOperator(task_id='task_4')

    task_1 >> task_2 >> task_3 >> task_4

If you later add a task between task_2 and task_3, it won't be executed for the DAG runs that have already finished. You can see this in the following video:

Execution date at the end of the schedule

This point is also a little bit misleading for those who are starting to work with Apache Airflow. The scheduler triggers a new DAG run at the end of the schedule interval, not at its beginning. Even if you set the schedule interval to one specific day of the month, the scheduler will pick this DAG up for execution only when the next month's date is reached. For instance, if you want to execute your processing on the 7th day of every month, the run for 07/01/2020 will start only on 07/02/2020, one month later! Below you can see the video showing that:
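
The arithmetic behind the monthly example can be sketched in a few lines of plain Python. The helpers next_month_same_day and earliest_trigger_time are hypothetical and only handle a "7th day of every month" style schedule; Airflow itself resolves cron expressions differently.

```python
from datetime import datetime


def next_month_same_day(dttm):
    """Return the same day and time in the following month (fine for day 7)."""
    year = dttm.year + dttm.month // 12
    month = dttm.month % 12 + 1
    return dttm.replace(year=year, month=month)


def earliest_trigger_time(execution_date):
    """A run covers the period starting at execution_date and starts once that period ends."""
    return next_month_same_day(execution_date)


execution_date = datetime(2020, 1, 7)
trigger_time = earliest_trigger_time(execution_date)
print(trigger_time)  # 2020-02-07 00:00:00
```

In other words, the execution date identifies the beginning of the processed period, while the run itself happens once that period is over.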

XCom overuse

Another gotcha I've observed is related to XCom variables. An XCom is a way to exchange small chunks of dynamically generated data between tasks. "Dynamic" here means that the data is generated within the context of a DAG execution, for example when you use the current execution time to figure out the name of your time-series table or the location of time-partitioned data. Such a computed XCom is available to all subsequent tasks within the scope of the current execution.

The problem with XCom is that it's sometimes used to exchange really big volumes of data. XCom values are stored in Airflow's metadata store, and keeping a lot of data there may cause performance issues.

Another point related to XCom, less obvious than the previous one, is that XCom is sometimes used everywhere, even where it's not needed. Let's suppose that you have one DAG factory method which generates different outputs for 2 different data processes. In that case, you don't need to pass the variables through XCom if they don't contain information related to the DAG execution. Below you can find an illustration of this XCom anti-pattern:
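
One common way to keep XCom small is to exchange a reference to the data instead of the data itself. The sketch below shows the idea with two plain functions that could serve as python_callable values. The function names, the JSON format and the temporary directory are assumptions made for illustration; a real deployment would need a location shared by all workers.

```python
import json
import os
import tempfile


def produce_dataset():
    """Write the large payload to storage and return only its location."""
    rows = [{"id": i, "value": i * 2} for i in range(10000)]
    # Hypothetical location: a real deployment would need storage shared by all workers.
    path = os.path.join(tempfile.mkdtemp(), "dataset.json")
    with open(path, "w") as output_file:
        json.dump(rows, output_file)
    # With PythonOperator, the returned value is what ends up in XCom.
    return path


def consume_dataset(path):
    """Load the payload from the location received through XCom."""
    with open(path) as input_file:
        return len(json.load(input_file))


dataset_path = produce_dataset()
row_count = consume_dataset(dataset_path)
print(row_count)  # 10000
```

Only the short path string would travel through the metadata store, while the 10000 rows stay in the file.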

dag = DAG(
    dag_id='xcom_overuse',
    start_date=datetime(2020, 1, 3, 0, 0),
    schedule_interval='@hourly'
)

with dag:
    def generate_alias():
        return 'test_alias'

    def print_with_alias(**context):
        alias = context['ti'].xcom_pull(task_ids=task_1.task_id, key=None)
        return 'data_{}'.format(alias)

    task_1 = PythonOperator(task_id='task_1', python_callable=generate_alias)
    task_2 = PythonOperator(task_id='task_2', provide_context=True, python_callable=print_with_alias)
    task_3 = PythonOperator(task_id='task_3', provide_context=True, python_callable=print_with_alias)

    task_1 >> task_2 >> task_3

And here you can find a more correct version using params:

with dag:
    def print_with_alias(**context):
        return 'data_{}'.format(context['params']['alias'])

    alias = 'test_alias'
    task_2 = PythonOperator(
        task_id='task_2',
        provide_context=True,
        python_callable=print_with_alias,
        params={'alias': alias}
    )
    task_3 = PythonOperator(
        task_id='task_3',
        provide_context=True,
        python_callable=print_with_alias,
        params={'alias': alias}
    )

    task_2 >> task_3

In this example I used PythonOperator, but there are many others, like PostgresOperator, that accept static parameters which can be shared among different tasks of the project.

Every new tool brings its own traps. A lot of them in Apache Airflow are related to dates. A schedule interval executed at its end, even for rarely executed pipelines, a start date that cannot be changed for an already running DAG, or the misuse of XCom are only a few that I have met. If you have your own, feel free to comment. I'm really curious about what was difficult for you in your first steps with Apache Airflow!

