
Windowing in Kafka Streams

Senthil Nayagan

Windowing refers to the process of dividing a continuous stream of data into discrete segments, or windows, based on time. These windows then serve as the basis for applying computational operations, such as aggregations or transformations, to the data contained within them.

Image Credits: Image generated by DALL-E.
It's a visual metaphor for the nuanced working of Kafka Streams' windowed aggregation feature.


Overview

Windowing in Kafka Streams is a powerful feature that allows us to manage and analyze data that flows into a Kafka cluster over time. It groups records into time intervals, or windows, based on their timestamps, which makes temporal operations (operations based on time) on data streams easier. This feature is particularly useful for applications that require real-time analytics, aggregations, or other time-based processing on continuous streams of data.

By segmenting the data temporally, windowing allows for more granular and time-sensitive analysis of streaming data, for example, calculating the average number of website visits per minute, detecting patterns of activity within a certain time frame, or summarizing financial transactions on an hourly basis.

Types of windows supported in Kafka Streams

Kafka Streams supports four types of windows for grouping events by time boundaries: tumbling, hopping, session, and sliding windows. Each allows for time-sensitive processing of streaming data.

Before we go further into explaining all these types of windows, let’s understand some of the core concepts:

  • Window size: The duration of each window. All events within this period are aggregated together.
  • Advance interval (Hop size): The amount by which each window advances. If the advance interval is less than the window size, the windows will overlap.

Tumbling windows

Fixed-size, non-overlapping intervals of time. They are useful for cases where we want to perform an operation on non-overlapping segments of data.

Use case: Inventory tracking is a good use case for tumbling windows, because we want to count the items sold in each period exactly once, without the same sale appearing in several overlapping windows.

Example

Let’s say we want to determine the average temperature every ten minutes from a stream of temperature signals from a sensor. To guarantee that every temperature reading is contained in a single 10-minute window, tumbling windows are used in this situation. Averages are then computed for these non-overlapping periods.

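Let’s say the sensor sends readings at minutes 1, 3, 7, 11, 15, 18, 22, and 25. Using 10-minute tumbling windows, the readings would be grouped and averaged as follows (Kafka Streams time windows include their start time and exclude their end time, so a reading at exactly minute 10 would belong to Window 2):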

  • Window 1 (0-10 minutes): Includes readings at minutes 1, 3, 7. The average of these readings is calculated.
  • Window 2 (10-20 minutes): Includes readings at minutes 11, 15, 18. The average of these readings is calculated.
  • Window 3 (20-30 minutes): Includes readings at minutes 22, 25. The average of these readings is calculated.
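To make the grouping and averaging concrete, here is a minimal stdlib-only Java sketch (not the Kafka Streams API) that buckets the readings above into 10-minute tumbling windows. The temperature values are hypothetical, since the example only gives the reading times. It also shows why an average must be computed from a running sum and count: updating with "(previous + new) / 2" does not produce the mean.

```java
import java.util.Map;
import java.util.TreeMap;

// Stdlib-only sketch of 10-minute tumbling windows; not the Kafka Streams API.
class TumblingAverageSketch {
    static final long WINDOW_SIZE_MINUTES = 10;

    // Tumbling windows: each timestamp maps to exactly one window start.
    static long windowStart(long minute) {
        return minute - (minute % WINDOW_SIZE_MINUTES);
    }

    // Correct average: keep a running [sum, count] per window, divide at the end.
    static Map<Long, Double> averages(long[] minutes, double[] temps) {
        Map<Long, double[]> sumAndCount = new TreeMap<>();
        for (int i = 0; i < minutes.length; i++) {
            double[] acc = sumAndCount.computeIfAbsent(windowStart(minutes[i]), k -> new double[2]);
            acc[0] += temps[i]; // running sum
            acc[1] += 1;        // running count
        }
        Map<Long, Double> result = new TreeMap<>();
        sumAndCount.forEach((start, acc) -> result.put(start, acc[0] / acc[1]));
        return result;
    }

    // Naive "(aggValue + newValue) / 2" update, which is NOT an average.
    static double naiveRunningValue(double[] temps) {
        double agg = 0.0;
        for (double t : temps) {
            agg = (agg + t) / 2;
        }
        return agg;
    }

    public static void main(String[] args) {
        long[] minutes = {1, 3, 7, 11, 15, 18, 22, 25};
        double[] temps = {20.0, 22.0, 24.0, 21.0, 23.0, 25.0, 26.0, 28.0}; // hypothetical values
        System.out.println(averages(minutes, temps));                         // {0=22.0, 10=23.0, 20=27.0}
        System.out.println(naiveRunningValue(new double[] {20.0, 22.0, 24.0})); // 20.0, not 22.0
    }
}
```

With these hypothetical values, Window 1 averages 22.0, while the naive update drifts to 20.0 because its starting value of 0.0 and the halving weight recent readings more heavily.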

Figure 1: Tumbling windows.

Each reading is accounted for in exactly one window, with no overlap between windows.

Code example

Here’s how we might set up a tumbling window in Kafka Streams for aggregating temperature readings:

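import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

public class TumblingWindowExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Key = sensor ID, value = temperature reading
        KStream<String, Double> temperatureReadings =
            builder.stream("temperature-sensor", Consumed.with(Serdes.String(), Serdes.Double()));

        // 10-minute tumbling window (Kafka Streams 3.0+; replaces the deprecated TimeWindows.of)
        TimeWindows tumblingWindows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10));

        // An average cannot be updated from the previous average alone, so keep [sum, count]
        KTable<Windowed<String>, List<Double>> sumAndCount = temperatureReadings
            .groupByKey() // Assuming the key is the sensor ID
            .windowedBy(tumblingWindows)
            .aggregate(
                () -> List.of(0.0, 0.0), // Initializer: sum = 0, count = 0
                (sensorId, reading, agg) -> List.of(agg.get(0) + reading, agg.get(1) + 1), // Aggregator
                Materialized.<String, List<Double>, WindowStore<Bytes, byte[]>>as("temperature-sum-count-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.ListSerde(ArrayList.class, Serdes.Double())) // Kafka 3.0+
            );

        // Average = sum / count for each sensor and window
        KTable<Windowed<String>, Double> averageTemperatures =
            sumAndCount.mapValues(agg -> agg.get(0) / agg.get(1));

        // Further processing or output the results to a topic
    }
}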

Hopping windows

Fixed-size, overlapping intervals of time. Each window moves forward, or “hops,” by a fixed period that is less than or equal to the window size, so a hopping window is defined by two parameters: the window size and the advance interval. When the advance interval is smaller than the window size, consecutive windows overlap, and data points that are close in time are analyzed together across multiple windows.

Hop: Literally, “hop” means “a small jump”; in our context, it means “advance by.” If the hop size equals the window size, the windows do not overlap, and they are tumbling windows.

Example

Imagine we are processing a stream of page views with a hopping window. We want to count the number of page views in windows of 5 minutes, but we want to update the count every minute. In this scenario:

  • Window Size: 5 minutes
  • Advance Interval (Hop Size): 1 minute

This configuration means that every minute, a new window starts, but each window covers data from the past 5 minutes. Therefore, at any given time, there are multiple overlapping windows active.

Let’s say our events are timestamped as follows (in minutes): 1, 2, 3, 4, 5, 6, 7, 8, 9, 10.

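  • Window 1 (starting at minute 1) covers minutes 1-5 and sees events at 1, 2, 3, 4, 5.
  • Window 2 (starting at minute 2) covers minutes 2-6 and sees events at 2, 3, 4, 5, 6.
  • Windows 3 to 5 follow the same pattern, each starting one minute later.
  • Window 6 (starting at minute 6) covers minutes 6-10 and sees events at 6, 7, 8, 9, 10.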

Figure 2: Hopping windows.

Notice how each window overlaps with the next, sharing four minutes of data with it.
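The window assignment in this example can be sketched in plain Java. This is a stdlib-only illustration, not the Kafka Streams API; it is modeled on our understanding that Kafka Streams aligns hopping windows to multiples of the advance interval and treats each window as [start, start + size). It lists every window containing a given event and every event inside a given window.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of hopping-window assignment (not the Kafka Streams API).
// Windows are half-open [start, start + size) and start at multiples of the advance.
class HoppingWindowSketch {

    // Returns the start times of every window that contains the given timestamp.
    static List<Long> windowStartsFor(long timestamp, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned window whose end is still after the timestamp
        long start = (Math.max(0, timestamp - size + advance) / advance) * advance;
        for (; start <= timestamp; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // Returns the events (in input order) that fall inside [start, start + size).
    static List<Long> eventsIn(long start, long size, long[] events) {
        List<Long> inside = new ArrayList<>();
        for (long e : events) {
            if (e >= start && e < start + size) {
                inside.add(e);
            }
        }
        return inside;
    }

    public static void main(String[] args) {
        long[] events = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(windowStartsFor(5, 5, 1)); // [1, 2, 3, 4, 5]: the event at minute 5 is in five windows
        System.out.println(eventsIn(2, 5, events));   // [2, 3, 4, 5, 6]
    }
}
```

Setting the advance equal to the size (for example, windowStartsFor(3, 5, 5)) returns a single window, which is the tumbling-window special case.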

Code example

Here’s how we might define a hopping window in Kafka Streams for the above scenario:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

public class HoppingWindowExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Key = user ID, value = page identifier
        KStream<String, String> pageViews =
            builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()));

        // Kafka Streams 3.0+; replaces the deprecated TimeWindows.of
        TimeWindows hoppingWindows = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)) // Window size
                                                .advanceBy(Duration.ofMinutes(1));        // Advance interval

        KTable<Windowed<String>, Long> pageViewCounts = pageViews
            .groupBy((userId, page) -> page, Grouped.with(Serdes.String(), Serdes.String())) // Re-key by page (repartitions)
            .windowedBy(hoppingWindows)
            .count(Materialized.as("page-view-counts-store")); // Store name for the state store

        // Further processing...
    }
}

In this example, pageViews is a stream of page view events, possibly with a user ID as the key and a page identifier as the value. The events are re-keyed by page (or some other logic) and then counted within each window defined by hoppingWindows. The count operation creates a KTable holding the number of views of each page in each 5-minute window. Because a new window starts every minute, each page view is counted in five overlapping windows, and a window's count is updated as each new event arrives.

Hopping windows are particularly useful for analyses where we want more frequent updates but also need to consider events over a longer period. They provide a balance between real-time and batch processing, making them suitable for a wide range of time-sensitive aggregation tasks.


Session windows

Dynamically sized windows based on periods of activity. For a given key, session windows do not overlap. Their size is not based on a fixed duration but on behavior: each window captures a period of activity separated from the next by a gap of inactivity, which makes session windows particularly suited for scenarios where activity occurs in bursts. In other words, session windows group together events that are close in time, and a long enough gap of inactivity ends the session.

A session window is defined by an inactivity gap, and it expands as new events arrive within that gap. Unlike tumbling or hopping windows, which are based on fixed time intervals, session windows dynamically adjust their size based on the presence or absence of events. This allows for more flexible and meaningful aggregation of events that naturally occur in sessions or bursts[1].

Dynamic Duration

The size of a session window is determined by the timestamps of the events it contains. Imagine we start watching events (like messages or transactions) coming in one after another. A session window keeps track of these events, starting with the first one we see. As long as consecutive events are not separated by more than a gap we have chosen in advance, they are all considered part of the same session. If there is a pause longer than that gap (measured using the events' timestamps), the current session window is over.

Once a new event comes in after that break, it starts a new session window. This way, each session window groups together a series of events that are closely related in time, with breaks indicating the end of one group and the start of another.

Inactivity gap

This is a configurable duration that defines how long a key can go without events before its current session window is closed. If a subsequent event's timestamp falls within the inactivity gap, the event is included in the current session window, extending its duration; otherwise, it starts a new session window.

Key and no overlap for a given key

The key is a crucial concept in session windows: it is what Kafka Streams uses to group events into sessions. Each record in a Kafka stream is a key-value pair, and the key identifies which events are related. When processing streams, Kafka Streams uses these keys to organize data into windows.

For session windows, events with the same key are considered part of the same user or entity’s activity, allowing the system to create windows that accurately reflect periods of activity for each unique entity represented by the key.

Each key in our data represents a unique identifier for a group of related events. For example, if we’re processing user activity events, the key might be the User ID. Kafka Streams ensures that for any given key, session windows do not overlap. This means that each session window for a specific key is distinct and separated by a period of inactivity that exceeds the defined gap. There won’t be two session windows for the same key that cover overlapping time periods.

Vary in length

Since session windows are defined by periods of activity (events coming in) and inactivity (no events for a specified gap), they can vary in length. The duration of a session window is determined by how long the activity lasts before a period of inactivity that’s long enough to end the session. This means session windows can be short for brief bursts of activity or long for extended periods of activity, and this length can differ significantly from one session to another, even for the same key.

Example

Consider an online gaming platform where players log in, perform various activities (play games, chat, etc.), and then log out. We want to analyze the duration and frequency of individual gaming sessions.

Let’s say we have the following events (player actions) for two players over time:

  • Player A’s events: 10:00, 10:05, and 10:20.
  • Player B’s events: 10:30 and 11:15.
  • Inactivity gap: 10 minutes.

Session window analysis

Player A

  • 10:00 and 10:05: These two events are 5 minutes apart, which is less than the 10-minute inactivity gap. Therefore, they belong to the same session window.
  • 10:05 and 10:20: The gap between these two events is 15 minutes, which is more than the 10-minute inactivity threshold. Therefore, the event at 10:20 starts a new session window.

As a result, Player A has two session windows:

  • One from 10:00 to 10:05
  • Another starting at 10:20 and ending at the last event time (assuming no more events occur, this window ends at 10:20 itself).

Player B

  • 10:30 and 11:15: These two events are 45 minutes apart, which is significantly more than the 10-minute inactivity gap. Each event here would constitute its own session window because the gap between any two events is larger than the defined threshold.

As a result, Player B has two separate session windows:

  • One starting at 10:30 and ending at 10:30 (assuming no more events occur close to this time).
  • Another starting at 11:15 and ending at 11:15 (assuming no more events occur close to this time).

Figure 3: Session windows.

Each session window starts with the first event and ends with the last event in that session, considering the inactivity gap.
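The session analysis above can be reproduced with a short stdlib-only Java sketch (not the Kafka Streams API). It handles one key at a time, expects timestamps in ascending order, expresses times as minutes since midnight, and assumes an event exactly one gap after the previous one still joins the session.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only sketch of session windows for ONE key (not the Kafka Streams API).
// Timestamps are minutes since midnight and must be sorted in ascending order.
class SessionWindowSketch {

    // Each session is returned as {start, end}.
    static List<long[]> sessions(long[] sortedTimestamps, long inactivityGap) {
        List<long[]> result = new ArrayList<>();
        long[] current = null;
        for (long t : sortedTimestamps) {
            if (current != null && t - current[1] <= inactivityGap) {
                current[1] = t;              // within the gap: extend the current session
            } else {
                current = new long[] {t, t}; // first event or gap exceeded: start a new session
                result.add(current);
            }
        }
        return result;
    }

    // Formats minutes since midnight as HH:MM.
    static String fmt(long minutes) {
        return String.format("%02d:%02d", minutes / 60, minutes % 60);
    }

    public static void main(String[] args) {
        long gap = 10;
        long[] playerA = {600, 605, 620}; // 10:00, 10:05, 10:20
        long[] playerB = {630, 675};      // 10:30, 11:15
        for (long[] s : sessions(playerA, gap)) System.out.println("A: " + fmt(s[0]) + "-" + fmt(s[1]));
        for (long[] s : sessions(playerB, gap)) System.out.println("B: " + fmt(s[0]) + "-" + fmt(s[1]));
        // Prints A: 10:00-10:05, A: 10:20-10:20, B: 10:30-10:30, B: 11:15-11:15
    }
}
```

Running the sketch separately per player mirrors the per-key behavior described earlier: sessions for one key never affect another key's sessions.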

Code example

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

import java.time.Duration;

public class SessionWindowExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Key = player ID, value = the action (e.g., "play", "chat")
        KStream<String, String> playerActions =
            builder.stream("player-actions", Consumed.with(Serdes.String(), Serdes.String()));

        Duration inactivityGap = Duration.ofMinutes(10); // Define inactivity gap for session windows

        KTable<Windowed<String>, Long> sessionCounts = playerActions
            .groupByKey() // Assuming the key is the player ID
            // Kafka Streams 3.0+; replaces the deprecated SessionWindows.with
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(inactivityGap))
            .count(Materialized.as("session-counts-store")); // Count events in each session

        // Further processing or output the results to a topic
    }
}

  • playerActions is a KStream representing player actions, keyed by player ID.
  • groupByKey groups the actions by player ID, preparing them for windowed aggregation.
  • windowedBy(SessionWindows.ofInactivityGapWithNoGrace(inactivityGap)) applies session windowing based on the specified inactivity gap, which we set to 10 minutes. Because no grace period is configured, out-of-order records that arrive after their session has closed are dropped.
  • The count method aggregates the number of actions within each session window for each player.

Session windows are particularly useful for analyzing user behavior, session-based analytics, and any scenario where activity patterns are bursty or discontinuous. By dynamically adjusting window sizes based on actual activity, session windows offer a more detailed view of event patterns than fixed-size windows do.


Sliding windows

Sliding windows are defined by a fixed duration, and they are designed to allow overlapping windows that slide over the data stream for continuous updates. Unlike session windows, they do not grow in size; what moves is the window's position, which follows the events as time progresses. In other words, they allow for the aggregation of events within a specific time frame that “slides” over the data stream.

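Conceptually, they are described by two parameters:

  • Window length: The duration of the window over which the data is aggregated.
  • Slide interval (also called advance interval or slide): How often a new window starts.

In Kafka Streams itself, however, the SlidingWindows API takes no slide interval. Its windows are aligned to record timestamps rather than to the epoch and move with the data, so we configure only the window length (the maximum time difference between two records in the same window) and, optionally, a grace period. A window that advances by a fixed interval is a hopping window.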

To better understand, imagine a river flowing by us. We want to examine the water quality, but instead of checking the entire river at once, we look only at the water that flows past us within a specific time frame. Let’s say each check covers the last 5 minutes of water, and instead of starting a new check every 5 minutes on the dot, we start a new check every minute, each time including the water from the last 5 minutes.

In the context of Kafka Streams and sliding windows, the “river” is our stream of data (events), and the “water quality check” is the aggregation or analysis we want to perform on that data. A sliding window is like the moving 5-minute period we use to check the water quality. As time moves forward, this 5-minute window “slides” along with the flow of the river (or the flow of our data). This means we’re constantly updating our analysis or aggregation based on the most recent 5 minutes of data, starting a new analysis every minute (or whatever our chosen interval is).

In other words, we’re continuously summarizing or analyzing the most recent chunk of data, and this chunk moves forward over time, constantly capturing a fresh set of data based on the window’s duration and the sliding interval we’ve defined. This approach allows us to get a continuous, up-to-date view of what’s happening in our data stream, making it very useful for real-time monitoring and analysis.

Example

Let’s take the following case:

  • Window size: 30 minutes
  • Slide interval: 10 minutes

This means every 10 minutes, a new window starts, but each window covers data from the past 30 minutes. Therefore, windows will overlap each other.

Imagine our click data stream over a timeline:

Figure 4: Sliding windows.

Clicks occur at minutes 5, 15, 25, 35, 45, 55, 65, 75, 85.

  • Window 1 (0-30): Captures clicks at 5, 15, 25.
  • Window 2 (10-40), 10 minutes after Window 1: Captures clicks at 15, 25, 35.
  • Window 3 (20-50), 10 minutes after Window 2: Captures clicks at 25, 35, 45.

And so on.

To visualize this:

Initial State (First Window)
[5, 15, 25]              <- Window 1 (0-30)
      [15, 25, 35]       <- Window 2 (10-40) (Starts after 10 minutes)
            [25, 35, 45] <- Window 3 (20-50) (Starts after another 10 minutes)

Each bracket […] represents a window capturing clicks within that 30-minute period, sliding every 10 minutes, showing the overlap where consecutive windows include some of the same data points (clicks).

Code example

Here’s how we might set up a sliding window in Kafka Streams for counting clicks:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.time.Duration;
import java.util.Properties;

public class ClickCountSlidingWindow {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-count-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> clicks = builder.stream("clicks-topic");

        // Define the sliding window: records at most 30 minutes apart share a window (Kafka Streams 3.0+)
        SlidingWindows slidingWindow = SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(30));

        // Count clicks in the sliding window
        KTable<Windowed<String>, Long> clickCounts = clicks
                .groupByKey()
                .windowedBy(slidingWindow)
                .count(Materialized.as("clicks-count-store"));

        // Further processing or output the results to a topic, then start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}

  • SlidingWindows: Defines a sliding window of 30 minutes, that is, the maximum time difference between two records in the same window. There is no advance interval; the windows are aligned to record timestamps and move with the data.
  • groupByKey(): Groups the click events by their key, so counts are kept per key (for example, per user). To count all clicks together, we would first re-key every record to a single constant key.
  • windowedBy(): Applies the sliding window definition to the grouped stream.
  • count(): Counts the number of events (clicks) within each window.

Sliding windows are ideal for continuous calculations that need to be updated with each new event, providing a more granular and immediate view of data trends over time.
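The idea of updating the result with each new event can be illustrated with a stdlib-only Java sketch (not the Kafka Streams API) using the click timestamps from the example above. For every click, it recomputes the number of clicks in the 30 minutes ending at that click. Treating the window as inclusive at both ends is an assumption of this sketch; Kafka Streams' SlidingWindows manages its own window boundaries and emitted windows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stdlib-only sketch (not the Kafka Streams API): for each new event, recompute the count
// of events within the last `windowSize` minutes. Boundaries are inclusive at both ends
// (an assumption of this sketch). Timestamps must arrive in ascending order.
class TrailingWindowSketch {
    static List<Integer> countsPerEvent(int[] timestamps, int windowSize) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> counts = new ArrayList<>();
        for (int t : timestamps) {
            window.addLast(t);                            // the new event enters the window
            while (window.peekFirst() < t - windowSize) {
                window.pollFirst();                       // events older than t - windowSize slide out
            }
            counts.add(window.size());                    // updated result for this event
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] clicks = {5, 15, 25, 35, 45, 55, 65, 75, 85};
        System.out.println(countsPerEvent(clicks, 30)); // [1, 2, 3, 4, 4, 4, 4, 4, 4]
    }
}
```

Unlike the fixed 10-minute steps in the example, a result is produced at every click, which is what makes this style of window suited to continuously updated calculations.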


Summary

Hopefully, this clarifies the concepts of windowing in Kafka Streams. Below is a quick comparison of all the window types that we have learned about:

| Feature/Type | Tumbling Window | Hopping Window | Session Window | Sliding Window |
|---|---|---|---|---|
| Definition | A type of window that has a fixed size and does not overlap. Each record belongs to exactly one window. | Similar to tumbling windows but can overlap. It is defined by two parameters: the window’s size and its advance interval (or hop). | Dynamic windows that group events by activity sessions. Windows are defined by periods of activity and inactivity. | A type of window that allows for overlapping windows where each window is defined by a fixed duration and can slide over the data stream. |
| Window Size | Fixed | Fixed | Dynamic, determined by the events and the inactivity gap | Fixed |
| Overlap | No | Yes, if the advance interval is smaller than the window size | No (for a given key); windows vary in length | Yes, windows can overlap due to the sliding nature |
| Use Cases | Aggregations over a uniform time interval, such as hourly metrics. | Aggregations where more frequent updates are needed within a fixed period, allowing overlap. | Aggregations where the activity is bursty or irregular, such as user sessions in web analytics. | Fine-grained aggregations where updates are needed for every event or at short intervals, capturing trends over time. |
| Key Characteristics | Simple and non-overlapping, making it suitable for clear-cut time intervals. | Provides flexibility with overlapping windows, useful for more nuanced time-based aggregations. | Ideal for handling irregular or event-driven data, adapting to the data’s inherent patterns. | Offers the most granular control over windowing, allowing for continuous updates within overlapping periods. |

Table 1: Comparison of all windows supported by Kafka Streams.

Happy learning!


Frequently asked questions (FAQs)

What does the “grace period” refer to in Kafka Streams?

The “grace period” is a configurable duration that allows late-arriving records to be included in windowed aggregations. In other words, it specifies how long after a window has ended (based on its defined duration) the system will continue to accept records for that window. Kafka Streams measures this in stream time, that is, the highest record timestamp observed so far, not in wall-clock time. This feature is particularly useful for dealing with out-of-order data, which is common in distributed systems due to network delays or other factors. By setting a grace period, we can make sure these late-arriving records are processed and included in the correct aggregation window instead of being discarded.

For example, if we have a 1-minute tumbling window with a grace period of 30 seconds, records that arrive up to 30 seconds after the end of each 1-minute window can still be included in the corresponding windowed computation. After the grace period has elapsed, the window is closed, and no further records for that window are processed.
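As a rough illustration of this rule, the following stdlib-only Java sketch (not the Kafka Streams API) decides whether a late record is still accepted. It assumes, based on our understanding of Kafka Streams' behavior, that the check compares stream time with the window end plus the grace period, and that the window closes once stream time reaches that point. Times are in seconds.

```java
// Stdlib-only sketch (not the Kafka Streams API) of the grace-period rule: a late record
// for a window is still accepted while stream time (the highest record timestamp seen
// so far) is earlier than the window end plus the grace period. Times are in seconds.
class GracePeriodSketch {
    static boolean accepted(long windowEnd, long grace, long observedStreamTime) {
        return observedStreamTime < windowEnd + grace;
    }

    public static void main(String[] args) {
        long windowEnd = 60; // 1-minute tumbling window [0, 60)
        long grace = 30;     // 30-second grace period
        // A record with timestamp 50 (inside the window) arrives late:
        System.out.println(accepted(windowEnd, grace, 80)); // true: stream time 80 < 90
        System.out.println(accepted(windowEnd, grace, 95)); // false: the window closed at 90
    }
}
```

Because the check uses stream time, a window can stay open longer in wall-clock terms if no newer records arrive to advance stream time.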

The grace period is set on the window definition itself (TimeWindows, SessionWindows, or SlidingWindows) when defining windowed operations in Kafka Streams, for example with TimeWindows.ofSizeAndGrace or SessionWindows.ofInactivityGapAndGrace (Kafka Streams 3.0+). It’s a crucial setting for ensuring data completeness and accuracy in real-time streaming applications, especially when handling data that may not arrive in perfect chronological order.

// countStream is assumed to be a KStream<String, String>

// Tumbling window
KTable<Windowed<String>, Long> tumblingCounts = countStream.groupByKey()
    .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1),
        Duration.ofSeconds(30)))  // Using 30 seconds for the grace period
    .count(Materialized.as("Tumbling-window-counting-store"));

// Session window
KTable<Windowed<String>, Long> sessionCounts = countStream.groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(1),
        Duration.ofSeconds(30)))  // Using 30 seconds for the grace period
    .count(Materialized.as("Session-window-counting-store"));

  1. Bursts: In the context of Kafka Streams, the term “bursts” typically refers to sudden increases in the volume of data messages being processed by the stream. These bursts can occur due to various reasons, such as a rapid flood of events from producers, periodic increases in activity (e.g., during specific hours of the day or days of the week), or backlog processing after downtime or maintenance periods. 
