Managing task dependencies - data or triggering?
In this blog post I will consider these 2 options and try to see the use context of everyone. And I will start with the data sensor.
The idea behind data sensors is to waiting for a specific data to be created. For instance, you can use airflow.sensors.s3_key_sensor.S3KeySensor to check whether an object is created on S3. It can be particularly useful for Apache Spark pipelines which, at the end of a successful processing, create a file called _SUCCESS. Now, if you want to start your other pipeline directly after the first one, you can use one of available sensors (e.g. S3KeySensor) and start to process as soon as this _SUCCESS is created. Below you can find an example with FileSensor:
file_sensor_dag = DAG( dag_id='data_sensor', start_date=datetime(2020, 1, 24, 0, 0), schedule_interval='* * * * *' ) file_sensor = FileSensor( task_id='minute_file_sensor', dag=file_sensor_dag, filepath="/tmp/test_file.txt", retries=100, retry_delay=timedelta(seconds=30), mode='reschedule' ) task_1 = DummyOperator(task_id='task_1', dag=file_sensor_dag) file_sensor >> task_1
Data sensor brings this nice possibility to decouple DAGs. Today you can generate the data needed by DAGA with DAGB but tomorrow you can do it with DAGC, or even export the generation logic somewhere else, without any problem.
However, data-based sensors aren't easy to implement for everything. Let's imagine that you write the data into a relational database. How can you estimate the completeness of your writing? You will probably need to either implement your sensor, or extend the generation by the logic managing the listened data part, for instance by generating an "_SUCCESS"-like file in your own.
Data-sensors can also be hard to implement for the case of continuously arriving data. Let's imagine that you synchronize your Apache Kafka topic to S3 and want to start the processing once all data for a given hour is written. FYI, it's also hard, and maybe even harder, to do with DAG sensor. An idea to handle that could be a sensor that checks the time of the last written object for the hour key and compares it with some allowed latency. If the difference goes far beyond the latency, it means that the processing can start. Otherwise, it has to wait.
The second category of sensors are DAG sensors and in Apache Airflow they can be implemented with airflow.sensors.external_task_sensor.ExternalTaskSensor. The idea here is to use the specific execution of a task as the trigger for our DAG's execution. Below you can find an example:
dag_source = DAG( dag_id='dag_sensor_source', start_date=datetime(2020, 1, 24, 0, 0), schedule_interval='* * * * *' ) task_1 = DummyOperator(task_id='task_1', dag=dag_source) dag_target = DAG( dag_id='dag_sensor_target', start_date=datetime(2020, 1, 24, 0, 0), schedule_interval='* * * * *' ) task_sensor = ExternalTaskSensor( dag=dag_target, task_id='dag_sensor_source_sensor', retries=100, retry_delay=timedelta(seconds=30), mode='reschedule', external_dag_id='dag_sensor_source', external_task_id='task_1' ) task_1 = DummyOperator(task_id='task_1', dag=dag_target) task_sensor >> task_1
When to use a DAG sensor? If you want to enforce the dependencies and control that a given DAG will always execute after another, ExternalTaskSensor seems to be a better option than any data sensor. By using it you will enforce yourself today and yourself from the future to think like "I have to move this DAG but wait, it's used here as data provider". In other words, it simplifies - if you don't have any efficient data lineage tool - the vision of the data producers and consumers in your ETL.
However, you can also have situations where having such enforced constraint because it will require much more effort to evolve things.
As you can see then, there is not a single YES/NO answer to the question. The right answer is "IT DEPENDS". It depends on your use case, what is your goal, a highly or loosely coupled system? Is your data easy to listen to? Depending on your answers, you will probably choose one option or another.