
Data Quality Monitoring for Kafka, Beyond Schema Validation

Any application that handles large amounts of data is vulnerable to data quality issues, whether it's a machine learning pipeline that ends in model training or a pipeline that feeds a data warehouse used for analytics. The larger the application's surface area, the easier it is for a small bug in some far-off corner to send garbage data downstream, and it's usually an end user or a product manager who finds out the hard way. Sure, all of the ints are still ints, but what if there are suddenly 10x more of them, or what if they’re all 10x bigger than last week?

Kafka is commonly used in modern data pipelines, so we'll use it as the basis for exploring data quality issues and mitigations. In the Kafka world, data quality often maps to a schema registry or data type validation, like the validation that Confluent offers. That kind of validation is a good start, but it misses a few things most data application owners don’t think about.

When you have monitors that validate distribution shifts, unique value ratios, or data type counts in production, then you can detect issues that result in "weird data" rather than just validating that the data has the right shape. For example, suppose you have a field that you know should be a string or an int. In that case, you can validate each data point to make sure it isn't something else. But if you monitor the count of strings and ints in that field over time, you can be alerted when you stop receiving any strings, or get twice as many ints as you used to. That could mean that some of your upstream data providers are having data quality issues and you'll be able to figure it out before your analytics team asks you why their query results look “weird”.

Data quality issues

We'll walk through a few hypothetical error scenarios using semi-real-time data from the World of Warcraft API for the in-game auction house on the Kil'Jaeden server. These examples highlight subtle quality issues that schema validation won't pick up. The graphs are from our WhyLabs SaaS offering.

For reference, the WoW auction data in this dataset looks roughly like this:

[
    {
        "id": 288802828,
        "item.id": 4389,
        "quantity": 34,
        "unit_price": 840000,
        "time_left": "SHORT"
    },
    {
        "id": 288803158,
        "item.id": 55598,
        "item.modifiers.0.type": 9,
        "item.modifiers.0.value": 35,
        "item.modifiers.1.type": 28,
        "item.modifiers.1.value": 55,
        "buyout": 5000000,
        "quantity": 1,
        "time_left": "SHORT"
    },
    // ...
]

Distribution shifts in IDs

Distribution shift is a common problem in machine learning. If production data slowly drifts away from the data a model was trained on, the model will start generating bad predictions. Drift can be hard to catch in real time because it often happens slowly or subtly.

Distribution shifts can also indicate application logic issues outside of machine learning. In this case, each auction in Blizzard's API has a unique, auto-incrementing ID, so the application owner should expect auction IDs to drift upward consistently over time. A more drastic shift in any given hour might mean that some subset of auctions is being omitted from the API, that an async process is invalidating a series of usable auction IDs, or that there’s some other issue with the system. This example highlights how a system can look sound while errors elsewhere fly under the radar until they eventually boil over.
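As a rough illustration of that expectation, the sketch below summarizes each hour of auction IDs by its median and flags any hour where the median goes backwards or jumps much further than the typical hour-over-hour step. The function name, the median summary, and the `tolerance` factor are hypothetical choices for this example, not how WhyLabs computes drift.

```python
from statistics import median


def flag_id_drift(hourly_ids, tolerance=3.0):
    """Return indices of hours whose median ID moved unexpectedly.

    hourly_ids: list of lists of auction IDs, one list per hour, in order.
    Hypothetical rule: flag an hour if its median decreased (auto-increment
    IDs should only drift up) or increased by more than `tolerance` times
    the median of the earlier positive hour-over-hour steps.
    """
    medians = [median(ids) for ids in hourly_ids]
    steps = [b - a for a, b in zip(medians, medians[1:])]
    flagged = []
    for i, step in enumerate(steps, start=1):
        earlier = [s for s in steps[: i - 1] if s > 0]
        typical = median(earlier) if earlier else None
        if step < 0:
            flagged.append(i)  # IDs went backwards
        elif typical is not None and step > tolerance * typical:
            flagged.append(i)  # much larger jump than usual
    return flagged


hours = [
    [100, 101, 102],
    [110, 111, 112],
    [120, 121, 122],
    [500, 501, 502],  # sudden jump in this hour
    [510, 511, 512],
]
print(flag_id_drift(hours))  # [3]
```

A median is used here only because it is cheap and robust to a few outliers; any per-hour distribution summary would serve the same purpose.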

Missing or delayed data

The auction house API doesn't let you request historical data and only updates once an hour. If an hour's data isn't served, clients lose the chance to consume it forever.

Tracking the absence of data together with the number of null values in the data gives a clear signal that something isn't working. Ideally, you have other tests at various stages that prevent the underlying issue from being deployed at all, but developers miss things. This metric gives you a black-box view to fall back on when that happens. Without it, you’ll discover the missing data when a user asks why it's suddenly gone.
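A minimal sketch of those two signals: which expected hourly windows received no records at all, and what fraction of a field's values were null. It assumes each record carries an epoch-seconds timestamp under a hypothetical `ts` key; the container's real bookkeeping may differ.

```python
HOUR = 3600  # seconds


def missing_hours(records, start, end):
    """Return hour-aligned window starts in [start, end) that saw no records.

    Assumes `start` is aligned to an hour boundary and each record has a
    hypothetical "ts" key holding epoch seconds.
    """
    seen = {r["ts"] // HOUR * HOUR for r in records}
    return [h for h in range(start, end, HOUR) if h not in seen]


def null_ratio(records, field):
    """Fraction of records where `field` is absent or None."""
    if not records:
        return None  # no data at all is its own signal, not a 0% null rate
    nulls = sum(1 for r in records if r.get(field) is None)
    return nulls / len(records)


records = [
    {"ts": 0, "unit_price": 840000},
    {"ts": 7205, "unit_price": None},
]
print(missing_hours(records, 0, 3 * HOUR))  # [3600]
print(null_ratio(records, "unit_price"))    # 0.5
```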

Uniqueness changes

Going back to the auction IDs from the distribution shift example, we can make additional assumptions about how many unique IDs to expect. Since they're auto-incrementing, we never expect to see duplicate IDs, so the ratio of unique values to total values should always be 1. Any drop in that ratio means something is wrong, even though the schema hasn't been violated and each individual auction still makes sense.

The drop in the graph above could represent a single hour of data being duplicated so that each auction appears twice, which is not supposed to happen. At a glance, this hints at which part of your application is likely malfunctioning, and which parts of your application (those that assume uniqueness) will be impacted as a side effect.
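The arithmetic behind that drop is simple: if every auction in a batch is duplicated, the unique ratio falls from 1.0 to 0.5. The helper below is a hypothetical, exact computation for illustration only; whylogs estimates distinct counts with a probabilistic cardinality sketch rather than storing every ID.

```python
def unique_ratio(ids):
    """Distinct values divided by total values; 1.0 means no duplicates."""
    if not ids:
        return None
    return len(set(ids)) / len(ids)


normal_hour = [288802828, 288803158]   # IDs from the sample data above
duplicated_hour = normal_hour * 2      # every auction appears twice

print(unique_ratio(normal_hour))      # 1.0
print(unique_ratio(duplicated_hour))  # 0.5
```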

Data type changes

Of course, you can still track the inferred data type of fields. This graph represents the count of integer types found in the auction ID field. The spike here lines up with the uniqueness graph, supporting the theory that a batch of auction data had each auction duplicated in its payload.

Schema validation alone misses this kind of problem, because every value in the spike is still an integer. Type counts become even more useful when a field can be a union of two types, like string and integer. In that case, schema validation can only tell you that each value has a valid type, while a graph of counts per type would cue you in to changes in the number of strings or ints over time. For example, it might be important that you suddenly stopped receiving any string values and the number of ints you see has doubled.
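A minimal sketch of that idea: count the inferred type of each value in a window, then compare two windows and report any type that disappeared, newly appeared, or grew by at least a chosen factor. The helper names, the coarse type labels, and the factor of 2 are assumptions for illustration, not whylogs' actual type-inference rules.

```python
from collections import Counter


def type_counts(values):
    """Count values by a coarse inferred type name."""
    counts = Counter()
    for v in values:
        if v is None:
            counts["null"] += 1
        elif isinstance(v, bool):  # check bool first: bool subclasses int
            counts["boolean"] += 1
        elif isinstance(v, int):
            counts["integer"] += 1
        elif isinstance(v, float):
            counts["fractional"] += 1
        elif isinstance(v, str):
            counts["string"] += 1
        else:
            counts["unknown"] += 1
    return counts


def type_changes(baseline, current, factor=2.0):
    """Return {type: (baseline_count, current_count)} for suspicious changes."""
    changes = {}
    for t in set(baseline) | set(current):
        before, after = baseline.get(t, 0), current.get(t, 0)
        if before and not after:
            changes[t] = (before, after)  # type vanished entirely
        elif not before and after:
            changes[t] = (before, after)  # brand-new type appeared
        elif before and after >= factor * before:
            changes[t] = (before, after)  # count grew by `factor` or more
    return changes


last_week = type_counts(["a", "b", 1, 2, 3])
this_week = type_counts([1, 2, 3, 4, 5, 6])
print(type_changes(last_week, this_week))
```

Here the strings vanish and the integer count doubles, so both are reported even though every value would pass a "string or int" schema check.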

Setting up data quality monitoring on a Kafka stream

Once you're convinced of the need to monitor data quality beyond simple schema validation, it's time to examine how to set up a solution.

Let’s dive into whylogs, the tool we'll use to monitor data quality. The easiest way to get started is to run our whylogs container as a Kafka consumer on your cluster.

docker run --env-file conf.env whylabs/whylogs:latest

And conf.env will end up looking something like this.

#
# Kafka settings
#
KAFKA_ENABLED=true
KAFKA_TOPIC_DATASET_IDS={"wow-ah": "model-xxxx"}
# Wherever your Kafka cluster is
KAFKA_BOOTSTRAP_SERVERS=["http://localhost:9093"]
# Preferred Kafka group id for the container
KAFKA_GROUP_ID=whylogs_dev
# List of topics to monitor
KAFKA_TOPICS=["wow-ah"]
# How many consumer threads to spin up in the container
KAFKA_CONSUMER_THREADS=5

#
# WhyLabs settings
#
UPLOAD_DESTINATION=WHYLABS
# Generated from our api key dashboard at https://hub.whylabsapp.com/settings/access-tokens
WHYLABS_API_KEY=
WHYLOGS_PERIOD=HOURS
ORG_ID=org-xxxx

# The container has various resilience options that we won't need since we're using it
# with Kafka.
PROFILE_STORAGE_MODE=IN_MEMORY
REQUEST_QUEUEING_ENABLED=false
# Port of the REST interface on the container. Can be ignored for now.
PORT=2032

We have a full description of all configuration options on our dedicated doc page. Running the container with this configuration gives you a server that spins up a number of Kafka consumers (each with a dedicated thread) that consume from the topics you listed, reduce their messages into compact profiles, and upload those profiles to WhyLabs every hour for monitoring and visualization.

The consumers expect each message in the topics to be a dictionary of key-value pairs (a JSON object). Nested values are flattened by default into keys of the form level1.level2.etc.
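The flattening rule can be sketched as follows, assuming dotted paths with list elements keyed by their index, which matches keys like `item.modifiers.0.type` in the sample data above. This only illustrates the naming scheme; it is not the container's Java implementation.

```python
def flatten(value, prefix=""):
    """Flatten nested dicts/lists into one dict with dotted keys."""
    if isinstance(value, dict):
        items = value.items()
    elif isinstance(value, list):
        items = enumerate(value)  # list elements are keyed by index
    else:
        return {prefix: value}    # leaf value
    flat = {}
    for key, child in items:
        path = f"{prefix}.{key}" if prefix else str(key)
        flat.update(flatten(child, path))
    return flat


message = {
    "id": 288803158,
    "item": {"id": 55598, "modifiers": [{"type": 9, "value": 35}]},
    "quantity": 1,
}
print(flatten(message))
# {'id': 288803158, 'item.id': 55598, 'item.modifiers.0.type': 9, 'item.modifiers.0.value': 35, 'quantity': 1}
```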

How it works

The container we're running here also acts as a stand-alone REST server that you can send data to in pandas format, as described in the container's documentation.

For Kafka, though, the container simply acts as a set of Kafka consumers. The consumers are created with the high-level KafkaConsumer from the Kafka Java client library, so most of their behavior is inherited from there. The container maintains a map of timestamp ➝ profile. Each consumer reduces the messages it receives into the profile at the appropriate timestamp, or creates that profile if it doesn't exist yet. The consumers parse the messages with Jackson and use whylogs to convert the data into profiles.
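Here is a simplified Python model of that design. It assumes hourly buckets keyed by a millisecond timestamp (such as a Kafka record timestamp) and uses a plain `Counter` of field names as a stand-in for a real whylogs profile. `ProfileStore`, `reduce`, and `snapshot` are hypothetical names; the real container is written in Java.

```python
import threading
from collections import Counter

HOUR_MS = 3_600_000


class ProfileStore:
    """Map of hour-aligned timestamp -> profile, shared by consumer threads."""

    def __init__(self):
        self._profiles = {}
        self._lock = threading.Lock()

    def reduce(self, timestamp_ms, message):
        """Fold one message into the profile for its hour, creating it if needed."""
        bucket = timestamp_ms - timestamp_ms % HOUR_MS
        with self._lock:
            profile = self._profiles.setdefault(bucket, Counter())
            profile.update(message.keys())  # stand-in for real profiling

    def snapshot(self):
        """Return a copy of all profiles, e.g. for an hourly upload."""
        with self._lock:
            return {ts: Counter(p) for ts, p in self._profiles.items()}


store = ProfileStore()


def consume(messages):
    # Each thread plays the role of one Kafka consumer.
    for ts, msg in messages:
        store.reduce(ts, msg)


# Two hours of fake messages, each batch consumed by two threads.
batches = [
    [(HOUR_MS * h + i, {"id": i, "quantity": 1}) for i in range(100)]
    for h in range(2)
]
threads = [threading.Thread(target=consume, args=(b,)) for b in batches * 2]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(store.snapshot())
```

The lock keeps concurrent consumers from racing on the same bucket; because reductions are just additions, the order in which threads process messages does not affect the result.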

What is a profile?

A profile is a lightweight statistical representation of some data. You can think of a profile as a bunch of aggregations, like sum, count, and mean, over some set of data, which means a profile is very small relative to the data it represents. In reality, the aggregations are more complicated distributions, but the principle is the same.

Profile sizes will scale with the number of fields/columns/features rather than the number of rows of data. This also means that you don't end up uploading all of your data, only the statistical summaries, which is ideal for anyone who isn't allowed to share their data with third parties. The only things transmitted from the original dataset are field names (the keys of the dictionaries in your Kafka data) and, conditionally, top k frequent items for non-discrete values.
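To make the idea concrete, here is a toy numeric profile that keeps only count, sum, min, and max. Its size stays constant no matter how many values it tracks, and two profiles can be merged without the raw data. It is a hypothetical simplification; real whylogs profiles use richer mergeable sketches, such as quantile, cardinality, and frequent-item sketches.

```python
import math
from dataclasses import dataclass


@dataclass
class NumericProfile:
    """Toy profile: a few fixed-size aggregates over a numeric field."""
    count: int = 0
    total: float = 0.0
    minimum: float = math.inf
    maximum: float = -math.inf

    def track(self, value):
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)

    def merge(self, other):
        """Combine two profiles as if their data had been tracked together."""
        return NumericProfile(
            count=self.count + other.count,
            total=self.total + other.total,
            minimum=min(self.minimum, other.minimum),
            maximum=max(self.maximum, other.maximum),
        )

    @property
    def mean(self):
        return self.total / self.count if self.count else None


hour_a, hour_b = NumericProfile(), NumericProfile()
for q in [34, 1, 5]:
    hour_a.track(q)
for q in [2, 8]:
    hour_b.track(q)
day = hour_a.merge(hour_b)
print(day.count, day.mean, day.minimum, day.maximum)  # 5 10.0 1 34
```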

Profiles are generated with our open source library, whylogs. Those profiles can be compared to one another, visualized in notebook environments, and monitored for changes. We built WhyLabs to make monitoring and visualizing these profiles easy and cost-effective.

What’s next?

Now, you can set up monitors and alerts on any of the data in any, or all, of your Kafka topics. Sign up for a WhyLabs starter account and set up data quality monitoring with the whylogs container for a single project - for free!

We have a sample repo that you can run locally with the data used in this blog post. The container code and our open source library, whylogs, are both hosted on GitHub. You can also run the entire demo using this GitHub repo, which spins up a Kafka cluster, the whylogs container, and a small app that deposits data into the cluster from Blizzard's API each hour.

There are many ways to run a container, and everyone has a unique set of requirements. We'll help you find the best solution for your stack, so don't hesitate to reach out on Slack or contact us if you have any questions.
