
Apache Airflow is a powerful platform for orchestrating complex workflows as Directed Acyclic Graphs (DAGs). Understanding the components of a DAG is crucial for building and managing your data pipelines effectively. In this article, we’ll explore the anatomy of an Apache Airflow DAG by breaking it down into its logical blocks and providing code examples for each block.
An Apache Airflow DAG can be divided into the following logical blocks:
- Imports: Importing the necessary libraries and operators.
- DAG Arguments: Defining arguments and configurations for the DAG.
- DAG Definition: Creating an instance of the DAG.
- Task Definitions: Defining individual tasks within the DAG.
- Task Pipeline: Specifying the order and dependencies between tasks.
Let’s dive into each of these blocks with code examples.
In this block, we import the required libraries and operators. These imports are essential for defining tasks and configuring the DAG.
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
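These import paths come from older Airflow releases; on Airflow 2.x they still work as deprecated aliases, but the currently documented locations look roughly like this (a sketch, assuming Airflow 2.x):

# Airflow 2.x-style imports (assumed); bash_operator and days_ago used above
# are deprecated aliases that still resolve on 2.x. A fixed start_date such
# as datetime(2023, 1, 1) is the usual replacement for days_ago(0).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator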
DAG arguments act as default settings for the DAG and its tasks. They define parameters such as the owner, start date, email alerting behavior, and retry policy.
default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
In this block, we create an instance of the DAG by specifying its unique ID, default arguments, description, and schedule interval.
dag = DAG(
    dag_id='sample-etl-dag',
    default_args=default_args,
    description='Sample ETL DAG using Bash',
    schedule_interval=timedelta(days=1),
)
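Airflow also lets you define a DAG as a context manager, in which case operators created inside the with block are attached to it automatically. A minimal sketch of the same DAG in that style, assuming the imports and default_args above:

# Equivalent definition using a context manager; tasks instantiated inside
# the block pick up this DAG without an explicit dag=dag argument.
with DAG(
    dag_id='sample-etl-dag',
    default_args=default_args,
    description='Sample ETL DAG using Bash',
    schedule_interval=timedelta(days=1),
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo "extract"')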
Tasks are the building blocks of a DAG. In this block, we define the individual tasks, each with a unique task ID, a Bash command to execute, and the DAG it belongs to (remember to replace the bash_command with the actual command you need to run).
extract = BashOperator(
    task_id='extract',
    bash_command='echo "extract"',
    dag=dag,
)

transform = BashOperator(
    task_id='transform',
    bash_command='echo "transform"',
    dag=dag,
)

load = BashOperator(
    task_id='load',
    bash_command='echo "load"',
    dag=dag,
)
The task pipeline specifies the order in which tasks should be executed. Tasks are linked together using the >> operator to create dependencies.
extract >> transform >> load
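The >> operator also works with lists of tasks, which is useful for fan-out/fan-in patterns. A minimal sketch, using hypothetical clean_a and clean_b tasks alongside the operators defined above:

# Hypothetical parallel branch: 'extract' feeds two cleaning tasks, and both
# must finish before 'load' runs.
clean_a = BashOperator(task_id='clean_a', bash_command='echo "clean a"', dag=dag)
clean_b = BashOperator(task_id='clean_b', bash_command='echo "clean b"', dag=dag)

extract >> [clean_a, clean_b]
[clean_a, clean_b] >> load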
Now, let’s create a practical example of a DAG that extracts user information from the /etc/passwd file, transforms it, and loads it into a file. This DAG will run daily.
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# Define DAG arguments
default_args = {
    'owner': 'Ramesh Sannareddy',
    'start_date': days_ago(0),
    'email': ['ramesh@somemail.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# Define the DAG
dag = DAG(
    'my-first-dag',
    default_args=default_args,
    description='My first DAG',
    schedule_interval=timedelta(days=1),
)
# Define tasks
extract = BashOperator(
    task_id='extract',
    bash_command='cut -d":" -f1,3,6 /etc/passwd > /home/project/airflow/dags/extracted-data.txt',
    dag=dag,
)

transform_and_load = BashOperator(
    task_id='transform',
    bash_command='tr ":" "," < /home/project/airflow/dags/extracted-data.txt > /home/project/airflow/dags/transformed-data.csv',
    dag=dag,
)
# Task pipeline
extract >> transform_and_load
To submit a DAG, you need to place the DAG Python file in the dags folder within your Airflow home directory. Here’s how you can do it:
1. Save the code above in a Python file, e.g., my_first_dag.py.
2. Copy the DAG file to the dags folder in your Airflow home directory:
cp my_first_dag.py $AIRFLOW_HOME/dags
3. Verify that your DAG is successfully submitted by listing all existing DAGs:
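Assuming the Airflow 2.x CLI (the same style as the airflow tasks list command used below), listing the DAGs looks like this:

airflow dags list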
You should see your DAG name, ‘my-first-dag,’ in the output.
To list all the tasks within your DAG, you can use the following command:
airflow tasks list my-first-dag
This should display the two tasks defined in your DAG, ‘extract’ and ‘transform.’
By following these steps, you’ve explored the anatomy of an Apache Airflow DAG and created and submitted your own DAG for data extraction, transformation, and loading. You are now ready to schedule and manage your data workflows with Airflow.
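If you want to try a run immediately rather than wait for the schedule, the Airflow 2.x CLI (assumed here) lets you unpause and trigger the DAG by its ID; a minimal sketch:

# Enable scheduling for the DAG and queue a manual run (suggested next step).
airflow dags unpause my-first-dag
airflow dags trigger my-first-dag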