How do you orchestrate your data pipelines in the cloud? Often, you can use managed open source tools like Cloud Composer on GCP or Amazon Managed Workflows for Apache Airflow on AWS. Sometimes, though, you will need to use a cloud-native service, as on Azure with its Data Factory orchestrator. Is it complicated to create Data Factory pipelines with Apache Airflow knowledge? Let's find out.
In this blog post, I will focus on mapping Azure Data Factory control flows to Apache Airflow components.
The first Data Factory component mapped to an Apache Airflow operator is the conditional activity. By this term, I mean 2 things. The first is the If Condition activity, which is nothing more than an if-else expression implemented as a Data Factory component. To use it, you define a condition that evaluates to true or false. For each of these outcomes, you define one or more activities that start after the evaluation step.
But what if you need a router that dispatches the workflow to the appropriate subgraph depending on the condition? In that case, instead of the If Condition, you should use the Switch activity. Its construction is similar to the If Condition's: you define an expression and the cases that should match its outcome. The case matching the outcome starts its own activities. To sum up, you will find a high-level picture of them here:
What about Apache Airflow? Both Data Factory activities can be implemented with a BranchPythonOperator, whose Python callable can evaluate any number of conditions and returns the task_id (or list of task_ids) of the branch to follow. To avoid noise and interruptions, I will put the demo code at the end of the blog post.
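As a quick preview of the branching part, here is a minimal sketch of plain Python functions that could serve as the python_callable of a BranchPythonOperator: one for an If Condition-like choice and one for a Switch-like router. The task IDs, the parameters, and the default branch are all hypothetical; in a real DAG, the inputs would typically come from op_kwargs, XCom, or the DAG run configuration.

```python
from typing import Mapping


def choose_if_branch(row_count: int) -> str:
    """If Condition equivalent: one boolean test, two possible branches."""
    # The returned string is the task_id of the downstream task to run.
    return "process_data" if row_count > 0 else "notify_empty_input"


# Switch equivalent: map each expected value of the expression to a task_id.
# These task IDs are hypothetical examples.
SWITCH_CASES: Mapping[str, str] = {
    "csv": "load_csv",
    "json": "load_json",
    "parquet": "load_parquet",
}


def choose_switch_branch(file_format: str) -> str:
    """Switch equivalent: route to the task matching the expression's outcome."""
    # Fall back to a (hypothetical) default branch when no case matches.
    return SWITCH_CASES.get(file_format.lower(), "handle_unknown_format")
```

In a DAG, each callable would be wrapped in a BranchPythonOperator (for example with op_kwargs={"row_count": ...}) whose direct downstream tasks are the candidate branches; Airflow runs the returned task and skips the other direct downstream ones.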
Apart from the conditionals, Data Factory also has an activity that filters the received input. It's called the Filter activity and, as with the 2 previous ones, you define an expression that is evaluated against each received item. These items can be hardcoded or come from the output of another activity.
What about an equivalent in Apache Airflow? I didn't find a dedicated filtering operator, but you can implement the logic in various ways. The most primitive one is a PythonOperator returning the filtered items as an XCom, as long as there are not many of them, since XComs are stored in the Airflow metadata database and aren't meant for large payloads. Alternatively, you could do something more elaborate and implement the filtering logic as an internal detail of a custom operator.
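A minimal sketch of the PythonOperator variant, assuming small, JSON-serializable items: filter_items plays the role of the Filter activity's condition, and filter_non_empty_files is a hypothetical callable with hardcoded input. Its return value is what a PythonOperator would push to XCom under the return_value key.

```python
from typing import Any, Callable, Dict, Iterable, List

Item = Dict[str, Any]


def filter_items(items: Iterable[Item], condition: Callable[[Item], bool]) -> List[Item]:
    """Filter activity equivalent: keep only the items matching the condition."""
    return [item for item in items if condition(item)]


def filter_non_empty_files(**context: Any) -> List[Item]:
    """Hypothetical PythonOperator callable; the returned list ends up in XCom."""
    # Hardcoded input for illustration; in a DAG, it could instead be pulled
    # from an upstream task with context["ti"].xcom_pull(task_ids=...).
    files = [
        {"name": "a.csv", "size_mb": 12},
        {"name": "b.csv", "size_mb": 0},
        {"name": "c.csv", "size_mb": 250},
    ]
    return filter_items(files, lambda file: file["size_mb"] > 0)
```

Keeping the condition as a separate argument makes the same helper reusable inside a custom operator, which is the second option mentioned above.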
The 3rd activity is slightly more complicated to express in Apache Airflow. It's called the Until activity and its main goal is to execute its inner activities in a loop until its condition evaluates to true or the timeout expires.
When it comes to "waiting" in Apache Airflow, we can often use a sensor. A sensor verifies whether a specific condition evaluates to true, exactly like the Until activity. If it doesn't, the sensor checks again after the poke interval. If it doesn't get a positive outcome within the specified timeout, it fails.
In a real-world scenario, it could look like:
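A minimal sketch of such a scenario, written in plain Python so that it runs without Airflow. wait_until is a hypothetical, simplified model of the loop a sensor runs in poke mode (check, sleep for poke_interval, and fail once timeout is exceeded), and success_marker_exists is a hypothetical condition waiting for an upstream job to write a _SUCCESS file. In a DAG, that condition would rather be the python_callable of a PythonSensor, or the poke method of a custom sensor.

```python
import os
import time
from typing import Callable


class SensorTimeout(Exception):
    """Raised when the condition is still false once the timeout is exceeded."""


def wait_until(condition: Callable[[], bool], poke_interval: float, timeout: float) -> int:
    """Simplified model of a sensor in poke mode; returns the number of pokes."""
    started = time.monotonic()
    pokes = 1
    # While the condition is false: fail past the timeout, otherwise wait and retry.
    while not condition():
        if time.monotonic() - started > timeout:
            raise SensorTimeout(f"condition still false after {pokes} pokes")
        time.sleep(poke_interval)
        pokes += 1
    return pokes


def success_marker_exists(path: str) -> bool:
    # Hypothetical condition: the upstream job writes a _SUCCESS file when it's done.
    return os.path.exists(os.path.join(path, "_SUCCESS"))
```

For example, PythonSensor(task_id="wait_for_output", python_callable=success_marker_exists, op_kwargs={"path": "/data/output"}, poke_interval=60, timeout=3600) would express the same wait declaratively; the task ID and path are only illustrations.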
The last activity presented before the demo is the Foreach activity. I wondered for a while whether there is an equivalent in Apache Airflow. In Data Factory, a Foreach activity takes a list of elements as input and, like a for loop, runs its inner activities for each of them.
At first glance, we can implement the same thing in Apache Airflow. After all, we can define a DAG with the help of a for loop. The difference with the Foreach activity is that this loop runs when the DAG file is parsed, so it generates a static execution graph. In Data Factory, by contrast, the flow depends on the input parameters, so the iterations can be determined dynamically by a previous task.
I found a lot of interesting links in the answers to the "Proper way to create dynamic workflows in Airflow" question on StackOverflow, but none of them seems to do exactly what the Foreach activity does. Even though some of them look like valid solutions to the problem, they're rather tricky and not natively supported by the framework.
However, if you look at the problem from a different angle, you will see that, just like the filtering, the foreach loop can be implemented as a separate custom operator or as part of any other operator. Therefore, instead of creating x tasks dynamically, we'll take the input for these tasks and pass it to the data processing part, like an Apache Spark job. That's the approach I will present in the demo.
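A minimal sketch of that idea, assuming the list of elements is produced by an upstream task (for example the filtering one) and pulled from XCom: process_each iterates inside a single task, while build_spark_job_args shows the alternative of handing the whole list to the processing layer. Both functions and the --partitions argument are hypothetical; --partitions is a job parameter made up for illustration, not a Spark option.

```python
import json
from typing import Any, Callable, Dict, List


def process_each(items: List[str], process: Callable[[str], Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Foreach equivalent inside one task: iterate over an input known only at runtime."""
    # The number of iterations depends on the input, not on the DAG file,
    # so the graph stays static (one task) while the amount of work is dynamic.
    return [process(item) for item in items]


def build_spark_job_args(partitions: List[str]) -> List[str]:
    """Alternative: pass the whole list to a single processing job that loops itself."""
    # "--partitions" is a hypothetical argument the Spark application would parse.
    return ["--partitions", json.dumps(partitions)]
```

The list returned by build_spark_job_args could, for example, be passed as the application_args of a SparkSubmitOperator, so that a single job iterates over all partitions and the DAG keeps the same shape whatever the input size.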
And here, just below, is the long-awaited demo with all 4 presented control flows implemented in Apache Airflow:
In this blog post, I wanted to show you how to reason about Apache Airflow pipelines if you have only worked with Azure Data Factory, or vice versa. As you saw, the presented concepts were quite similar; some of them could be defined in more or less flexible ways, but generally, we could end up with the same result. However, don't get me wrong. I'm not saying that both are the same! If you take a deeper look, you will certainly see the differences, and finding them can be a good follow-up exercise if you enjoyed this blog post and plan to explore one of these tools.