Zichen Jiang
A brief background of Airflow
Airflow was started by Airbnb in 2014. In 2016 it joined the Apache Incubator, and in 2019 it became a top-level Apache Software Foundation project. It is a platform written in Python for programmatically scheduling and monitoring workflows. It is designed to execute a series of tasks following specified dependencies on a specified schedule. The platform has a rich user interface that visualizes all running workflows, making it easy to monitor and troubleshoot issues. Workflows are defined as code, allowing them to be easily maintained, versioned, and tested.
How does Apache Airflow work?
In Airflow, a workflow is defined as a DAG (Directed Acyclic Graph), which contains individual units of work called Tasks. Tasks have dependencies and branches.
An Airflow instance is usually composed of a scheduler, an executor, a webserver, and a metadata database. The scheduler triggers scheduled workflows and sends tasks to the executor. The executor runs the tasks. The webserver hosts a user interface to allow users to trigger and debug DAGs. And the metadata database stores state, logs, and settings from all the components.
All DAGs are defined in Python and are highly extensible. You can define your own tasks that suit your environment. Workflows can be triggered on a schedule or by an external event, and an Airflow instance can be easily deployed to any cloud service.
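To make that concrete, here is a minimal sketch of a DAG file. The dag_id, schedule, and bash command are placeholders and not part of the example pipeline discussed later:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# A minimal DAG with a single task that runs once per day.
dag = DAG(
    dag_id="hello_airflow",
    start_date=datetime(2021, 8, 3),
    schedule_interval="@daily",
    catchup=False,
)

say_hello = BashOperator(
    task_id="say_hello",
    bash_command="echo 'Hello, Airflow!'",
    dag=dag,
)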
Why Apache Airflow?
- Airflow workflows are configuration as code in Python. Developers can write code that instantiates pipelines dynamically (see the sketch after this list).
- You can easily define your own operators, executors, and extend the library to suit your needs.
- Airflow is modular and can orchestrate any number of workers, making it highly scalable.
- You can easily convert your existing workflow into a DAG due to its wide range of Operators.
- Workflows can be triggered on a specific schedule or by external events.
- Airflow provides an analytical dashboard to help you optimize your workflow.
- You can run multiple data pipelines on different schedules in one Airflow instance.
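To illustrate the first point, here is a small, hypothetical sketch of a DAG that generates one task per table in a plain Python loop; the table names and dag_id are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

dag = DAG(
    dag_id="dynamic_pipeline_example",
    start_date=datetime(2021, 8, 3),
    schedule_interval=None,
)

# Because a DAG is plain Python, tasks can be created dynamically in a loop.
for table in ["customers", "orders", "invoices"]:
    BashOperator(
        task_id=f"export_{table}",
        bash_command=f"echo exporting {table}",
        dag=dag,
    )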
Available Operators
An Operator is a template for a predefined Task that can be defined declaratively inside the DAG. There are many kinds of operators available in Airflow to cover your basic needs, such as:
- BashOperator – executes a bash command
- PythonOperator – calls a Python function
- KubernetesPodOperator – creates and runs a Pod on a Kubernetes cluster
- EmailOperator – sends an email
- SimpleHttpOperator – sends an HTTP request
You can also install more operators from Provider packages to further extend Airflow’s functionalities.
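For example, the SimpleHttpOperator used later in this article ships with the HTTP provider package rather than with Airflow core:

# Install the provider first: pip install apache-airflow-providers-http
from airflow.providers.http.operators.http import SimpleHttpOperator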
A Brief Example
If you don’t already have Airflow running, you can start an instance in 10 minutes by following this guide: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html. You will need to have Docker installed for this guide.
Here is an example data pipeline taken from a project that Levio built. In this example, the pipeline consists of the following steps:
- Wait for a zip file.
- Extract the content of the zip file.
- Run the files through Java and Python programs to extract information.
- Call an API to score the files based on extracted information.
- Delete the received zip file and all extracted files.
- Send success email to developers once the pipeline has finished successfully.
We will take a closer look at the code of the above DAG. In the code below, we initialize a DAG object with predefined arguments. The depends_on_past argument controls whether a run should depend on the result of the previous run: if it is set to True, a task will only start if the same task succeeded in the previous run. The concurrency argument limits how many task instances from this pipeline can run at the same time. The schedule_interval argument uses cron format; in the example, the pipeline runs every day at 4 AM.
from datetime import datetime
from airflow import DAG

args = {
    "owner": "levio",
    "email": ["info@levio.ca"],
    "depends_on_past": False,
    # Airflow expects start_date to be a datetime object.
    "start_date": datetime(2021, 8, 3),
    "email_on_failure": True,
    "email_on_retry": True,
    "retries": 2,
}

dag = DAG(
    dag_id="example_pipeline",
    default_args=args,
    concurrency=1,
    schedule_interval="0 4 * * *",
)
Moving on to the first task, zipfile_sensor, we have a FileSensor task that checks every 30 seconds whether the file /input/pdf_files.zip exists. If it does, it initiates the next task. Note that you must set the dag parameter in every task to tell it which DAG object the task belongs to.
from airflow.sensors.filesystem import FileSensor

zipfile_sensor = FileSensor(
    task_id="zipfile_sensor",
    poke_interval=30,
    fs_conn_id="fs_conn_1",
    filepath="/input/pdf_files.zip",
    dag=dag,
)
The fs_conn_id parameter refers to the “Conn id” of a connection that you need to set up via the Airflow web interface, on the Connections page under the Admin menu.
Figure 3: Airflow Connection Page
Once you click the blue + button, you are brought to a page where you can set up a new connection. There are dozens of different connection types to choose from. Once you have one set up, you can refer to it from your task by its “Conn id”.
Figure 4: Airflow Add Connection
For the unzip_files task, we have a simple BashOperator that calls a bash command to unzip the received zip file. Of course, you can also write a more complicated bash script and execute it here, for example bash_command="bash your_custom_script.sh".
from airflow.operators.bash import BashOperator

unzip_files = BashOperator(
    task_id="unzip_files",
    bash_command="unzip pdf_files.zip",
    dag=dag,
)
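Since bash_command is a Jinja template, you can also pass values into it at run time. Here is a small, hypothetical variation of the task above that takes its paths from params; the task_id and paths are placeholders:

# Hypothetical templated variant: the paths come from params at render time.
unzip_files_templated = BashOperator(
    task_id="unzip_files_templated",
    bash_command="unzip {{ params.zip_path }} -d {{ params.dest }}",
    params={"zip_path": "/input/pdf_files.zip", "dest": "/input/"},
    dag=dag,
)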
After decompressing the zip file, we create a pod in the Kubernetes cluster that runs a Java program to convert the PDF files into images. The parameters for KubernetesPodOperator are very similar to the ones you would put in a YAML file to create the pod. The is_delete_operator_pod parameter determines whether the pod should be deleted from the cluster once it finishes its job, and image_pull_secrets names the Kubernetes secret used to pull the image from a private container registry.
# Import paths shown here are for the Airflow 2 cncf.kubernetes provider;
# they may vary slightly between provider versions.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.kubernetes.secret import Secret
from kubernetes.client import models as k8s

# The volume and its mount must share the same name for the mount to resolve.
volume = k8s.V1Volume(
    name="example-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
        claim_name="example-pvc"))
volume_mount = k8s.V1VolumeMount(
    mount_path="/example-volume-mount",
    name="example-volume", sub_path=None, read_only=False)
example_secrets = Secret("volume", "/secret-path", "example-credentials")
resource_limit = {"limit_cpu": "4", "limit_memory": "16G",
                  "request_memory": "200M", "request_cpu": "100m"}

java_program = KubernetesPodOperator(
    image="example_docker_image:latest",
    namespace="example_namespace",
    env_vars={
        "INPUT_FOLDER": "/input/",
        "OUTPUT_FOLDER": "/output/",
        "DB_CRED_PATH": "/etc/secret-volume/db.properties",
    },
    image_pull_policy="Always",
    image_pull_secrets="example-secrets",
    name="java-program",
    task_id="java_program",
    get_logs=True,
    is_delete_operator_pod=True,
    dag=dag,
    startup_timeout_seconds=120,
    resources=resource_limit,
    volumes=[volume],
    volume_mounts=[volume_mount],
    secrets=[example_secrets],
)
After the files are converted to images, we need to determine which Python script to use to extract their information. In the example, I defined a simple function that returns one of the Python task ids based on a random number. In a production pipeline, there would be much more complex logic here, and the Python code would likely live in a separate file. You can even create your own Python library and import it in the DAG file.
from random import randint
from airflow.operators.python import BranchPythonOperator

def python_branch():
    # Pick one of the two Python tasks; a real pipeline would use actual logic here.
    if randint(1, 2) == 1:
        return "python_script_1"
    else:
        return "python_script_2"

branch = BranchPythonOperator(
    task_id="choose_python_script",
    python_callable=python_branch,
    dag=dag,
)
Then, the task whose id matches the value returned by the branch function is executed. We have a PythonOperator here calling a Python function. In the example, I defined a simple function within the DAG file. Again, in a production pipeline the function would be much more complicated. Similar to the BranchPythonOperator above, you can import the Python functions from your own custom library.
from airflow.operators.python import PythonOperator

# The function's parameters match the keys passed via op_kwargs below.
def python_function_1(db, input_folder):
    print("processing data through python function 1")
    # do the data processing using db and input_folder

python_script_1 = PythonOperator(
    task_id="python_script_1",
    python_callable=python_function_1,
    op_kwargs={"db": "dev", "input_folder": "/input/"},
    dag=dag,
)
After the Python script has extracted the information and stored it in our database, we want to call an API to score the documents based on that information. We can use a SimpleHttpOperator to make a POST request. The http_conn_id is similar to the fs_conn_id in the FileSensor; it refers to a connection you set up in Airflow, in the same way as shown in Figures 3 and 4. The parameter trigger_rule="none_failed" is very important in this task. By default, the value of this parameter is all_success, meaning that all the upstream tasks must succeed for this task to run. Since one of the Python script tasks was skipped, the all_success condition will not be met. Thus, we use a different value, none_failed, to tell the task that it can run as long as none of the upstream tasks have failed.
call_api = SimpleHttpOperator(
    task_id="call_api",
    http_conn_id="http_conn_1",
    endpoint="/relative-path-to-api",
    method="POST",
    trigger_rule="none_failed",
    dag=dag,
)
Although not shown here, the SimpleHttpOperator can also send data to the target API by adding the parameter data:
data={"param1": "value1", "param2": "value2"},
By default, the SimpleHttpOperator returns the response body as plain text, which can be passed to the next task downstream. You can also modify the response before passing it on by using the response_filter parameter. For example:
response_filter=lambda response: response.json()['nested']['property']
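If a downstream task needs the response, it can read it back from XCom. Here is a brief, hypothetical sketch that continues the same DAG file and reuses the PythonOperator imported earlier; the task and function names are illustrative:

# Hypothetical downstream task that reads the API response from XCom.
def handle_score(**context):
    score = context["ti"].xcom_pull(task_ids="call_api")
    print(f"API returned: {score}")

handle_score_task = PythonOperator(
    task_id="handle_score",
    python_callable=handle_score,
    dag=dag,
)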
After all the processing is done, we want to delete all the files we created and the zip file we received, so the old zip file won't be mistaken for a new one the next time the pipeline runs. The code is similar to the unzip_files task. As before, you can execute a more complicated custom bash script by putting "bash clean_up.sh" in the bash_command parameter.
clean_up = BashOperator(
    task_id="clean_up",
    bash_command="rm /input/*.zip /input/*.pdf /output/*.jpg",
    dag=dag,
)
At the end of the pipeline, we want to send an email to notify the developers. We can use the EmailOperator to do so. The example here sends an email with a simple subject and content. You can also access the Airflow API to send a more detailed email including information such as individual task run times, total run time, etc.
from airflow.operators.email import EmailOperator

email_success = EmailOperator(
    task_id="email_success",
    to="info@indellient.com",
    subject="Example Pipeline Success",
    html_content="<p>The Example Pipeline ran successfully!</p>",
    dag=dag,
)
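One possible way to build such a detailed email, sketched here as an assumption rather than taken from the original project, is to collect each task's run time in an upstream PythonOperator and pull the result into the email body via templating. The task names are illustrative, and the sketch continues the same DAG file:

# Hypothetical task that summarizes per-task run times for the email body.
def build_run_summary(**context):
    task_instances = context["dag_run"].get_task_instances()
    rows = [
        f"<li>{ti.task_id}: {ti.duration:.1f}s</li>"
        for ti in task_instances
        if ti.duration is not None
    ]
    return "<ul>" + "".join(rows) + "</ul>"

build_summary = PythonOperator(
    task_id="build_summary",
    python_callable=build_run_summary,
    dag=dag,
)

email_detailed = EmailOperator(
    task_id="email_detailed",
    to="info@levio.ca",
    subject="Example Pipeline Run Summary",
    # html_content is a template field, so the XCom value can be pulled here.
    html_content="{{ ti.xcom_pull(task_ids='build_summary') }}",
    dag=dag,
)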
At the end, we need to explicitly tell Airflow in what order the tasks should be executed and whether there are any conditional branches. The >> operator connects two tasks together; the direction of the arrows indicates that the flow goes from left to right. For the two conditional Python tasks, we put them in an array to indicate that they run as parallel alternatives. The trigger_rule="none_failed" parameter must be set on the task that comes right after the branched tasks.
zipfile_sensor >> unzip_files >> java_program >> branch >> \
[python_script_1, python_script_2] >> call_api >> clean_up >> email_success
Another use of putting tasks in an array is when we want to run multiple tasks in parallel to reduce runtime. Imagine all the following tasks are PythonOperators: after task1 runs, task2, task3, and task4 will all be triggered and run in parallel.
task1 >> [task2, task3, task4]
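The reverse also works: putting the array on the left-hand side joins the parallel tasks back together, so a hypothetical task5 would only start once task2, task3, and task4 have all finished.

[task2, task3, task4] >> task5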
Please note this is just an example, so many placeholders are used. For instance, the KubernetesPodOperator uses an image that doesn't exist, the PythonOperators call Python functions at the top of the DAG file that only print, and the SimpleHttpOperator does not point to a real API URL.
Use cases of Apache Airflow
Airflow is generally used to automate ETL processes, build data pipelines, and populate data warehouses. When you have dozens of steps to transform your source data, it is impractical to run them manually, and building custom software to manage the tasks can be very time-consuming and costly.
We built a data pipeline in Airflow that takes source data and runs it through over 20 stages of data transformation. This used to be done manually by the client and could take a few days to run through, so they only did it monthly. Once we migrated the data pipeline to Airflow, it required little human intervention and ran on a daily schedule during off-hours. The original pipeline was written in Python, so it was also very easy to migrate to Airflow.
Airflow becomes especially useful when multiple independent programs are used to run the data pipeline. In another project, we had a data pipeline built in Java and Python. The pipeline also relied on external files to run and used RESTful APIs as part of the processing. Airflow is very useful in this situation. We used the KubernetesPodOperator to spawn pods in a Kubernetes cluster to run the Java program and some of the resource-hungry Python scripts. The PythonOperator was used to run less complicated Python scripts. The SimpleHttpOperator was used to send HTTP requests to the APIs. And sensors were used to check hourly whether the external files had arrived for the pipeline to start. It was also easy to monitor and debug the pipeline thanks to Airflow's web interface: you can easily see which step has an error and check its log to debug it.
In Conclusion
Apache Airflow can play a very important role in reliably running your data pipelines. Its extensibility allows you to build a pipeline that relies on tools and programs written in different languages. It is a highly scalable platform used by thousands of organizations worldwide. It has a strong community and is integrated with many cloud platforms for easy setup, making it a popular and proven choice when building your data pipelines.