Follow the instructions in the airflow documentation. In the project, run the command
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.3/docker-compose.yaml'
This creates the default yaml file. As we do not need Celery or Redis we remove the lines linked to it in the docker-compose.yaml. Replace value of AIRFLOW__CORE_EXECUTOR with LocalExecutor.
Then create folder structure with the command in terminal:
mkdir -p ./dags ./logs ./plugins ./configs
Now as we need other libraries besides the default airflow library we need to create a docker image with these libraries.
Create a new Dockerfile with the following content:
FROM apache/airflow:2.8.2
COPY requirements.txt /requirements.txt
RUN pip install - user - upgrade pip
RUN pip install - no-cache-dir - user -r /requirements.txtv
Next, create the requirements.txt file:
pandas==1.5.3
matplotlib==3.7.5
scikit-learn==1.3.2
statsmodels==0.14.1
apache-airflow-providers-amazon==2.6.0
Now, build this image with the following command:
docker build . - tag stats_airflow:latest
Here, stats_airflow:latest is the name and version of the image.
This image will be visible in your docker-desktop
Next, we add this image to the docker-compose file to ask docker to create a container of this image for the project.
image: ${AIRFLOW_IMAGE_NAME:-stats_airflow:latest}
Before creating this container, we need to setup airflow metadata database with the command:
docker compose up airflow-init
This creates a postgres metadata database with an admin user named airflow.
To actually build the container we run the command (-d used to run docker in detached mode:
docker compose up -d
As we can see all the conponents elaborated in the architecture are started as seperate containers.
This now works as expected, we can open the UI for airflow in browser at http://localhost:8080/
The username and password for login are both set to “airflow” by default.
After this is done, we need to setup the S3 bucket to read from in the workflow. We will be using the open-source S3 object storage, MinIO.
First, we create the minio directory in the project folder and then run the following command in terminal
docker run
-p 9000:9000
-p 9001:9001
- name minio1
-e "MINIO_ROOT_USER=ROOTUSER"
-e "MINIO_ROOT_PASSWORD=AIRFLOWS3"
-v ${HOME}/CryptoAirflowProject/minio/data:/data
quay.io/minio/minio server /data - console-address ":9001"
(Replace the ‘${HOME}/CryptoAirflowProject’ with your current project path)
This will create the object storage at the required location in our project.
We can open the UI for this object store at http://0.0.0.0:9001/
Next, we create a bucket in this object store named btc-dataset. In this, we store the train dataset in /data/train/train.csv and test dataset in /data/test/test.csv
In the airflow UI, we create a connection of type S3 named bucket_conn. In extra, we add the following details:
{
"aws_access_key_id":"ROOTUSER",
"aws_secret_access_key":"AIRFLOWS3",
"host":"http://host.docker.internal:9000"
}
In this file we declare a task group with the @task_group decorator named data_preprocess which takes data_path as argument.
Then we create 2 subtasks inside this task group: download_dataset and preprocess_dataset
In download_dataset, S3Hook is imported from airflow.providers.amazon.aws.hooks.s3 to form a connection to S3 bucket called bucket_conn. The csv file passed in source_file is then downloaded from btc-dataset bucket and stored in local_path.
In preprocess_dataset, the csv is read from local storage into a dataframe and preprocessed. This involves sorting by date, taking and average and then calculating its exponentially weighted mean (ewm). The preprocessed data is then stored on local storage with _preprocessed suffix.
The preprocess_dataset is called with the output from download_dataset. The local storage path is based on the data_path passed as argument and may lead to the train or test folder.
import pandas as pd
from datetime import datetime
import loggingfrom airflow.decorators import task_group, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
logger = logging.getLogger(__name__)
@task_group(group_id='data_preprocess')
def data_preprocess(data_path):
@task
def download_dataset(source_file, dest_file):
hook = S3Hook('bucket_conn')
file_name = hook.download_file(
key=source_file,
bucket_name='btc-dataset',
local_path=dest_file
)
return dest_file
@task
def preprocess_dataset(source_file):
df = pd.read_csv(source_file)
df = df.set_index('Date').sort_values('Date').drop(columns=['Unix Timestamp', 'Symbol'])
df['Avg'] = df[['High', 'Low']].mean(axis=1)
df['EWM'] = df.Avg.ewm(alpha=0.005).mean()
df = df[['EWM']].reset_index()
dest_file = f'{source_file[:-4]}_preprocessed.csv'
df.to_csv(dest_file, index=False)
return dest_file
intermediate = 'data/train/' if 'train' in data_path else 'data/test/'
dest_file = download_dataset(source_file=data_path, dest_file=intermediate)
dest_file = preprocess_dataset(source_file=dest_file)
return dest_file
In this, we declare a DAG with @dag decorator named bitcoin_model. The arguments to this decorator are: start_date (for the sake of this project we keep as current date +1) and schedule_interval (optional) which is expressed as a CRON string.
This DAG defines 3 subtasks: scale_dataset, bitcoin_model, upload_model
In the scale_dataset, we read data from local storage, and scale the EWM column with a standard scaler. The local storage path is passed as an argument along with the path for storing the pickled scaler. The scaled file is also stored with _scaled suffix.
In the bitcoin_model task, an ARIMA model from statsmodels.tsa.arima.model is employed to model the dataset. The fitted model is pickled and dumped in local storage.
The upload_model task also contains the S3Hook with bucket_conn connection. The scaler and model are then uploaded to the btc-dataset bucket from local storage with the load_file() function.
In this file, first the data_preprocess task group is called. The returned value is then passed to scale_dataset. The bitcoin_model and upload_model are then chained to it.
import pandas as pd
from datetime import datetime
import pickle
import loggingfrom statsmodels.tsa.arima.model import ARIMA
from sklearn.preprocessing import StandardScaler
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from preprocess_group import data_preprocess
logger = logging.getLogger(__name__)
@dag(dag_id='bitcoin_model_v1', start_date=datetime(2024, 4, 4))
def bitcoin_model():
@task
def scale_dataset(source_file, scaler_file):
df = pd.read_csv(source_file)
ss = StandardScaler()
df['EWM']: ss.fit_transform(df['EWM'])
pickle.dump(ss, open(scaler_file, 'wb'))
dest_file = f'{source_file[:-4]}_scaled.csv'
df.to_csv(dest_file, index=False)
return dest_file
@task
def bitcoin_model(source_file, model_file):
df = pd.read_csv(source_file)
df = df.set_index('Date')
model = ARIMA(df, order=(10, 1, 0))
result_model = model.fit()
pickle.dump(result_model, open(model_file, 'wb'))
return model_file
@task
def upload_model(scaler_file,model_file):
try:
hook = S3Hook('bucket_conn')
hook.load_file(
filename=scaler_file,
key=scaler_file,
bucket_name='btc-dataset',
replace=True
)
hook.load_file(
filename=model_file,
key=model_file,
bucket_name='btc-dataset',
replace=True
)
except Exception as e:
logger.error(e)
return e
data_path='data/train/train.csv'
scaler_file = 'data/scaler.pkl'
model_file = 'data/model.sav'
dest_file = data_preprocess(data_path)
dest_file = scale_dataset(source_file=dest_file, scaler_file=scaler_file)
model_file = bitcoin_model(source_file=dest_file, model_file=model_file)
upload_model(scaler_file=scaler_file, model_file=model_file)
bitcoin_model()
The final DAG diagram created is: