How to branch the execution depending on previous results?

Apache Airflow is a very flexible orchestration framework. You can execute operations in conditional branches or, as you will see below, depending on the results of previous tasks.

The execution of a given task can be conditioned on the results of previous tasks with the trigger_rule attribute. It can take one of the following values:

  • all_success
  • all_failed
  • all_done
  • one_failed
  • one_success
  • none_failed
  • dummy
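The semantics of these rules can be sketched in plain Python. This is an illustration of the documented behavior, not Airflow code: the function name and the string states are mine, and the list of states is assumed to contain only terminal states (so all_done reduces to "everything upstream has finished"):

```python
# Plain-Python illustration (not Airflow code) of the documented
# trigger-rule semantics: given the terminal states of a task's
# upstream tasks, decide whether the task should fire.

def should_fire(rule, upstream_states):
    """upstream_states: list of 'success', 'failed' or 'skipped'."""
    if rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if rule == "all_failed":
        return all(s == "failed" for s in upstream_states)
    if rule == "all_done":
        # Fires once every upstream task has finished, whatever its state;
        # here we assume the list already contains only terminal states.
        return True
    if rule == "one_failed":
        return any(s == "failed" for s in upstream_states)
    if rule == "one_success":
        return any(s == "success" for s in upstream_states)
    if rule == "none_failed":
        # Succeeded or skipped upstream tasks are both acceptable.
        return all(s in ("success", "skipped") for s in upstream_states)
    if rule == "dummy":
        # No dependency on upstream states at all.
        return True
    raise ValueError(f"unknown trigger rule: {rule}")


states = ["success", "failed", "success"]
print(should_fire("all_success", states))  # False: one upstream task failed
print(should_fire("one_failed", states))   # True
print(should_fire("all_done", states))     # True: everything has finished
```

This also makes the difference between all_success and none_failed visible: a skipped upstream task blocks the former but not the latter.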
You will find the exact definition of each of them in the documentation: https://airflow.apache.org/concepts.html#trigger-rules

Here I will focus on an example. Let's suppose that your DAG starts a cloud resource, does some computation, and stops the resource at the end, regardless of the final result:
computation_executor = ...

from airflow.utils.trigger_rule import TriggerRule

terminate_resource_operator_success = TerminateCloudResourceOperator(
    task_id="terminate_resource_operator_success",
    resource_id="{{ task_instance.xcom_pull(task_ids='create_resource', key='return_value') }}",
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
)

terminate_resource_operator_failure = TerminateCloudResourceOperator(
    task_id="terminate_resource_operator_failure",
    resource_id="{{ task_instance.xcom_pull(task_ids='create_resource', key='return_value') }}",
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
)

... computation_executor >> terminate_resource_operator_failure
... computation_executor >> terminate_resource_operator_success

If you don't define the terminate_resource_operator_failure task, the DAG will terminate when computation_executor fails without stopping the created resource. As an alternative to the above solution, you can also use a single termination task with TriggerRule.ALL_DONE as its trigger rule.
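The alternative could be sketched like this, reusing the same hypothetical TerminateCloudResourceOperator as above (a DAG fragment, not a complete file: create_resource, computation_executor and dag are assumed to be defined elsewhere):

```python
from airflow.utils.trigger_rule import TriggerRule

# One termination task with ALL_DONE runs once computation_executor has
# finished, whether it succeeded or failed, so a single task covers both
# the success and the failure path.
terminate_resource = TerminateCloudResourceOperator(
    task_id="terminate_resource",
    resource_id="{{ task_instance.xcom_pull(task_ids='create_resource', key='return_value') }}",
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

computation_executor >> terminate_resource
```

The trade-off is that with ALL_DONE you lose the ability to react differently to success and failure, which the two-task version above gives you.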