Data Quality Monitoring in Apache Airflow with whylogs
- Integration
- whylogs
- Data Logging
Sep 13, 2022
If you are working with data pipelines that need resilient, complex, and scalable processing, Apache Airflow is an excellent choice. The framework helps you create, schedule, and monitor your processes with its friendly API. It ensures that your planned jobs run when and how they should, and it tells you which specific tasks failed along the way.
But just knowing that a process ran successfully and on time isn't enough for reliability. That's where whylogs, the open standard for data logging, comes in. whylogs allows users to verify the quality of their data with a Pythonic API for running a constraints validation suite, generating drift reports and much more.
To make the most of whylogs within your existing Airflow pipelines, we’ve created the whylogs Airflow provider. In this blog post, we will use an example to demonstrate how you can use whylogs and Airflow to make your workflow more responsible, scalable, and efficient.
A typical DAG in Apache Airflow
An Airflow pipeline is defined as a Directed Acyclic Graph (DAG): every job has a starting task, and its dependencies always run in a single direction, never looping back to the beginning after they finish. A very simple DAG can be visualized in Airflow as the following image shows.
This will make Airflow execute task_a before task_b and task_c. If task_a fails, the downstream tasks will not be executed, and the failure can trigger an alert or an automatic retry of the failing pieces. Production workflows can grow into much larger graphs, which is where orchestration tools like Airflow shine. For data pipelines, though, a run that completes successfully can still produce wrong results.
For example, an ML job that completed successfully might still have produced drifted data, and without proper monitoring, the problem may only be caught much later, if it is ever caught. With whylogs, monitoring data and machine learning models can be done very smoothly, and with our newest Airflow operator, you can extend your existing pipelines right away to make your workflows more reliable.
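To make the ordering and failure behavior of the three-task DAG concrete, here is a minimal sketch that does not use Airflow at all. It relies on Python's standard-library graphlib (Python 3.9+) to order the tasks and marks anything downstream of a failure as not run. The `run` helper and its status labels are hypothetical illustrations; Airflow's real scheduler is considerably more involved.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "task_a": set(),
    "task_b": {"task_a"},
    "task_c": {"task_a"},
}


def run(dependencies, failing=frozenset()):
    """Visit tasks in dependency order and skip tasks whose upstream did not succeed."""
    status = {}
    for task in TopologicalSorter(dependencies).static_order():
        if any(status[upstream] != "success" for upstream in dependencies[task]):
            status[task] = "upstream_failed"  # never executed
        elif task in failing:
            status[task] = "failed"
        else:
            status[task] = "success"
    return status


print(run(dependencies))
print(run(dependencies, failing={"task_a"}))
```

Note that every task reports success in the first call even though nothing checked the data itself, which is the gap the rest of this post addresses.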
Extending a DAG with whylogs operators
To use our operators, all you need to do is install the provider in your environment with:
(.venv) $ pip install airflow-provider-whylogs
Once that is done, you are ready to use whylogs and its operators across your data pipelines. To get started, you first need to profile your data and store the profile somewhere your Airflow application can access later on. For demonstration purposes, we will transform and profile the data with pandas and store the profile locally. To do so, let's define our first DAG tasks with the following piece of code.
from datetime import datetime

import pandas as pd
import whylogs as why
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator


def profile_data(data_path="data/transformed_data.csv"):
    # Profile the transformed data and store the profile on the local file system.
    df = pd.read_csv(data_path)
    result = why.log(df)
    result.writer("local").write(dest="data/profile.bin")


def transform_data(input_path="data/raw_data.csv"):
    # Drop rows with missing values before profiling.
    input_data = pd.read_csv(input_path)
    clean_df = input_data.dropna(axis=0)
    # index=False keeps the pandas index from being read back as an extra column.
    clean_df.to_csv("data/transformed_data.csv", index=False)


with DAG(
    dag_id='whylogs_example_dag',
    schedule_interval=None,
    start_date=datetime.now(),
    max_active_runs=1,
    tags=['responsible', 'data_transformation'],
) as dag:
    transform_data = PythonOperator(task_id="my_transformation", python_callable=transform_data)
    profile_data = PythonOperator(task_id="profile_data", python_callable=profile_data)
After that, we can use our integration operators to consume the stored profiles and build three Constraints validators and also a Summary Drift Report.
from whylogs.core.constraints.factories import greater_than_number, mean_between_range
from whylogs_provider.operators.whylogs import (
    WhylogsConstraintsOperator,
    WhylogsSummaryDriftOperator,
)

greater_than_check_a = WhylogsConstraintsOperator(
    task_id="greater_than_check_a",
    profile_path="data/profile.bin",
    constraint=greater_than_number(column_name="a", number=0),
)
greater_than_check_b = WhylogsConstraintsOperator(
    task_id="greater_than_check_b",
    profile_path="data/profile.bin",
    constraint=greater_than_number(column_name="b", number=0),
)
avg_between_b = WhylogsConstraintsOperator(
    task_id="avg_between_b",
    profile_path="data/profile.bin",
    break_pipeline=False,
    constraint=mean_between_range(column_name="b", lower=0.0, upper=125.1261236210),
)
summary_drift = WhylogsSummaryDriftOperator(
    task_id="drift_report",
    target_profile_path="data/profile.bin",
    reference_profile_path="data/ref_profile.bin",
    reader="local",
    write_report_path="data/Profile.html",
)
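As a rough mental model of what these checks do, the sketch below evaluates similar rules against precomputed column statistics in plain Python. It is not the whylogs or provider implementation: the `ColumnStats` type, the helper functions, and the sample numbers are hypothetical. It also assumes that `greater_than_number` compares the column minimum against the threshold and that `break_pipeline=False` reports a failed check without failing the task.

```python
from dataclasses import dataclass


@dataclass
class ColumnStats:
    """Summary statistics for one column (hypothetical stand-in for a profile)."""
    minimum: float
    mean: float


class CheckFailed(Exception):
    """Raised when a failed check is allowed to stop the pipeline."""


def greater_than(stats, number):
    # Every value exceeds `number` exactly when the minimum does.
    return stats.minimum > number


def mean_between(stats, lower, upper):
    return lower <= stats.mean <= upper


def run_check(name, passed, break_pipeline=True):
    """Return the check result; raise only if it failed and may break the pipeline."""
    if not passed and break_pipeline:
        raise CheckFailed(name)
    return passed


column_b = ColumnStats(minimum=-2.0, mean=130.0)  # made-up numbers

# Soft check: the failure is returned to the caller, nothing is raised.
soft_result = run_check("avg_between_b", mean_between(column_b, 0.0, 125.0), break_pipeline=False)

# Hard check: the same kind of failure stops the run.
try:
    run_check("greater_than_check_b", greater_than(column_b, 0))
    hard_check_raised = False
except CheckFailed:
    hard_check_raised = True
```

Working from summary statistics rather than raw rows is what lets a check like this run against a stored profile instead of the full dataset.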
And to chain the execution together on our DAG file, all we need to do is add the following piece to our Python code.
(
    transform_data
    >> profile_data
    >> [greater_than_check_a, greater_than_check_b, avg_between_b]
    >> summary_drift
)
That code will create our example DAG, represented by the image below.
With this DAG, we will know right away whether our validations failed for each one of the constraints we put in. Also, if everything works fine, we will write out a Summary Drift Report to our desired location so users can further investigate drift when comparing the profiled data to the reference profile (that was created in the training process, for example).
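The fan-out to three checks and the fan-in to the drift report both come from chaining with `>>` and a list. The toy `Task` class below is a hypothetical stand-in, not Airflow code; it only mimics the idea that `>>` records a dependency and returns its right-hand side so the chain can continue.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only tracks downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def __rshift__(self, other):
        # Handles `task >> other`, where `other` is a Task or a list of Tasks.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.add(target.task_id)
        return other  # returning the right-hand side lets the chain continue

    def __rrshift__(self, others):
        # Handles `[task1, task2] >> task`, since lists do not define `>>`.
        for upstream in others:
            upstream >> self
        return self


transform = Task("my_transformation")
profile = Task("profile_data")
checks = [Task("greater_than_check_a"), Task("greater_than_check_b"), Task("avg_between_b")]
drift = Task("drift_report")

transform >> profile >> checks >> drift

for task in [transform, profile, *checks, drift]:
    print(task.task_id, "->", sorted(task.downstream))
```

In this model each check depends only on the profiling step, and the drift report waits on all three checks, which matches the shape of the example DAG.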
NOTE: One possible location to store your profiles is the local file system, if you run Airflow on a Virtual Machine instance where processing and storage are shared. Another option is to submit a Spark job to AWS EMR and store the profile in an AWS S3 bucket.
We didn't want to impose a strongly opinionated way to profile data at this point, because every real-world usage scenario will be different.
Conclusion
Now that you have learned how to integrate whylogs into your Airflow DAGs, your pipelines will be more responsible, scalable, and efficient.
If you think another operator could be useful to your case, please open an issue on our open repo and let us know how you're using whylogs with Apache Airflow on the community Slack!
References
- Airflow-whylogs-provider: https://pypi.org/project/airflow-provider-whylogs/
- Airflow: https://airflow.apache.org/
- whylogs: https://whylogs.readthedocs.io/