Apache Airflow is a very flexible orchestration framework. You can execute operations conditionally, depending either on branching logic or, as you will see below, on the results of previous tasks.
The execution of a given task can be conditioned on the results of previous tasks with the trigger_rule attribute. It can take one of the following values:

- all_success (the default) - all upstream tasks succeeded
- all_failed - all upstream tasks failed
- all_done - all upstream tasks finished, whatever their final state
- one_success - at least one upstream task succeeded
- one_failed - at least one upstream task failed
- none_failed - no upstream task failed (they succeeded or were skipped)
- none_skipped - no upstream task was skipped
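To illustrate the semantics of the rules used below, here is a plain-Python sketch (not Airflow's internal implementation) that models each trigger rule as a predicate over the states of the upstream tasks:

```python
# Plain-Python sketch of trigger rule semantics. This is only an
# illustration of the predicates; Airflow evaluates them internally.

def all_success(states):
    # Fire only when every upstream task succeeded (the default rule).
    return all(s == "success" for s in states)

def one_failed(states):
    # Fire as soon as at least one upstream task failed.
    return any(s == "failed" for s in states)

def all_done(states):
    # Fire once every upstream task finished, whatever its state.
    return all(s in ("success", "failed", "skipped") for s in states)

upstream = ["success", "failed"]
print(all_success(upstream))  # False
print(one_failed(upstream))   # True
print(all_done(upstream))     # True
```

With one succeeded and one failed upstream task, all_success rejects, while one_failed and all_done both fire, which is exactly the behavior exploited by the termination operators below.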
computation_executor = ...
terminate_resource_operator_success = TerminateCloudResourceOperator(
    task_id='terminate_resource_operator_success',
    resource_id="{{ task_instance.xcom_pull('create_resource', key='return_value') }}",
    dag=dag,
    trigger_rule=TriggerRule.ALL_SUCCESS,
)
terminate_resource_operator_failure = TerminateCloudResourceOperator(
    task_id='terminate_resource_operator_failure',
    resource_id="{{ task_instance.xcom_pull('create_resource', key='return_value') }}",
    dag=dag,
    trigger_rule=TriggerRule.ONE_FAILED,
)
...
computation_executor >> terminate_resource_operator_failure
...
computation_executor >> terminate_resource_operator_success
If you don't define the terminate_resource_operator_failure operator, the DAG will terminate when computation_executor fails without stopping the created resource. As an alternative to the above solution, you can also use a single termination operator with the TriggerRule.ALL_DONE trigger.
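That alternative could look like the following sketch. TerminateCloudResourceOperator and the create_resource task are the illustrative names from the example above, not operators shipped with Airflow:

```python
# Sketch of the single-operator alternative using ALL_DONE.
# TerminateCloudResourceOperator and create_resource are the
# hypothetical names used in the example above.
from airflow.utils.trigger_rule import TriggerRule

terminate_resource_operator = TerminateCloudResourceOperator(
    task_id='terminate_resource_operator',
    resource_id="{{ task_instance.xcom_pull('create_resource', key='return_value') }}",
    dag=dag,
    # ALL_DONE fires once computation_executor has finished, whether it
    # succeeded or failed, so one terminator covers both outcomes.
    trigger_rule=TriggerRule.ALL_DONE,
)

computation_executor >> terminate_resource_operator
```

The trade-off is that a single ALL_DONE task cannot distinguish the two outcomes, so it fits when the cleanup logic is identical for success and failure.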