
Data Quality Monitoring in Apache Airflow with whylogs

If you are working with data pipelines that need resilient, complex, and scalable processing, Apache Airflow is an excellent choice. The framework helps you create, schedule, and monitor your processes through a friendly API, ensures that your scheduled jobs run when and how they should, and tells you which specific tasks failed along the way.

But just knowing that a process ran successfully and on time isn't enough for reliability. That's where whylogs, the open standard for data logging, comes in. whylogs lets users verify the quality of their data with a Pythonic API for running constraint validation suites, generating drift reports, and much more.

To make the most of whylogs within your existing Airflow pipelines, we’ve created the whylogs Airflow provider. In this blog post, we will use an example to demonstrate how you can use whylogs and Airflow to make your workflow more responsible, scalable, and efficient.

A typical DAG in Apache Airflow

An Airflow pipeline is defined in a Directed Acyclic Graph (DAG), which means that every job will have a starting task and dependencies that will always run in a single direction and will not loop back to the beginning after they finish. A very simple DAG can be visualized in Airflow as the following image shows.

This will make Airflow execute task_a before task_b and task_c. If task_a fails, the downstream tasks will not be executed, which can trigger an alert or automatically retry the failing pieces. Production workflows can grow into much larger graphs, which is where orchestration tools like Airflow shine. For data pipelines, though, a successful run can still produce wrong results.

For example, an ML job that completed successfully might still have consumed or produced drifted data, and without proper monitoring the problem may only be caught much later - if it is ever caught. With whylogs, monitoring data and machine learning pipelines can be done very smoothly, and with our newest Airflow operator you can extend your existing pipelines right away to make your workflows more reliable.

Extending a DAG with whylogs operators

In order to use our operators, all you need to do is install the provider package in your environment with:

(.venv) $ pip install airflow-provider-whylogs

Once that is done, you are ready to use whylogs and its operators across your data pipelines. To get started, the first thing you will need is to profile your data and store the profile somewhere your Airflow application can access later on. For demonstration purposes, we will transform and profile the data with pandas and store the profile locally. Let's define our first DAG tasks with the following piece of code.

from datetime import datetime

import pandas as pd
import whylogs as why
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def transform_data(input_path="data/raw_data.csv"):
    # Drop rows with missing values and persist the cleaned dataset
    input_data = pd.read_csv(input_path)
    clean_df = input_data.dropna(axis=0)
    clean_df.to_csv("data/transformed_data.csv")


def profile_data(data_path="data/transformed_data.csv"):
    # Profile the transformed data with whylogs and store the profile locally
    df = pd.read_csv(data_path)
    result = why.log(df)
    result.writer("local").write(dest="data/profile.bin")


with DAG(
    dag_id="whylogs_example_dag",
    schedule_interval=None,
    start_date=datetime.now(),
    max_active_runs=1,
    tags=["responsible", "data_transformation"],
) as dag:

    transform_data = PythonOperator(task_id="my_transformation", python_callable=transform_data)

    profile_data = PythonOperator(task_id="profile_data", python_callable=profile_data)

After that, we can use the integration's operators to consume the stored profile and build three constraint validators as well as a Summary Drift Report.

from whylogs.core.constraints.factories import greater_than_number, mean_between_range
from whylogs_provider.operators.whylogs import (
    WhylogsConstraintsOperator,
    WhylogsSummaryDriftOperator,
)

    # Still inside the `with DAG(...)` block defined above
    greater_than_check_a = WhylogsConstraintsOperator(
        task_id="greater_than_check_a",
        profile_path="data/profile.bin",
        constraint=greater_than_number(column_name="a", number=0),
    )
    greater_than_check_b = WhylogsConstraintsOperator(
        task_id="greater_than_check_b",
        profile_path="data/profile.bin",
        constraint=greater_than_number(column_name="b", number=0),
    )

    avg_between_b = WhylogsConstraintsOperator(
        task_id="avg_between_b",
        profile_path="data/profile.bin",
        break_pipeline=False,
        constraint=mean_between_range(column_name="b", lower=0.0, upper=125.1261236210),
    )

    summary_drift = WhylogsSummaryDriftOperator(
        task_id="drift_report",
        target_profile_path="data/profile.bin",
        reference_profile_path="data/ref_profile.bin",
        reader="local",
        write_report_path="data/Profile.html",
    )

And to chain the execution together in our DAG file, all we need to do is add the following piece to our Python code.

(
    transform_data
    >> profile_data
    >> [greater_than_check_a, greater_than_check_b, avg_between_b]
    >> summary_drift
)

That code will create our example DAG, represented by the image below.

With this DAG, we will know right away whether any of the constraints we defined failed its validation. And if everything passes, we will write a Summary Drift Report to the desired location, so users can further investigate drift by comparing the profiled data against a reference profile (created during the training process, for example).

Image: Summary Drift report for the Wine Classification Dataset
NOTE: One possible location to store your profiles is the local file system, in case you run Airflow on a virtual machine instance whose processing and storage are shared. Another option is to submit a Spark job to AWS EMR and store the profile in an AWS S3 bucket. We didn't want to prescribe a strongly opinionated way to profile data at this point, because every real-world usage scenario is different.


Conclusion

Now that you have learned how to integrate whylogs into your Airflow DAGs, your pipelines can be more responsible, scalable, and efficient.

If you think another operator could be useful to your case, please open an issue on our open repo and let us know how you're using whylogs with Apache Airflow on the community Slack!
