
Data Quality Monitoring in Apache Airflow with whylogs

If you are working with data pipelines that need resilient, complex, and scalable processing, Apache Airflow is an excellent choice. The framework helps you create, schedule, and monitor your processes with its friendly API. It ensures that your scheduled jobs run when and how they should, and it also tells you which specific tasks failed along the way.

But just knowing that a process ran successfully and on time isn't enough for reliability. That's where whylogs, the open standard for data logging, comes in. whylogs allows users to verify the quality of their data with a Pythonic API for running constraint validation suites, generating drift reports, and much more.

To make the most of whylogs within your existing Airflow pipelines, we’ve created the whylogs Airflow provider. In this blog post, we will walk through an example that demonstrates how whylogs and Airflow together can make your workflows more responsible, scalable, and efficient.

A typical DAG in Apache Airflow

An Airflow pipeline is defined as a Directed Acyclic Graph (DAG), which means that every job has a starting task and dependencies that always run in a single direction and never loop back to the beginning. A very simple DAG, with a task_a followed by task_b and task_c, can be visualized in the Airflow UI.
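In code, a DAG like this could be declared as shown below. This is a minimal sketch: the task names are illustrative, and we assume Airflow 2.3+ for EmptyOperator.

from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="simple_example_dag",
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
) as dag:
    task_a = EmptyOperator(task_id="task_a")
    task_b = EmptyOperator(task_id="task_b")
    task_c = EmptyOperator(task_id="task_c")

    # task_a must finish before task_b and task_c, which can run in parallel
    task_a >> [task_b, task_c]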

This will make Airflow execute task_a before task_b and task_c. If task_a fails, the downstream tasks will not be executed, which can trigger an alert or automatically retry the failing pieces. Production workflows can grow into much larger graphs, which is where orchestration tools like Airflow shine. For data pipelines, though, a pipeline that completes successfully can still produce wrong results.

For example, an ML job that completed successfully might have consumed or produced drifted data, and without proper monitoring, the problem may only be caught much later, if it is ever caught at all. With whylogs, monitoring data and machine learning pipelines can be done very smoothly, and with our newest Airflow operators, you can extend your existing pipelines right away to make your workflows more reliable.

Extending a DAG with whylogs operators

In order to use our operators, all you need to do is install the provider in your environment with:

(.venv) $ pip install airflow-provider-whylogs

Once that is done, you are ready to use whylogs and its operators across your data pipelines. To get started, the first thing you will need to do is profile your data and store the profile somewhere your Airflow application can access later on. For demonstration purposes, we will transform and profile the data with pandas and store the profile locally. To do so, let's define our first DAG tasks with the following piece of code.

from datetime import datetime

import pandas as pd
import whylogs as why
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def profile_data(data_path="data/transformed_data.csv"):
    # profile the transformed data with whylogs and store the profile locally
    df = pd.read_csv(data_path)
    result = why.log(df)
    result.writer("local").write(dest="data/profile.bin")


def transform_data(input_path="data/raw_data.csv"):
    # a simple transformation step: drop rows that contain missing values
    input_data = pd.read_csv(input_path)
    clean_df = input_data.dropna(axis=0)
    clean_df.to_csv("data/transformed_data.csv")


with DAG(
    dag_id='whylogs_example_dag',
    schedule_interval=None,
    start_date=datetime.now(),
    max_active_runs=1,
    tags=['responsible', 'data_transformation'],
) as dag:

    transform_data = PythonOperator(task_id="my_transformation", python_callable=transform_data)

    profile_data = PythonOperator(task_id="profile_data", python_callable=profile_data)

After that, we can use our integration operators to consume the stored profile, run three constraint validations, and build a Summary Drift Report.

from whylogs.core.constraints.factories import greater_than_number, mean_between_range
from whylogs_provider.operators.whylogs import (
    WhylogsConstraintsOperator,
    WhylogsSummaryDriftOperator,
)

# inside the same `with DAG(...)` block defined above:

    # fail the pipeline if columns "a" or "b" contain values not greater than 0
    greater_than_check_a = WhylogsConstraintsOperator(
        task_id="greater_than_check_a",
        profile_path="data/profile.bin",
        constraint=greater_than_number(column_name="a", number=0),
    )
    greater_than_check_b = WhylogsConstraintsOperator(
        task_id="greater_than_check_b",
        profile_path="data/profile.bin",
        constraint=greater_than_number(column_name="b", number=0),
    )

    # break_pipeline=False lets the DAG continue even if this constraint fails
    avg_between_b = WhylogsConstraintsOperator(
        task_id="avg_between_b",
        profile_path="data/profile.bin",
        break_pipeline=False,
        constraint=mean_between_range(column_name="b", lower=0.0, upper=125.1261236210),
    )

    # compare the new profile against a reference profile and write an HTML report
    summary_drift = WhylogsSummaryDriftOperator(
        task_id="drift_report",
        target_profile_path="data/profile.bin",
        reference_profile_path="data/ref_profile.bin",
        reader="local",
        write_report_path="data/Profile.html",
    )

And to chain the execution together in our DAG file, all we need to do is add the following piece to our Python code.

(
    transform_data
    >> profile_data
    >> [greater_than_check_a, greater_than_check_b, avg_between_b]
    >> summary_drift
)

That code will create our example DAG: the transformation runs first, followed by the profiling task, then the three constraint checks in parallel, and finally the Summary Drift Report.

With this DAG, we will know right away if any of the constraints we put in place failed validation. Note that avg_between_b sets break_pipeline=False, so a failing mean-range check is reported without blocking the rest of the pipeline. And if everything works fine, we will write a Summary Drift Report to the desired location, so users can further investigate drift by comparing the profiled data to a reference profile (created during the training process, for example).
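The drift report task reads a reference profile from data/ref_profile.bin, which the example DAG itself doesn't produce. Here is a minimal sketch of creating one ahead of time, assuming an illustrative data/training_data.csv holds the training data:

import pandas as pd
import whylogs as why

# profile the reference (training) data once and store it where the
# Airflow workers can read it later; the paths here are placeholders
ref_df = pd.read_csv("data/training_data.csv")
why.log(ref_df).writer("local").write(dest="data/ref_profile.bin")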

Image: Summary Drift report for the Wine Classification Dataset
NOTE: One possible place to store your profiles is the local file system, for example when you run Airflow on a single virtual machine instance where processing and storage are shared. Another option is to submit a Spark job to AWS EMR and store the profile in an AWS S3 bucket. We didn't want to prescribe a strongly opinionated way to profile data at this point, because every real-world usage scenario will be different.
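As an illustration, whylogs also ships an s3 writer that can store a profile in a bucket directly, assuming AWS credentials are available in the environment. The bucket and object names below are placeholders, and the exact writer options may vary between whylogs versions:

import pandas as pd
import whylogs as why

df = pd.read_csv("data/transformed_data.csv")
results = why.log(df)

# write the profile straight to S3 instead of the local file system
results.writer("s3").option(bucket_name="my-profile-bucket", object_name="profiles/profile.bin").write()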


Conclusion

Now that you have learned how to integrate whylogs into your Airflow DAGs, your pipelines will be more responsible, scalable, and efficient.

If you think another operator could be useful for your use case, please open an issue on our open-source repo, and let us know how you're using whylogs with Apache Airflow on the community Slack!
