Data Quality Monitoring for Kafka, Beyond Schema Validation
- Whylogs
- Open Source
- ML Monitoring
- Data Quality
Aug 23, 2022
Any application dealing with large amounts of data is vulnerable to data quality issues, whether a machine learning pipeline that ends in model training or a data pipeline into a data warehouse used for analytics. The larger the surface area of the application, the easier it is to have small bugs in far-off corners result in garbage data propagating through, and it's usually an end user or a product manager that finds out about it the hard way. Sure, all of the ints are still ints, but what if there are suddenly 10x more of them, or what if they’re all 10x bigger than last week?
Kafka is commonly used in modern data pipelines, so we'll use it as the basis for exploring data quality issues and mitigations. Data quality often maps to a schema registry or data type validation in the Kafka world, like the validation that Confluent offers. That kind of validation is a good start, but there are a few things most data application owners don’t think about.
When you have monitors that validate distribution shifts, unique value ratios, or data type counts in production, then you can detect issues that result in "weird data" rather than just validating that the data has the right shape. For example, suppose you have a field that you know should be a string or an int. In that case, you can validate each data point to make sure it isn't something else. But if you monitor the count of strings and ints in that field over time, you can be alerted when you stop receiving any strings, or get twice as many ints as you used to. That could mean that some of your upstream data providers are having data quality issues and you'll be able to figure it out before your analytics team asks you why their query results look “weird”.
Data quality issues
We'll use a few hypothetical error scenarios with semi-real-time data from the World of Warcraft API for the in-game auction house on their Kil'Jaeden server. These examples highlight subtle quality issues that schema validation won't pick up. The graphs used are from our WhyLabs SaaS offering.
For reference, the data in this dataset that represents WoW auctions looks roughly like this.
[
{
"id": 288802828,
"item.id": 4389,
"quantity": 34,
"unit_price": 840000,
"time_left": "SHORT"
},
{
"id": 288803158,
"item.id": 55598,
"item.modifiers.0.type": 9,
"item.modifiers.0.value": 35,
"item.modifiers.1.type": 28,
"item.modifiers.1.value": 55,
"buyout": 5000000,
"quantity": 1,
"time_left": "SHORT"
},
// ...
]
Distribution shifts in IDs
Distribution shifts in IDs is a common problem in machine learning. If the data in production slowly drifts from the data that a model was trained on over time, then a model will start generating bad predictions. It can be hard to catch in real time, because it might happen slowly or slightly.
Distribution shifts can indicate other issues in application logic for non machine learning applications. In this case, each auction in Blizzard's API has a unique, auto increment ID. The application owner should expect that the auction IDs consistently drift up over time. More drastic shifts in drift in any given hour might mean that some subset of auctions are being omitted from the API, some async process invalidates a series of usable auction IDs, or that there’s some other issue with the system. This example highlights that a system can remain sound while error behavior elsewhere flies under the radar until it eventually boils over.
Missing or delayed data
The auction house API doesn't let you request historical data and only updates once an hour. This means that any hour that doesn't serve new data results in clients missing the chance to consume that data forever.
Tracking the absence of data in combination with the amount of null values in the data will clearly signal that something isn't working. Ideally, you have other tests at various stages that prevent the underlying issue from getting deployed at all, but developers can miss things. This metric gives you a black box view that you can fall back on when things are missed. Without this, you’ll discover missing data when a user asks why it's suddenly gone.
Uniqueness changes
Going back to the auction IDs from the distribution shift, we can make additional assumptions about how many unique IDs we should expect to see. Since they're auto-incrementing, we expect there to never be unique IDs. Any drop in the ratio of unique values means something is wrong, even though the schema hasn't been violated, and each auction still makes sense.
The drop in the graph above could represent a single hour of data being duplicated such that each auction appears twice, which is not supposed to happen. This can give you a hint about which part of your application is likely malfunctioning at a glance, and which parts of your applications (that make assumptions about uniqueness) will be impacted as a side effect of the issue.
Data type changes
Of course, you can still track the inferred data type of fields. This graph represents the count of integer types found in the auction ID field. The spike here lines up with the uniqueness graph, supporting the theory that a batch of auction data had each auction duplicated in its payload.
This problem is easy to miss with simple schema validation. This information would be useful if you had a field that could be a union of two types, like string and integer. In the latter case, schema validation could only tell you that the types were correct, while a graph of counts of each type would cue you into changes in the amount of strings or ints over time. It might be important that you suddenly stopped receiving any string values and the amount of ints you see has doubled, for example.
Setting up data quality monitoring on a Kafka stream
Once you're convinced of the need to monitor data quality beyond simple schema validation, it's time to examine how to set up a solution.
Let’s dive into whylogs, the tool used to monitor data quality. The easiest way to get started is to use our whylogs container as a Kafka consumer on your cluster.
docker run --env-file conf.env whylabs/whylogs:latest
And conf.env
will end up looking something like this.
#
# Kafka settings
#
KAFKA_ENABLED=true
KAFKA_TOPIC_DATASET_IDS={"wow-ah": "model-xxxx"}
KAFKA_BOOTSTRAP_SERVERS=["http://localhost:9093"] # Wherever your kafka cluster is
KAFKA_GROUP_ID=whylogs_dev # Preferred kafka group id for the container
KAFKA_TOPICS=["wow-ah"] # List of topics to monitor
KAFKA_CONSUMER_THREADS=5 # How many consumer threads to spin up in the container
#
# WhyLabs settings
#
UPLOAD_DESTINATION=WHYLABS
# Generated from our api key dashboard at https://hub.whylabsapp.com/settings/access-tokens
WHYLABS_API_KEY=
WHYLOGS_PERIOD=HOURS
ORG_ID=org-xxxx
# The container has various resillience options that we won't need since we're using this
# with Kafka.
PROFILE_STORAGE_MODE=IN_MEMORY
REQUEST_QUEUEING_ENABLED=false
PORT=2032 # Port of the REST interface on the container. Can be ignored for now.
We have a full description of all configuration options on our dedicated doc page. Running the container with this configuration will give you a server that spins up some number of Kafka consumers (with dedicated threads) that will consume from the topics you listed, reduce those topics into compact profiles and upload those profiles every hour to WhyLabs for monitoring and visualization.
The consumers expect the data in the topics to be dictionaries of key value pairs. Any nested values will be flattened by default, forming keys of the form level1.level2.etc
.
How it works
The container we're running here also acts as a stand-alone REST server that you can send data to in pandas format, as described here.
The container just acts as a set of Kafka consumers. The consumers are created via the high level KafkaConsumer in the Java library, so most of the behavior is inherited from there. The container maintains a map of timestamp ➝ profile. Each consumer will attempt to reduce the messages they receive into the profile that exists at the appropriate timestamp, or create a profile if it doesn't exist yet. They're all parsing the messages with Jackson and using whylogs to convert the data into profiles.
What is a profile?
A profile is a lightweight statistical representation of some data. You can think of a profile as a bunch of aggregations like sum, count, and mean, over some set of data. That means that the size of a profile will be very small relative to the data it represents. In reality, the aggregations are more complicated distributions, but the principle is the same.
Profile sizes will scale with the number of fields/columns/features rather than the number of rows of data. This also means that you don't end up uploading all of your data, only the statistical summaries, which is ideal for anyone who isn't allowed to share their data with third parties. The only things transmitted from the original dataset are field names (the keys of the dictionaries in your Kafka data) and, conditionally, top k frequent items for non-discrete values.
Profiles are generated with our open source library, whylogs. Those profiles can be compared to one another, visualized in notebook environments, and monitored for changes. We built WhyLabs to make monitoring and visualizing these profiles easy and cost-effective.
What’s next?
Now, you can set up monitors and alerts on any of the data in any, or all, of your Kafka topics. Sign up for a WhyLabs starter account and set up data quality monitoring with the whylogs container for a single project - for free!
We have a sample repo that you can run locally with the data used in this blog. The container code and our open source library, whylogs are both hosted on Github. You can also run the entire demo using this GitHub repo, which spins up a Kafka cluster, the whylogs container, and a small app that deposits data into the cluster from Blizzard's API each hour.
There are many ways to run a container, and everyone has a unique set of requirements. We'll help you find the best solution for your stack, so don't hesitate to reach out on Slack or contact us if you have any questions.
Other posts
Best Practicies for Monitoring and Securing RAG Systems in Production
Oct 8, 2024
- Retrival-Augmented Generation (RAG)
- LLM Security
- Generative AI
- ML Monitoring
- LangKit
How to Evaluate and Improve RAG Applications for Safe Production Deployment
Jul 17, 2024
- AI Observability
- LLMs
- LLM Security
- LangKit
- RAG
- Open Source
WhyLabs Integrates with NVIDIA NIM to Deliver GenAI Applications with Security and Control
Jun 2, 2024
- AI Observability
- Generative AI
- Integrations
- LLM Security
- LLMs
- Partnerships
OWASP Top 10 Essential Tips for Securing LLMs: Guide to Improved LLM Safety
May 21, 2024
- LLMs
- LLM Security
- Generative AI
7 Ways to Evaluate and Monitor LLMs
May 13, 2024
- LLMs
- Generative AI
How to Distinguish User Behavior and Data Drift in LLMs
May 7, 2024
- LLMs
- Generative AI