blog bg left
Back to Blog

Integrating whylogs into your Kafka ML Pipeline

It is well known that machine learning models consume vast amounts of data, both during training and in production.  Kafka is an important part of many event-driven machine learning platforms because it  allows data consumers to be decoupled from producers which has benefits for horizontal scaling and ease of reconfiguration.  A Kafka topic might deliver events containing multiple feature elements, such as web access logs, point of sale transactions, or call log duration.  Any continuous stream of real-time data is a potential source of Kafka events.

Data streams typically power data-driven decisions, either through BI applications or ML/AI applications.  In either case, ensuring data quality is critical to making these applications are trustworthy and reliable.  Evaluating the quality of data in the Kafka stream is a non-trivial task due to large volumes of data and latency requirements.  This is an ideal job for whylogs, an open-source package for Python or Java that uses Apache DataSketches to monitor and detect statistical anomalies in streaming data.  Whylogs produces compact statistical profiles of time series data that can help detect data drift and distribution changes over time.  Most important for Kafka integration, Whylogs profiles can be merged so your monitoring pipeline can scale horizontally and still provide a continuous statistical profile of your entire data stream.

In addition to Kafka, whylogs can be integrated into a variety of data pipelines, including MLflow, SageMaker, and on Spark Pipelines.  This article will discuss how to use whylogs to monitor streams of data supplied by Kafka.

Monitoring Events through Kafka

Shown below is a simple python shim that consumes events from a Kafka topic and processes them through whylogs. Each Kafka event represents a row of JSON-encoded features in a stream of training data.  The example consumes up to 100 events at a time because processing batches is more efficient than processing individual events.  The session.logger in this example is configured to produce a new statistical profile every minute, as long as data is flowing.

from kafka import KafkaConsumer
from whylogs import get_or_create_session
import datetime
import json
import pandas as pd

deserializer=lambda x: json.loads(x.decode('utf-8'))
consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                         value_deserializer=deserializer)

consumer.subscribe(['whylogs-stream'])
session = get_or_create_session()
with session.logger(dataset_name="dataset", with_rotation_time="1m") as logger:
    while True:
        record = consumer.poll(timeout_ms=10000, 
                               max_records=100, 
                               update_offsets=True)
        for k,v in record.items():
            df = pd.DataFrame([row.value for row in v])
            logger.log_dataframe(df)

In production, this consumer might be considerably more complicated.  This example automatically updates the partition offset as soon as the events are consumed.  To avoid losing data due to service disruptions, it would be best to advance the offset pointer only after the whylogs profile covering that event has been persisted to long-term storage.

This example also subscribes to a single Kafka topic and processes events for all partitions of that topic.  However Topics are often divided into multiple partitions to allow horizontal scaling of consumers.   Whylogs can monitor separate partitions and the resulting profiles can be merged into a single profile that covers all events for a topic.

After running this example for some time, we will begin to accumulate profiles for batches of events.  We could examine those profiles individually, but our view would be limited to a single batch.  To graphically display the distribution of feature values over time, we can plot several whylogs profiles at once.

from whylogs import DatasetProfile
import glob

def from_file(fname) -> DatasetProfile:
    with open(fname, 'rb') as fp:
        return DatasetProfile.from_protobuf_string(fp.read()) 

# load whylogs profiles from disk
files = "output/dataset/*/protobuf/*.bin"
profiles = [from_file(fname) for fname in glob.glob(files)]

from whylogs.viz import ProfileVisualizer
viz = ProfileVisualizer()
viz.set_profiles(profiles)
viz.plot_distribution("fico_range_high", ts_format="00:%M:%S")

Merging profiles

Any monitoring solution is likely to fall behind a real-time stream if events are produced at too great a rate. The usual solution for Kafka consumers that fall behind is to run more consumers!

Kafka event streams can be partitioned so each consumer sees only a portion of the events in the stream.  Whylogs can monitor multiple partitions of a topic and later merge the profiles from the same time period without losing statistical power.

This code fragment shows the basics of merging Whylogs profiles to take advantage of horizontal scaling. The merge operation consolidates profiles from smaller time periods into a single profile that covers the entire time range.  The same merge operation will also consolidate profiles that monitor different features over the same time period. That is useful if separate Kafka topics stream events containing distinct model features.

from whylogs import DatasetProfile
import glob

profiles = "output/dataset/*/protobuf/*.bin"

merged = None
for fname in glob.glob(profiles):
    print(f'open {fname}')
    with open(fname, 'rb') as fp:
        p = DatasetProfile.from_protobuf_string(fp.read())
        if merged is None:
            merged = p
        else:
            merged.merge(p)
# `merged` will be a single profile that accumulates all the statistical 
# measures from individual profiles.

Conclusion

Whylogs can help monitor your ML data pipeline no matter how you structure your data pipeline, but is it particularly easy to monitor Kafka event streams.  The resulted profiles are available in protobuf, json and csv formats.  These profiles can be used for manual analysis or continuous monitoring.

If you are considering to use whylogs for your projects, join the Slack community to discuss ideas and share feedback on the library.

If you are looking for a monitoring solution for Kafka data streams, WhyLabs offers a SaaS platform built on top of whylogs.  The platform helps consolidate and visualize whylog profiles over extended time periods and across many features.  Monitoring and alerting can be enabled on any metrics with a configuration-free set-up.  Once set-up, configurable thresholds can send alerts when data quality metrics deviates in the data stream, catching issues like data distribution drifts, data corruption and loss.   A graphically rich dashboard helps you quickly zero-in on the time frame when problems started. Check out the WhyLabs Platform sandbox to see these features in action.

Other posts

Glassdoor Decreases Latency Overhead and Improves Data Monitoring with WhyLabs

The Glassdoor team describes their integration latency challenges and how they were able to decrease latency overhead and improve data monitoring with WhyLabs.

Understanding and Monitoring Embeddings in Amazon SageMaker with WhyLabs

WhyLabs and Amazon Web Services (AWS) explore the various ways embeddings are used, issues that can impact your ML models, how to identify those issues and set up monitors to prevent them in the future!

Data Drift Monitoring and Its Importance in MLOps

It's important to continuously monitor and manage ML models to ensure ML model performance. We explore the role of data drift management and why it's crucial in your MLOps pipeline.

Ensuring AI Success in Healthcare: The Vital Role of ML Monitoring

Discover how ML monitoring plays a crucial role in the Healthcare industry to ensure the reliability, compliance, and overall safety of AI-driven systems.

WhyLabs Recognized by CB Insights GenAI 50 among the Most Innovative Generative AI Startups

WhyLabs has been named on CB Insights’ first annual GenAI 50 list, named as one of the world’s top 50 most innovative companies developing generative AI applications and infrastructure across industries.

Hugging Face and LangKit: Your Solution for LLM Observability

See how easy it is to generate out-of-the-box text metrics for Hugging Face LLMs and monitor them in WhyLabs to identify how model performance and user interaction are changing over time.

7 Ways to Monitor Large Language Model Behavior

Discover seven ways to track and monitor Large Language Model behavior using metrics for ChatGPT’s responses for a fixed set of 200 prompts across 35 days.

Safeguarding and Monitoring Large Language Model (LLM) Applications

We explore the concept of observability and validation in the context of language models, and demonstrate how to effectively safeguard them using guardrails.

Robust & Responsible AI Newsletter - Issue #6

A quarterly roundup of the hottest LLM, ML and Data-Centric AI news, including industry highlights, what’s brewing at WhyLabs, and more.
pre footer decoration
pre footer decoration
pre footer decoration

Run AI With Certainty

Book a demo
loading...