Building Pipelines In Apache Airflow - For Beginners

22 Mar, 2023

Apache Airflow is quite popular in the data science and data engineering space. It boasts many features that enable users to programmatically create, manage, and monitor complex workflows.

However, the platform’s range of features may inadvertently become a detriment to beginners. New users who explore Apache Airflow’s documentation and tutorials can easily become inundated with new terminology, tools, and concepts.

With the aim of creating a more digestible introduction to this platform, we offer a bare-bones version of an Apache Airflow demo that entails coding and running an Airflow pipeline.

Terminology

It will be easier to follow the demo after becoming familiar with the following Airflow terminology.

  1. DAG: A DAG, short for directed acyclic graph, is what Airflow uses to represent a workflow. DAGs are composed of nodes that denote tasks and arrows that denote the relationships between the tasks.
Example DAG (Created By Author)

2. Task: A task is a single unit of work within a DAG. Tasks in a DAG can be linked together by dependencies, ensuring that they are executed in a specific order.

3. Operator: Operators are tools used to instantiate the tasks in a DAG. Airflow offers a plethora of operators that can perform basic jobs, such as executing Python code, bash commands, and SQL queries.

4. Metadata Database: The metadata database stores metadata about the Airflow pipelines that the users create and run.

5. Webserver: The webserver is a convenient UI that enables users to run, monitor, and debug pipelines.

6. Scheduler: The scheduler is responsible for monitoring DAGs and running tasks when their dependencies are met.

Demo

The goal of the demo is to create a DAG with Python and get it running on Airflow. Since we are prioritizing simplicity, we will create a DAG that only consists of two tasks.

The first task runs a Python function named pull_jokes, which procures random jokes from the official joke API and stores them in a text file.

The second task runs a bash command that prints a single sentence:

echo "The random jokes have been pulled"

Finally, we want the second task (i.e., bash command) to be executed right after the first task (i.e., Python function). If we were to visualize this DAG, it would look something like this:

Demo DAG (Created By Author)

Pretty simple, right? Let’s get started.

Part 1 — Setting Up Airflow

Before we can begin creating any DAGs, we will need to set up Airflow on our machine, which will require the use of the command line interface.

There are several ways to install Apache Airflow, as explained in the documentation. Here, we will perform the installation via PyPI.

  1. Activate Windows Subsystem for Linux (For Windows Users)

If you’re working with Windows, you will need to activate Windows Subsystem for Linux (WSL), which will require installing Ubuntu first.

To activate WSL, enter the wsl command in PowerShell.

2. Create a virtual environment

Next, create the virtual environment in which we will be working. Enter the following command to install python3-venv:

sudo apt install python3-venv

For the demo, we will create a virtual environment named “airflow_venv” and activate it.

python3 -m venv airflow_venv
source airflow_venv/bin/activate

3. Select the home directory for Airflow (optional)

It’s important to know where the Airflow project will be stored, since that is where your workflows will live and where Airflow is configured.

By default, the home directory for Airflow will be the ~/airflow directory. However, users can change the home directory for Airflow using the following command:

export AIRFLOW_HOME=<path to directory>

For the case study, the project will be stored in the ~/airflowhome directory.
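To make this setting persist across shell sessions, the export line can also be appended to your shell profile (a quick sketch, assuming bash):

echo 'export AIRFLOW_HOME=~/airflowhome' >> ~/.bashrc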

4. Install Apache Airflow

After confirming the home directory, install Apache Airflow using pip with the following command:

pip install apache-airflow

This installation will create a project in the home directory with the following structure:

└── airflowhome/
    ├── airflow.cfg
    ├── logs/
    │   └── ...
    └── webserver_config.py

The file named airflow.cfg contains all of Airflow’s configurations. It includes a parameter named dags_folder, which shows the path of the folder where all created DAGs must be located.

You can see the currently assigned directory by entering the command:

vim airflow.cfg

Code Output (Created By Author)

As shown by the output, all Airflow DAGs for this demo have to be placed in the dags subdirectory of the airflowhome directory.
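If you only want to check that single setting without opening the whole file in an editor, a quick search works too (run from the Airflow home directory):

grep dags_folder airflow.cfg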

So, we will create a folder named dags in the same directory, which will contain the subsequently created pipelines.

mkdir dags

Note: You can also change the path assigned to the dags_folder parameter in the airflow.cfg file to the path of your liking.

The project structure should be updated to the following:

└── airflowhome/
    ├── airflow.cfg
    ├── logs/
    │   └── ...
    ├── dags/
    └── webserver_config.py

5. Initialize the metadata database

Next, initialize the database with the command:

airflow db init

After initializing the database, the airflow home directory should now have a file named airflow.db.

└── airflowhome/
    ├── airflow.cfg
    ├── airflow.db
    ├── logs/
    │   └── ...
    ├── dags/
    └── webserver_config.py

6. Create a user

A user account is needed to access the webserver, so we can create one using the following command.

airflow users create \
    --username <username> \
    --password <password> \
    --firstname <first name> \
    --lastname <last name> \
    --role <role> \
    --email <email>

For the case study, we will be creating an admin account.
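For example, the command could look something like this (the credential values below are illustrative placeholders, not the ones used in the original screenshots):

airflow users create \
    --username admin \
    --password admin \
    --firstname John \
    --lastname Doe \
    --role Admin \
    --email admin@example.com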

Creating a User (Created By Author)

Once a user account is created, it can be verified using the command:

airflow users list

Code Output (Created By Author)

7. Start the web server and scheduler

The Airflow webserver can be opened with a one-liner.

airflow webserver -p 8080

Note: the default port is 8080, but you can choose a different port.

In a new terminal, start the scheduler with the following command:

airflow scheduler

8. Log in to the web server

After starting the webserver, visit “http://localhost:8080/” (or whichever port you chose) in a web browser. You will be directed to the login page, where you can enter the credentials for your newly created user account.

Login Page (Created By Author)

After entering your details, you will be directed to the home page.

Webserver (Created By Author)

At the moment, there will only be sample DAGs provided by Airflow, but the webserver will also display your own DAGs once they are created.

Part 2 — Creating the DAG

Now that we have Airflow set up, we can build the DAG using Python. The DAG created in this demo will be called “pulling_jokes_dag”.

The following Python script named pull_jokes.py creates a DAG instance as well as the tasks:
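Here is a minimal sketch of pull_jokes.py. The DAG ID, the bash command, and the parameters match the breakdown below, while the start and end dates, the schedule, the task IDs, the joke API endpoint, and the output file name are illustrative choices rather than fixed requirements.

from datetime import datetime

import requests

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def pull_jokes():
    """Fetch random jokes from the Official Joke API and store them in a text file."""
    # The endpoint and output path are illustrative assumptions.
    response = requests.get("https://official-joke-api.appspot.com/random_ten", timeout=10)
    response.raise_for_status()
    with open("jokes.txt", "w") as file:
        for joke in response.json():
            file.write(f"{joke['setup']} {joke['punchline']}\n")


with DAG(
    dag_id="pulling_jokes_dag",
    start_date=datetime(2023, 1, 1),   # date from which the DAG is scheduled
    end_date=datetime(2023, 12, 31),   # date after which no new runs are scheduled
    schedule="@daily",                 # run once a day
    catchup=False,                     # do not backfill past data intervals
    tags=["jokes"],                    # makes the DAG easier to find in the UI
) as dag:

    # Task 1: pull the jokes with a Python function
    task_1 = PythonOperator(
        task_id="pull_jokes_task",
        python_callable=pull_jokes,
    )

    # Task 2: print a confirmation message with a bash command
    task_2 = BashOperator(
        task_id="print_message_task",
        bash_command='echo "The random jokes have been pulled"',
    )

    # Task 2 runs only after task 1 has completed
    task_1 >> task_2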

It’s a small snippet of code, but there is a lot going on. Let’s break everything down step by step.

  1. Create a DAG instance

First, we create a DAG instance, which defines the configuration of the DAG.

A DAG instance must have assigned values for two parameters: dag_id and start_date. The dag_id parameter is the unique identifier of the DAG, while the start_date parameter is the date from which the DAG will begin to be scheduled.

To keep things simple, we will only specify a few additional parameters. The schedule parameter defines when the DAG should be run. The end_date parameter indicates when the DAG should stop being scheduled. The catchup parameter indicates whether the scheduler should backfill DAG runs for past data intervals that have not yet been run. The tags parameter assigns tags to the DAG, which make it easier to find in the UI.
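In the sketch above, this corresponds to the DAG instantiation (the dates, schedule, and tag are illustrative values):

with DAG(
    dag_id="pulling_jokes_dag",
    start_date=datetime(2023, 1, 1),
    end_date=datetime(2023, 12, 31),
    schedule="@daily",
    catchup=False,
    tags=["jokes"],
) as dag:
    ...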

For those interested in all available parameters for DAG instances, feel free to visit the Airflow documentation.

2. Create the first task

After creating the DAG instance, we can create the first task, which runs the pull_jokes function, by using the PythonOperator.

The task_id parameter is the unique identifier for the task, whereas the python_callable parameter contains the function that should be executed.
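In the sketch above, that is the following block (the task_id value is an illustrative name):

task_1 = PythonOperator(
    task_id="pull_jokes_task",
    python_callable=pull_jokes,
)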

3. Create the second task

Next, we create the second task, which executes the bash command, using the BashOperator.

Once again, the task_id parameter is the unique identifier for the task, whereas the bash_command parameter contains the bash script that should be executed.
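Again, from the sketch above (with an illustrative task_id):

task_2 = BashOperator(
    task_id="print_message_task",
    bash_command='echo "The random jokes have been pulled"',
)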

4. Establish dependencies

After creating the tasks, the dependencies need to be set up to determine the order in which the tasks are executed.

We can set task 2 to be executed after task 1 by using the >> operator.
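In the sketch, that is a single line at the end of the script:

task_1 >> task_2

The same dependency could also be declared with task_1.set_downstream(task_2), but the bitshift syntax is the more common convention.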

Part 3 — Running the DAG on Airflow

Once the Python script for the DAG is created, the next step is to run the DAG on Airflow.

As a reminder, the DAG script must be placed in the location specified by the dags_folder parameter in the airflow.cfg file, which is where the pull_jokes.py file for this case study needs to go.

DAGs directory (Created By Author)

After moving the pull_jokes.py file to the correct location, the project directory tree should look like this:

└── airflowhome/
    ├── airflow.cfg
    ├── airflow.db
    ├── dags/
    │   └── pull_jokes.py
    ├── logs/
    │   └── ...
    └── webserver_config.py

Now, Airflow should have access to the DAG. To confirm this, simply enter the command:

airflow dags list

We should now be able to view the newly added DAG in the webserver. Start the webserver with the command:

airflow webserver -p 8080

In a separate terminal, start the scheduler:

airflow scheduler

You should be able to see the DAG named “pulling_jokes_dag” in the UI.

DAG in Webserver (Created By Author)

Pro tip: Although it’s optional, it’s best to assign tags to your DAGs. This will make them easier to find in the webserver.

Technically speaking, DAGs can be managed using the command line interface alone, but the features in the web server make it much easier to access and monitor the created pipelines.

Some of the features include, but are not limited to:

  • A breakdown of the DAG in the form of a grid:
Grid View (Created By Author)
  • A breakdown of the DAG in the form of a graph
Graph View (Created By Author)
  • The underlying Python code for the DAG
Code (Created By Author)
  • The audit log
Audit Log (Created By Author)

Once Airflow has access to the DAG, it will perform DAG runs based on the provided schedule.

A DAG can also be run manually by clicking the “play button” in the top right of the web server.

Run DAG (Created By Author)

Alternatively, a DAG can be manually triggered using the CLI:

airflow dags trigger pulling_jokes_dag

Voila! Just like that, we have our DAG up and running on Airflow!

Conclusion


Hopefully, this has been a useful primer for those looking to get hands-on experience with Apache Airflow.

This demo is by no means a comprehensive demonstration of Airflow’s features and capabilities (we have barely scratched the surface), but it should be able to help people get started, which oftentimes is the hardest part.

After building and running your first DAG, you will have established a solid foundation that will make it easier to design and run more complex workflows.

I wish you the best of luck in your data science endeavors!