Data Quality Monitoring in Apache Airflow with whylogs
Sep 13, 2022
If you are working with data pipelines that need resilient, complex, and scalable processing, Apache Airflow is an excellent choice. The framework helps you create, schedule, and monitor your workflows through a friendly API. It ensures that your planned jobs run when and how they should, and it tells you which specific tasks failed along the way.
But just knowing that a process ran successfully and on time isn't enough for reliability. That's where whylogs, the open standard for data logging, comes in. whylogs allows users to verify the quality of their data with a Pythonic API for running a constraints validation suite, generating drift reports and much more.
To make the most of whylogs within your existing Airflow pipelines, we’ve created the whylogs Airflow provider. In this blog post, we will use an example to demonstrate how you can use whylogs and Airflow to make your workflow more responsible, scalable, and efficient.
A typical DAG in Apache Airflow
An Airflow pipeline is defined as a Directed Acyclic Graph (DAG): every job has a starting task, and dependencies always run in a single direction without looping back to the beginning after they finish. A very simple DAG can be visualized in Airflow as the following image shows.
In this DAG, Airflow executes task_a before task_b and task_c. If task_a fails, the downstream tasks are not executed, and Airflow can trigger an alert or automatically retry the failing pieces. Production workflows can grow into much larger graphs, which is where orchestration tools like Airflow shine. For data pipelines, though, a run that completes successfully can still produce wrong results.
For example, an ML job that completed successfully might have produced drifted data, and without proper monitoring the problem may only be caught much later, if it is ever caught. With whylogs, you can monitor data and machine learning models with little effort, and with our newest Airflow operators you can extend your existing pipelines right away to make your workflows more reliable.
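The execution order and failure behavior described above for task_a, task_b, and task_c can be sketched with the Python standard library alone. This is only an illustration of dependency ordering, not Airflow's scheduler: the run_dag helper is hypothetical, and the sketch assumes that task_b and task_c each depend directly on task_a.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Assumption: task_b and task_c both depend directly on task_a.
dependencies = {
    "task_a": set(),
    "task_b": {"task_a"},
    "task_c": {"task_a"},
}


def run_dag(dependencies, failing=frozenset()):
    """Visit tasks in dependency order; do not run tasks below a failure."""
    status = {}
    for task in TopologicalSorter(dependencies).static_order():
        upstream_ok = all(status[up] == "success" for up in dependencies[task])
        if not upstream_ok:
            # Mirrors the idea that downstream tasks are not executed.
            status[task] = "upstream_failed"
        elif task in failing:
            status[task] = "failed"
        else:
            status[task] = "success"
    return status


print(run_dag(dependencies))
print(run_dag(dependencies, failing={"task_a"}))
```

In the first call every task succeeds. In the second call task_a is marked as failed, and neither task_b nor task_c runs.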
Extending a DAG with whylogs operators
To use our operators, all you need to do is install the provider in your environment with:
(.venv) $ pip install airflow-provider-whylogs
Once that is done, you are ready to use whylogs and its operators across your data pipelines. To get started, you first need to profile your data and store the profile somewhere your Airflow application can access later on. For demonstration purposes, we will transform and profile the data with pandas and store the profile locally. To do so, let's define our first DAG tasks with the following piece of code.
from datetime import datetime

import pandas as pd
import whylogs as why
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def profile_data(data_path="data/transformed_data.csv"):
    # Profile the transformed data and store the profile on the local file system.
    df = pd.read_csv(data_path)
    result = why.log(df)
    result.writer("local").write(dest="data/profile.bin")


def transform_data(input_path="data/raw_data.csv"):
    # Drop rows with missing values before profiling.
    input_data = pd.read_csv(input_path)
    clean_df = input_data.dropna(axis=0)
    # index=False keeps the pandas index from being written as an extra column.
    clean_df.to_csv("data/transformed_data.csv", index=False)


with DAG(
    dag_id='whylogs_example_dag',
    schedule_interval=None,
    start_date=datetime.now(),
    max_active_runs=1,
    tags=['responsible', 'data_transformation'],
) as dag:
    # Note: these task variables reuse (and shadow) the function names above.
    transform_data = PythonOperator(task_id="my_transformation", python_callable=transform_data)
    profile_data = PythonOperator(task_id="profile_data", python_callable=profile_data)
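One detail in the transformation step is worth checking: by default, pandas' DataFrame.to_csv also writes the row index, which comes back as an extra column when the file is read again and would therefore be profiled along with the real columns. The following minimal sketch shows the difference that passing index=False makes. The sample values and the round_trip helper are made up for illustration.

```python
import io

import pandas as pd

# Made-up sample with one missing value in column "a".
raw = pd.DataFrame({"a": [1.0, None, 3.0], "b": [10.0, 20.0, 30.0]})
clean = raw.dropna(axis=0)


def round_trip(df, **to_csv_kwargs):
    """Write a DataFrame to an in-memory CSV and read it back."""
    buffer = io.StringIO()
    df.to_csv(buffer, **to_csv_kwargs)
    buffer.seek(0)
    return pd.read_csv(buffer)


with_index = round_trip(clean)
without_index = round_trip(clean, index=False)

print(list(with_index.columns))     # ['Unnamed: 0', 'a', 'b']
print(list(without_index.columns))  # ['a', 'b']
```

Without index=False, the profile would include an "Unnamed: 0" column that does not exist in the raw data.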
After that, we can use the integration operators to consume the stored profile, run three constraint validations, and build a Summary Drift Report.
from whylogs.core.constraints.factories import greater_than_number, mean_between_range
from whylogs_provider.operators.whylogs import (
    WhylogsConstraintsOperator,
    WhylogsSummaryDriftOperator,
)

# Define these operators inside the `with DAG(...)` block shown earlier
# so that the tasks are attached to the DAG.
greater_than_check_a = WhylogsConstraintsOperator(
    task_id="greater_than_check_a",
    profile_path="data/profile.bin",
    constraint=greater_than_number(column_name="a", number=0),
)
greater_than_check_b = WhylogsConstraintsOperator(
    task_id="greater_than_check_b",
    profile_path="data/profile.bin",
    constraint=greater_than_number(column_name="b", number=0),
)
avg_between_b = WhylogsConstraintsOperator(
    task_id="avg_between_b",
    profile_path="data/profile.bin",
    break_pipeline=False,
    constraint=mean_between_range(column_name="b", lower=0.0, upper=125.1261236210),
)
summary_drift = WhylogsSummaryDriftOperator(
    task_id="drift_report",
    target_profile_path="data/profile.bin",
    reference_profile_path="data/ref_profile.bin",
    reader="local",
    write_report_path="data/Profile.html",
)
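To make these checks more concrete, here is a rough plain-Python sketch of what they assert. It is not the whylogs or provider implementation, since those validate a stored profile rather than raw values. The helper names, the ConstraintFailed exception, and the sample values are hypothetical. The sketch assumes that greater_than_number requires the column minimum to exceed the given number, that mean_between_range requires the column mean to fall within the bounds, and that break_pipeline=False reports a failed check without failing the task.

```python
from statistics import mean


class ConstraintFailed(Exception):
    """Hypothetical stand-in for the error that would fail a task."""


def greater_than(values, number):
    # Assumed semantics: the smallest observed value must exceed `number`.
    return min(values) > number


def mean_between(values, lower, upper):
    # Assumed semantics: the column mean must lie within [lower, upper].
    return lower <= mean(values) <= upper


def run_check(name, passed, break_pipeline=True):
    """Return the result; raise only if the check is allowed to break the pipeline."""
    if not passed and break_pipeline:
        raise ConstraintFailed(f"constraint '{name}' failed")
    return passed


column_b = [12.5, 300.0, 410.0]  # made-up values with a mean above the upper bound

print(run_check("greater_than_check_b", greater_than(column_b, 0)))  # True
print(
    run_check(
        "avg_between_b",
        mean_between(column_b, 0.0, 125.1261236210),
        break_pipeline=False,
    )
)  # False, but no exception is raised
```

With break_pipeline left at True in this sketch, the second check would raise ConstraintFailed instead of returning False.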
To chain the execution together in our DAG file, all we need to do is add the following piece to our Python code.
(
    transform_data
    >> profile_data
    >> [greater_than_check_a, greater_than_check_b, avg_between_b]
    >> summary_drift
)
That code will create our example DAG, represented by the image below.
With this DAG, we will know right away whether any of the constraints we defined failed validation. If everything works fine, a Summary Drift Report is written to our desired location, so users can further investigate drift by comparing the profiled data to a reference profile (one created during the training process, for example).
NOTE: One possible location for your profiles is the local file system, if you run Airflow on a virtual machine instance where processing and storage are shared. Another option is to submit a Spark job to AWS EMR and store the profile in an AWS S3 bucket.
We didn't want to include a strongly opinionated way to profile data at this point, because every real-world usage scenario will be different.
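The dependency structure produced by the chaining expression can also be sketched without Airflow. The toy Task class below is a hypothetical stand-in that only mimics how the >> operator links a task to a single task or to a list of tasks. It is not Airflow's operator implementation.

```python
class Task:
    """Toy stand-in for an Airflow operator that only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Handles `task >> other`, where other is a Task or a list of Tasks.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.task_id)
        return other

    def __rrshift__(self, others):
        # Handles `[task_1, task_2] >> task`: a list has no `>>` of its own,
        # so Python falls back to the right-hand operand.
        for upstream in others:
            upstream.downstream.append(self.task_id)
        return self


transform = Task("my_transformation")
profile = Task("profile_data")
check_a = Task("greater_than_check_a")
check_b = Task("greater_than_check_b")
avg_b = Task("avg_between_b")
drift = Task("drift_report")

transform >> profile >> [check_a, check_b, avg_b] >> drift

for task in (transform, profile, check_a, check_b, avg_b, drift):
    print(task.task_id, "->", task.downstream)
```

The profiling task fans out to the three constraint checks, and each check then points to the drift report, so the report runs only after all three checks.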
Conclusion
Now that you have learned how to integrate whylogs into your Airflow DAGs, your pipelines will be more responsible, scalable, and efficient.
If you think another operator could be useful for your use case, please open an issue on our open source repo, and let us know how you're using whylogs with Apache Airflow on the community Slack!
References
- Airflow-whylogs-provider: https://pypi.org/project/airflow-provider-whylogs/
- Airflow: https://airflow.apache.org/
- whylogs: https://whylogs.readthedocs.io/