- Lab Credits: Prof. Ramin Mohammadi
- Blog Credits: Sai Akhilesh Ande
Note:
- Airflow Lab-1 used Docker, which allowed us to run it on all operating systems. This lab, however, requires a local installation of Airflow.
- Hence, this lab requires a Linux-based operating system (macOS, Ubuntu, etc.). On Windows, install Airflow via WSL2, or install Ubuntu through VirtualBox/VMware.
In this lab, we create a machine-learning workflow using Airflow that trains a logistic regression model on advertising data, sends email notifications, and monitors the workflow status using a Flask API.
The workflow involves the following steps:
- Installing Airflow (`pip install`)
- Configuring `airflow.cfg` for emails
- Integrating email notifications
- Monitoring the workflow using a Flask API
Prerequisites
- Github Lab-1
- Basic understanding of Airflow concepts (Airflow Lab-1)
Setting up the lab
Note: If you’d like to save your work files to GitHub, set up your working directory similar to Github Lab-1 and add the necessary files to `.gitignore`. If you just want to run the lab locally, follow the steps below.
- Open the local mlops_labs directory using Visual Studio Code (VSC) and open a new terminal inside VSC.
- Create a new working directory for this lab (e.g., airflow_lab2).
- Move into the working directory using the command `cd airflow_lab2` in your terminal.
- Copy the dags folder and requirements.txt to your working directory.
- Create a virtual environment (e.g., airflow_lab2_env).
- Activate the environment and install the required packages using `pip install -r requirements.txt`.
Project Structure:
mlops_labs
└── airflow_lab2
├── airflow_lab2_env
├── dags
│ ├── data
│ │ └── advertising.csv
│ ├── model
│ ├── src
│ │ └── model_development.py
│ ├── templates
│ │ ├── failure.html
│ │ └── success.html
│ ├── Flask_API.py
│ └── main.py
└── requirements.txt
Running Apache Airflow
- Setting up the Airflow path - Run this command in the terminal inside VSC:
  `export AIRFLOW_HOME=~/airflow`
  - Airflow will create a folder named airflow in your home directory when you run its commands.
- Setting up the database - Apache Airflow requires a database. For our labs, we can stick to the default SQLite option.
  `airflow db migrate`
  - This command creates the database schema if it does not exist, or migrates it to the latest version if it does.
  - You will notice that a logs folder and the files airflow.cfg and airflow.db are created in the airflow directory.
- Creating a user
```bash
airflow users create \
    --username akhil \
    --firstname "Sai Akhilesh" \
    --lastname Ande \
    --role Admin \
    --email <email@northeastern.edu>
```
- If an argument contains multiple words, wrap it in double quotes (`" "`).
- Airflow has multiple components: a webserver and a scheduler.
  - scheduler: handles both triggering scheduled workflows and submitting tasks to the executor to run. The executor is a configuration property of the scheduler, not a separate component, and runs within the scheduler process. There are several executors available out of the box, and you can also write your own.
  - webserver: presents a handy user interface to inspect, trigger, and debug the behaviour of DAGs and tasks.
- So, we have to start both of them in two separate terminals.
```bash
# you can change the port if something else is already running on 8080
airflow webserver --port 8080

# run in a separate terminal
airflow scheduler
```
- Configuring Airflow
  - All configurations of Airflow are stored in the `airflow/airflow.cfg` file.
  - Any configuration changes should be made in this file.
```ini
# this will not load the default DAG examples that ship with Airflow
load_examples = False
```
- Now, stop both servers (Ctrl + C) and go through the next sections to begin working on this lab.
- Once the following changes are made, restart both servers to run the DAGs.
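Before moving on, you can sanity-check the installation by dropping a minimal DAG into the `dags_folder` configured in `airflow.cfg` and confirming it appears in the webserver UI. The sketch below is a hypothetical file (`hello_airflow.py`); the dag_id and task are placeholders and are not part of the lab files.

```python
# hello_airflow.py -- hypothetical sanity-check DAG, not part of the lab files.
# Place it in the dags_folder configured in airflow.cfg; it should appear in the
# webserver UI shortly after the scheduler picks it up.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="hello_airflow",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # manual trigger only
    catchup=False,
) as dag:
    BashOperator(task_id="say_hello", bash_command="echo 'Airflow is up'")
```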
Sign in with app passwords
- Follow the instructions provided here: link, and get your SMTP app password.
Adding SMTP Information to airflow.cfg
To configure Airflow to send emails, you need to add SMTP information to the `airflow.cfg` file. Follow these steps:
- Locate the `airflow.cfg` file in your airflow directory.
- Open the file in VSC or a text editor.
- Search for the `[smtp]` section in the configuration file.
- Update the following parameters with your SMTP server information:
  - `smtp_host`: Hostname of the SMTP server.
  - `smtp_starttls`: Set it to `True` if your SMTP server uses TLS.
  - `smtp_ssl`: Set it to `True` if your SMTP server uses SSL.
  - `smtp_user`: Your SMTP username.
  - `smtp_password`: Your SMTP password.
  - `smtp_port`: Port number of the SMTP server (e.g., 587 for TLS, 465 for SSL).
- Save the changes to the `airflow.cfg` file.
For our lab, assuming you have a Gmail account, you can use the following settings:
- smtp_host = smtp.gmail.com
- smtp_starttls = True
- smtp_ssl = False
- smtp_user = YOUREMAIL@gmail.com
- smtp_password = the app password generated above
- smtp_port = 587
- smtp_mail_from = YOUREMAIL@gmail.com
- smtp_timeout = 30
- smtp_retry_limit = 5
After updating the SMTP information, Airflow will use the configured SMTP server to send email notifications.
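To confirm that Airflow actually picks up these settings, one option is a small throwaway DAG with a single `EmailOperator` task, as sketched below. The file name, dag_id, and recipient address are placeholders; if the message arrives in your inbox, the `[smtp]` block is configured correctly.

```python
# smtp_check.py -- hypothetical one-task DAG used only to verify the [smtp] settings.
from datetime import datetime

from airflow import DAG
from airflow.operators.email import EmailOperator

with DAG(
    dag_id="smtp_check",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,  # trigger manually from the Airflow UI
    catchup=False,
) as dag:
    EmailOperator(
        task_id="send_test_email",
        to="YOUREMAIL@gmail.com",  # placeholder recipient
        subject="Airflow SMTP test",
        html_content="<p>SMTP configuration works.</p>",
    )
```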
DAG Structure
Airflow_Lab2
This DAG orchestrates a machine learning pipeline and notification system. Let’s break down each component of this DAG (a condensed code sketch follows the list):
- `notify_success(context)` and `notify_failure(context)` functions: These define email notifications for task success and failure, respectively. They utilize the `EmailOperator` to send emails with predefined content and subject to a specified recipient (in this case, rey.mhmmd@gmail.com).
- `default_args` dictionary: Defines default arguments for the DAG, including the start date and the number of retries in case of task failure.
- `dag` definition: Creates the main DAG instance (`Airflow_Lab2`) with parameters such as description, schedule interval, catchup behavior, and tags.
- `owner_task` BashOperator: Echoes `1` and is assigned an owner (Ramin Mohammadi). It represents a simple demonstration task with a linked owner.
- `send_email` EmailOperator: Sends a notification email upon DAG completion. It uses the `notify_success` and `notify_failure` functions as callbacks for success and failure, respectively.
- PythonOperator tasks: Execute the Python functions (`load_data`, `data_preprocessing`, `separate_data_outputs`, `build_model`, `load_model`) representing the stages of the machine learning pipeline: data loading, preprocessing, model building, and model loading.
- `TriggerDagRunOperator` task: Triggers the `Airflow_Lab2_Flask` DAG upon successful completion of the main DAG, ensuring that the Flask API is launched after the machine learning pipeline finishes successfully.
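Putting these pieces together, here is a condensed sketch of what `dags/main.py` might look like. It is not the verbatim lab code: the start date, schedule, task wiring, and operator parameters are assumptions based on the description above, and the pipeline helpers are assumed to be importable from `src/model_development.py` inside the dags folder.

```python
# Condensed sketch of dags/main.py based on the description above -- not the
# verbatim lab code. Dates, task wiring, and operator parameters are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Pipeline helpers from the lab's src/model_development.py (the dags folder is on
# sys.path, so this import is assumed to resolve).
from src.model_development import (
    load_data,
    data_preprocessing,
    separate_data_outputs,
    build_model,
    load_model,
)


def notify_success(context):
    # Success callback: send an email when the watched task succeeds.
    EmailOperator(
        task_id="success_email",
        to="rey.mhmmd@gmail.com",
        subject="Success Notification from Airflow",
        html_content="<p>The task succeeded.</p>",
    ).execute(context=context)


def notify_failure(context):
    # Failure callback: send an email when the watched task fails.
    EmailOperator(
        task_id="failure_email",
        to="rey.mhmmd@gmail.com",
        subject="Failure Notification from Airflow",
        html_content="<p>The task failed.</p>",
    ).execute(context=context)


default_args = {
    "start_date": datetime(2023, 9, 17),  # example start date
    "retries": 0,
}

dag = DAG(
    "Airflow_Lab2",
    default_args=default_args,
    description="ML pipeline with email notifications",
    schedule_interval=None,
    catchup=False,
    tags=["airflow_lab2"],
)

owner_task = BashOperator(
    task_id="owner_task",
    bash_command="echo 1",
    owner="Ramin Mohammadi",
    dag=dag,
)

# One PythonOperator per pipeline stage; the real lab passes data between these
# tasks (e.g., via XComs), which is omitted here for brevity.
load_data_task = PythonOperator(task_id="load_data_task", python_callable=load_data, dag=dag)
preprocess_task = PythonOperator(task_id="data_preprocessing_task", python_callable=data_preprocessing, dag=dag)
separate_task = PythonOperator(task_id="separate_data_outputs_task", python_callable=separate_data_outputs, dag=dag)
build_model_task = PythonOperator(task_id="build_model_task", python_callable=build_model, dag=dag)
load_model_task = PythonOperator(task_id="load_model_task", python_callable=load_model, dag=dag)

send_email = EmailOperator(
    task_id="send_email",
    to="rey.mhmmd@gmail.com",
    subject="Airflow_Lab2 completed",
    html_content="<p>The DAG finished running.</p>",
    on_success_callback=notify_success,
    on_failure_callback=notify_failure,
    dag=dag,
)

trigger_flask_dag = TriggerDagRunOperator(
    task_id="trigger_flask_dag",
    trigger_dag_id="Airflow_Lab2_Flask",
    dag=dag,
)

owner_task >> load_data_task >> preprocess_task >> separate_task >> build_model_task >> load_model_task >> send_email >> trigger_flask_dag
```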
Airflow_Lab2_Flask
This DAG manages the Flask API’s lifecycle and consists of the following components (a condensed code sketch follows the list):
- `check_dag_status()` function: Queries the status of the last `Airflow_Lab2` DAG run. It returns `True` if the run was successful, and `False` otherwise.
- `handle_api_request()` function: Handles API requests and redirects users to the `/success` or `/failure` routes based on the status of the last DAG run.
- Flask routes and HTML templates: The Flask routes (`/api`, `/success`, `/failure`) define endpoints for accessing the API and displaying success or failure pages. These routes render the HTML templates (`success.html`, `failure.html`) with appropriate messages.
- `start_flask_app()` function: Starts the Flask server, enabling users to access the API endpoints.
- `start_flask_API` PythonOperator: Executes the `start_flask_app()` function to initiate the Flask server. It represents the starting point of the Flask API’s lifecycle.
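For reference, here is a condensed sketch of what `dags/Flask_API.py` might look like, based on the description above. The port number, start date, and the route-handler function names for the success/failure pages are assumptions; the template names match the project structure shown earlier.

```python
# Condensed sketch of dags/Flask_API.py based on the description above -- not the
# verbatim lab code. Port, dates, and handler names are assumptions.
from datetime import datetime

from airflow import DAG
from airflow.models import DagRun
from airflow.operators.python import PythonOperator
from flask import Flask, redirect, render_template, url_for

app = Flask(__name__)  # templates/ sits next to this file, so render_template finds it


def check_dag_status() -> bool:
    """Return True if the most recent Airflow_Lab2 run succeeded."""
    dag_runs = DagRun.find(dag_id="Airflow_Lab2")
    if not dag_runs:
        return False
    dag_runs.sort(key=lambda run: run.execution_date, reverse=True)
    return dag_runs[0].state == "success"


@app.route("/api")
def handle_api_request():
    # Redirect to /success or /failure depending on the last DAG run's state.
    if check_dag_status():
        return redirect(url_for("success_page"))
    return redirect(url_for("failure_page"))


@app.route("/success")
def success_page():
    return render_template("success.html")


@app.route("/failure")
def failure_page():
    return render_template("failure.html")


def start_flask_app():
    # Blocks while serving requests, so the Airflow task keeps running until stopped.
    app.run(host="0.0.0.0", port=5555)  # assumed port


flask_dag = DAG(
    "Airflow_Lab2_Flask",
    start_date=datetime(2023, 9, 17),  # example start date
    schedule_interval=None,  # triggered by Airflow_Lab2 via TriggerDagRunOperator
    catchup=False,
)

start_flask_API = PythonOperator(
    task_id="start_flask_API",
    python_callable=start_flask_app,
    dag=flask_dag,
)
```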
Conclusion
In this project, we’ve constructed a robust workflow using Apache Airflow to orchestrate a machine learning pipeline and manage a Flask API for monitoring purposes. The Airflow_Lab2 DAG coordinates various tasks, including data loading, preprocessing, model building, and email notification upon completion. By leveraging PythonOperators and BashOperator, we’ve encapsulated each step of the machine learning process, allowing for easy scalability and maintenance.
Additionally, the integration of email notifications enhances the workflow’s visibility, providing stakeholders with timely updates on task success or failure. This ensures proactive monitoring and quick response to any issues that may arise during pipeline execution.
Furthermore, the Airflow_Lab2_Flask DAG facilitates the management of a Flask API, enabling users to access endpoints for checking the status of the machine learning pipeline. By querying the last DAG run status, the API delivers real-time feedback, empowering users to make informed decisions based on the pipeline’s performance.
Overall, this project demonstrates the power of Apache Airflow in orchestrating complex workflows and integrating external systems seamlessly. By following the provided instructions and understanding the workflow’s structure, users can leverage Airflow to streamline their machine learning pipelines and enhance operational efficiency.