
Data Quality Monitoring in Apache Airflow with whylogs

If you are working with data pipelines that need resilient, complex, and scalable processing, Apache Airflow is an excellent choice. The framework helps you create, schedule, and monitor your workflows through a friendly API: it ensures that your scheduled jobs run when and how they should, and it tells you which specific tasks failed along the way.

But just knowing that a process ran successfully and on time isn't enough for reliability. That's where whylogs, the open standard for data logging, comes in. whylogs lets users verify the quality of their data through a Pythonic API for running constraint validation suites, generating drift reports, and much more.
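To get a feel for that API outside of Airflow, here is a minimal sketch of logging a pandas DataFrame and validating a constraint against the resulting profile. The column names and values are made up purely for illustration, and the exact constraints API may vary slightly between whylogs 1.x versions.

import pandas as pd
import whylogs as why
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import greater_than_number

# Toy data, purely for illustration
df = pd.DataFrame({"a": [1, 2, 3], "b": [10.0, 20.0, 30.0]})

# Profile the DataFrame and grab its profile view
profile_view = why.log(df).view()

# Build a constraints suite against the profile and validate it
builder = ConstraintsBuilder(dataset_profile_view=profile_view)
builder.add_constraint(greater_than_number(column_name="a", number=0))
constraints = builder.build()

print(constraints.validate())                     # True if every constraint passes
print(constraints.generate_constraints_report())  # per-constraint pass/fail details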

To make the most of whylogs within your existing Airflow pipelines, we’ve created the whylogs Airflow provider. In this blog post, we will use an example to demonstrate how you can use whylogs and Airflow to make your workflow more responsible, scalable, and efficient.

A typical DAG in Apache Airflow

An Airflow pipeline is defined as a Directed Acyclic Graph (DAG): every job has a starting task, and dependencies always run in a single direction, never looping back to the beginning once they finish. A very simple DAG can be visualized in Airflow as the following image shows.

This makes Airflow execute task_a before task_b and task_c. If task_a fails, the downstream tasks will not be executed, and Airflow can trigger an alert or retry the failing pieces automatically. Production workflows can grow into much larger graphs, which is where orchestration tools like Airflow shine. For data pipelines, though, a run that finishes successfully can still produce wrong results. A minimal sketch of the simple DAG above is shown below.
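This sketch assumes Airflow 2.3+ (which provides EmptyOperator); the dag_id and start_date are placeholders.

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator  # Airflow 2.3+; older versions use DummyOperator

with DAG(
    dag_id="simple_example_dag",  # placeholder name
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    task_a = EmptyOperator(task_id="task_a")
    task_b = EmptyOperator(task_id="task_b")
    task_c = EmptyOperator(task_id="task_c")

    # task_b and task_c only run after task_a succeeds
    task_a >> [task_b, task_c]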

For example, an ML job that completed successfully might still have run on drifted data, and without proper monitoring, the problem may only be caught much later - if it is ever caught. With whylogs, monitoring data and machine learning models can be done very smoothly, and with our newest Airflow operator, you can extend your existing pipelines right away to make your workflows more reliable.

Extending a DAG with whylogs operators

To use our operators, all you need to do is install the provider in your environment with:

(.venv) $ pip install airflow-provider-whylogs

Once that is done, you are ready to use whylogs and its operators across your data pipelines. To get started, the first thing you need to do is profile your data and store the profile somewhere your Airflow application can access later on. For demonstration purposes, we will transform the data with pandas, profile it, and store the profile on the local filesystem. To do so, let's define our first DAG tasks with the following piece of code.

from datetime import datetime

import pandas as pd
import whylogs as why
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def profile_data(data_path="data/transformed_data.csv"):
    # Profile the transformed data with whylogs and persist the profile locally
    df = pd.read_csv(data_path)
    result = why.log(df)
    result.writer("local").write(dest="data/profile.bin")


def transform_data(input_path="data/raw_data.csv"):
    # Example transformation: drop rows that contain missing values
    input_data = pd.read_csv(input_path)
    clean_df = input_data.dropna(axis=0)
    clean_df.to_csv("data/transformed_data.csv")


with DAG(
    dag_id='whylogs_example_dag',
    schedule_interval=None,
    start_date=datetime.now(),
    max_active_runs=1,
    tags=['responsible', 'data_transformation'],
) as dag:

    # These names shadow the functions above, which is fine because the
    # callables are captured when the operators are created
    transform_data = PythonOperator(task_id="my_transformation", python_callable=transform_data)

    profile_data = PythonOperator(task_id="profile_data", python_callable=profile_data)

After that, still inside the same with DAG block, we can use our integration operators to consume the stored profile and build three constraint checks as well as a Summary Drift Report.

from whylogs.core.constraints.factories import greater_than_number, mean_between_range
from whylogs_provider.operators.whylogs import (
    WhylogsConstraintsOperator,
    WhylogsSummaryDriftOperator,
)

    greater_than_check_a = WhylogsConstraintsOperator(
        task_id="greater_than_check_a",
        profile_path="data/profile.bin",
        constraint=greater_than_number(column_name="a", number=0),
    )
    greater_than_check_b = WhylogsConstraintsOperator(
        task_id="greater_than_check_b",
        profile_path="data/profile.bin",
        constraint=greater_than_number(column_name="b", number=0),
    )

    # break_pipeline=False lets the DAG keep running even if this constraint fails
    avg_between_b = WhylogsConstraintsOperator(
        task_id="avg_between_b",
        profile_path="data/profile.bin",
        break_pipeline=False,
        constraint=mean_between_range(column_name="b", lower=0.0, upper=125.1261236210),
    )

    summary_drift = WhylogsSummaryDriftOperator(
        task_id="drift_report",
        target_profile_path="data/profile.bin",
        reference_profile_path="data/ref_profile.bin",
        reader="local",
        write_report_path="data/Profile.html",
    )

And to chain the execution together in our DAG file, all we need to do is add the following piece to our Python code.

(
    transform_data
    >> profile_data
    >> [greater_than_check_a, greater_than_check_b, avg_between_b]
    >> summary_drift
)

That code will create our example DAG, represented by the image below.

With this DAG, we will know right away whether any of the constraints we put in place failed. And if everything works fine, we will write out a Summary Drift Report to our desired location, so users can further investigate drift by comparing the profiled data to the reference profile (created during the training process, for example).

Image: Summary Drift report for the Wine Classification Dataset
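Note that the summary drift task above reads a reference profile from data/ref_profile.bin, which is not produced by this example DAG. A minimal sketch of how such a reference profile could be created, assuming a hypothetical training dataset at data/training_data.csv, follows the same pattern as the profiling task above.

import pandas as pd
import whylogs as why


def profile_reference_data(data_path="data/training_data.csv"):
    # Profile the (hypothetical) training data and store it as the reference
    # profile consumed by the WhylogsSummaryDriftOperator
    ref_df = pd.read_csv(data_path)
    why.log(ref_df).writer("local").write(dest="data/ref_profile.bin")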
NOTE: One possible place to store your profiles is the local filesystem, for example when you run Airflow on a single virtual machine where processing and storage are shared. Another option is to submit a Spark job to AWS EMR and store the profile in an AWS S3 bucket.
We didn't want to impose a strongly opinionated way to profile data at this point, because every real-world usage scenario will be different.
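If you go the S3 route, whylogs also ships an "s3" writer. This is only a rough sketch: the bucket name is a placeholder, and the exact option() parameters may differ between whylogs versions, so check the whylogs documentation for the writer configuration that matches your install.

import pandas as pd
import whylogs as why

df = pd.read_csv("data/transformed_data.csv")
result = why.log(df)

# Assumption: the "s3" writer is configured through option(); parameter names
# may vary across whylogs versions
result.writer("s3").option(bucket_name="my-profile-bucket").write()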


Conclusion

Now that you have learned how to integrate whylogs into your Airflow DAGs, your pipelines will be more responsible, scalable, and efficient.

If you think another operator could be useful for your use case, please open an issue on our open-source repo, and let us know how you're using whylogs with Apache Airflow on the community Slack!

