Real-Time Aggregations

This guide shows how to build aggregation pipelines that count, group, and reduce streaming data into queryable materialized views.

Prerequisites

  • TypeStream installed and running
  • Demo data generators running (started automatically with typestream local dev)

Count records by field

Group records by a field and count occurrences. The result is a KTable that updates in real time as new records arrive. For a quick total record count over a topic, pipe it through wc:

cat /dev/kafka/local/topics/web_visits | wc
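A grouped count is defined by a materializedView node in the pipeline definition. As a sketch (assuming the same node schema as the windowed example later in this guide, embedded in a full pipeline definition):

```json
{
  "id": "mv-1",
  "materializedView": {
    "groupByField": "status_code",
    "aggregationType": "count"
  }
}
```

With this configuration, the view holds one running count per distinct status_code value.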

Windowed count

Count records within a tumbling time window (e.g., visits per status code per minute):

{
  "id": "mv-1",
  "materializedView": {
    "groupByField": "status_code",
    "aggregationType": "count",
    "enableWindowing": true,
    "windowSizeSeconds": 60
  }
}

Use this windowed variant of the materializedView node in place of a plain grouped count. Each window produces a separate count and closes after the specified duration.
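To illustrate the tumbling-window semantics (this is an illustrative sketch, not TypeStream's implementation; the function name is hypothetical), each record lands in exactly one window, determined by its timestamp, and counts are kept per (key, window) pair:

```python
from collections import defaultdict

def tumbling_window_counts(records, window_size_seconds=60):
    """Count records per (key, window start) over tumbling windows.

    Each record is (timestamp_seconds, key). A record falls into the
    window whose start is timestamp rounded down to a window boundary.
    """
    counts = defaultdict(int)
    for ts, key in records:
        window_start = (ts // window_size_seconds) * window_size_seconds
        counts[(key, window_start)] += 1
    return dict(counts)

# Visits by status code: three in the first minute, one in the second.
records = [(0, "200"), (30, "200"), (45, "404"), (61, "200")]
print(tumbling_window_counts(records))
# → {('200', 0): 2, ('404', 0): 1, ('200', 60): 1}
```

Because windows never overlap, two records with the same key but timestamps in different windows contribute to separate counts.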

Keep latest value per key

Use aggregationType: "latest" to build a lookup table that always holds the most recent value for each key:

{
  "id": "mv-1",
  "materializedView": {
    "groupByField": "status_code",
    "aggregationType": "latest"
  }
}

This is useful for maintaining a current-state view from a changelog stream (e.g., the latest order status per order ID).
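The reduction behind "latest" can be sketched in a few lines of Python (illustrative only; the function name is hypothetical): later records for a key simply overwrite earlier ones, leaving the current state per key.

```python
def latest_per_key(changelog):
    """Reduce a changelog stream of (key, value) pairs to the
    most recent value per key, in arrival order."""
    state = {}
    for key, value in changelog:
        state[key] = value  # a newer record overwrites the older one
    return state

events = [
    ("order-1", "pending"),
    ("order-2", "pending"),
    ("order-1", "shipped"),
]
print(latest_per_key(events))
# → {'order-1': 'shipped', 'order-2': 'pending'}
```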

Query materialized views

Once an aggregation pipeline is running, you can query its state store via the StateQueryService:

  • List stores: See all queryable state stores from running jobs
  • Get all values: Stream all key-value pairs from a store
  • Get value: Look up a single value by key

The GUI's job detail page shows materialized view data automatically.

See also