Airflow promises easy, flexible, and scalable data workflow management. What actually lies behind this increasingly popular software? In this article, you will get to know its main components and follow an all-in-one tutorial to implement your first DAG.
Airflow is Python-based open-source software to create, schedule, and monitor workflows. By workflow, I mean a wide range of data pipeline management use cases, from framework orchestration to job scheduling.
Downloads of Apache Airflow have grown exponentially over the last year. The project was born in 2014 in Airbnb’s offices, where teams needed software to manage workflows that were getting more and more complex as the company grew. In 2016, it joined the official Apache Incubator, by which time a first version was already available. The project took a big step in 2019, when it was promoted to a top-level Apache Software Foundation project.
Airflow boasts great flexibility: after familiarizing yourself with it, you can adapt it entirely to your use case. The code is freely available, which enables easy customization of its components. In addition, many extensions, integrations, and plugins are made available by its ever more active community. So let’s get accustomed to Airflow, so you can stand on your own feet and begin your own Airflow journey.
Airflow’s key components
In mathematics, a DAG (directed acyclic graph) is a graph whose edges are directed and which contains no cycle. In Airflow, a DAG is a Python object that defines a workflow: the tasks it contains and the dependencies between them.
The Operators are the basic building blocks of workflows in Airflow. They define the action performed by each task and come in plenty of different types. Each operator is defined individually and is entirely independent by construction: operators are isolated and can be run on different machines. To exchange small amounts of data between Operators, one needs XComs (for cross-communication). An XCom is identified by a key, the task_id, and the dag_id it came from; it can hold any serializable value and is explicitly pushed to and pulled from its storage location. There are several kinds of operators: default operators, operators from external providers, and sensors. Let’s give some examples here to get a sense of the wide range of operator types.
Here are examples of default operators:
- The BashOperator runs a bash command,
- The PythonOperator calls a Python callable,
- The BranchPythonOperator decides which of several downstream tasks to continue with (= branch) based on the results of the upstream tasks,
- The EmailOperator sends emails,
- The TriggerDagRunOperator triggers another defined DAG.
External operators are developed by external providers as bridges to other tools. For example, the DockerOperator executes a command inside a Docker container, the ECSOperator runs a task on AWS ECS, and the PostgresOperator directly executes SQL code in a Postgres database. The S3ToRedshiftOperator copies data from an S3 source into a Redshift table.
Sensors are designed to wait for something to occur, checking for the event either continuously or at a specified interval. For example, the FileSensor waits for a file to land in a filesystem, and the PythonSensor waits for a Python callable to return True. Similarly, the DatetimeSensor waits until a specified datetime before letting the workflow continue.
Airflow comes with a UI, an essential tool to monitor your workflows efficiently. You can visualize your DAGs, see their run history, trigger them, change or define settings, and manage user access, among other things.
To go further
Connection and Hook
An Airflow connection is essentially a set of parameters such as a key, a type, a username, a password, and a hostname. This data is encrypted and saved as metadata, and you can easily access it via the UI. A hook builds on a connection: it is a high-level interface to an external platform that lets you quickly talk with it. For instance, the PostgresHook connects to a Postgres database and serves the Operators that may use it.
Executors
Executors are key components, as they handle the running of each task. You can only use one type of executor at a time. They come in different modes, local or remote execution, and different types, sequential or parallel execution. To give an example, the DaskExecutor runs tasks in a Dask distributed cluster.
Plugins
Plugins enable you to adapt Airflow to specific needs you may have. For instance, the API plugin exposes REST-like endpoints to perform operations and access data, replicating the application logic, whereas the Google Analytics plugin integrates analytics into your application.
Your first Airflow DAG
In this section, we will build a DAG together to get you (truly) started. This simple DAG uses two operators and one sensor: it simulates training a model on a file uploaded by a collaborator.
First, you need a workflow you want to implement; you now know that Airflow is the perfect tool to manage it!
Installation
With Linux:
- install the needed dependencies,
- make sure the AIRFLOW_HOME environment variable is set correctly (it defaults to ~/airflow),
- install Apache Airflow itself, for instance with pip (pip install apache-airflow).
Now implement your DAG in Airflow. We first import the needed libraries and define a global variable for the path of our csv file.
We begin with the DAG definition: we name it demo_dag and schedule it daily. Note that setting the catchup parameter to False makes Airflow run only the workflows scheduled from the current time onwards. Indeed, by default, Airflow runs any past scheduled workflow that has not been run yet; this is called backfilling.
Then, we implement our four tasks. The first one is a simple BashOperator, the second is a PythonOperator that simulates a partner saving a .csv file to the filesystem, the third is a FileSensor that waits for the partner to upload the file, and the last one simulates the training of three models based on the values contained in the csv file.
The first PythonOperator calls the save_csv_file method and the second calls training_model. We then need to implement these two methods.
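A minimal sketch of the two callables, with the training simulated by random accuracies (the csv layout, the path, and the accuracy range are all illustrative):

```python
import csv
import os
import random

CSV_FILE_PATH = "/tmp/demo_dag/data.csv"


def save_csv_file(path: str = CSV_FILE_PATH) -> None:
    """Simulate the partner uploading a csv file of feature values."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["feature"])
        for _ in range(10):
            writer.writerow([random.random()])


def training_model(path: str = CSV_FILE_PATH) -> float:
    """Simulate training three models and keep the best accuracy."""
    with open(path) as f:
        values = [float(row["feature"]) for row in csv.DictReader(f)]
    # One simulated accuracy per model.
    accuracies = [random.uniform(0.7, 1.0) for _ in range(3)]
    print(f"trained 3 models on {len(values)} rows, "
          f"best accuracy: {max(accuracies):.3f}")
    return max(accuracies)
```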
Finally, we define the ordering of the tasks. Note that there are three equivalent ways of doing so.
For our complete workflow, we have:
Next, you configure your Airflow webserver, scheduler, and database. An efficient way of doing so is to define a docker-compose file for Airflow, as done in this article. An Airflow folder typically contains dags, logs, and plugins folders; the Python files defining your DAGs must be placed in the dags folder.
Finally, you can monitor your workflow with the UI. If you followed the docker-compose configuration above, you only need to run docker-compose up in your terminal, from your Airflow folder. Then open http://localhost:8080 and enter your credentials (by default, username airflow and password airflow).
You have now implemented your first Airflow DAG locally. The next step towards production is deployment, and this step’s complexity should not be underestimated. You have several options: the best-known platforms for deployment are AWS, GCP, and Microsoft Azure. AWS offers the interesting MWAA (Amazon Managed Workflows for Apache Airflow), which makes deployment easier. However, at the time of writing, this managed service works only with versions up to Airflow 2.2.2 and Python 3.7. Thus, to use other versions, you will have to use specific operators that can handle your requirements, for example the ECSOperator: it can run your task on a Docker image containing the wanted Python version.
Plugins repository: https://github.com/airflow-plugins