Real-Time Aggregations
This guide shows how to build aggregation pipelines that count, group, and reduce streaming data into queryable materialized views.
Prerequisites
- TypeStream installed and running
- Demo data generators running (started automatically with typestream local dev)
Count records by field
Group records by a field and count occurrences. The result is a KTable that updates in real time as new records arrive.
CLI DSL:
cat /dev/kafka/local/topics/web_visits | wc
Config-as-Code:
{
  "name": "visits-by-status",
  "version": "1",
  "description": "Count web visits grouped by status code",
  "graph": {
    "nodes": [
      {
        "id": "source-1",
        "kafkaSource": {
          "topicPath": "/dev/kafka/local/topics/web_visits",
          "encoding": "AVRO"
        }
      },
      {
        "id": "mv-1",
        "materializedView": {
          "groupByField": "status_code",
          "aggregationType": "count"
        }
      }
    ],
    "edges": [
      { "fromId": "source-1", "toId": "mv-1" }
    ]
  }
}
In the GUI:
- Drag a Kafka Source and select the web_visits topic
- Drag a Materialized View node and connect it
- Set the groupByField to status_code and the aggregation type to count
- Click Create Job
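To make the semantics of a grouped count concrete, here is a minimal Python sketch of what such an aggregation computes. It is not TypeStream code: the function name count_by_field and the sample records are hypothetical, and only the status_code field comes from the example above. Each incoming record emits an updated count for its key, which is roughly how a table that "updates in real time" behaves.

```python
from collections import Counter
from typing import Any, Iterable, Iterator


def count_by_field(
    records: Iterable[dict[str, Any]], field: str
) -> Iterator[tuple[Any, int]]:
    """Yield (key, running_count) every time a record updates a key's count."""
    counts: Counter = Counter()
    for record in records:
        key = record[field]
        counts[key] += 1
        yield key, counts[key]


# Hypothetical sample records; only status_code mirrors the guide.
visits = [
    {"status_code": 200, "path": "/"},
    {"status_code": 404, "path": "/missing"},
    {"status_code": 200, "path": "/about"},
]

# The stream of updates, one per input record.
updates = list(count_by_field(visits, "status_code"))

# Collapsing the updates keeps only the latest count per key,
# which is the state a reader of the view would see.
final_counts = dict(updates)
print(final_counts)
```

The list of updates corresponds to the change stream, while final_counts corresponds to the current contents of the view.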
Windowed count
Count records within a tumbling time window (e.g., visits per status code per minute):
{
  "id": "mv-1",
  "materializedView": {
    "groupByField": "status_code",
    "aggregationType": "count",
    "enableWindowing": true,
    "windowSizeSeconds": 60
  }
}
Replace the materializedView node in the pipeline above with this windowed variant. Each window keeps its own count per key, and a window stops accepting records once the specified duration has elapsed.
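As an illustration of tumbling-window bucketing, the following Python sketch assigns each event to a fixed, non-overlapping 60-second window and counts per (window, key) pair. It is a simplified model, not TypeStream's implementation: it assumes windows are aligned to multiples of the window size and ignores late or out-of-order records. The names window_start and windowed_count and the sample events are hypothetical.

```python
from collections import defaultdict

WINDOW_SIZE_SECONDS = 60  # mirrors windowSizeSeconds in the windowed variant


def window_start(timestamp_s: int, size_s: int = WINDOW_SIZE_SECONDS) -> int:
    """Return the start of the tumbling window containing timestamp_s.

    Assumption: windows are aligned to multiples of size_s.
    """
    return timestamp_s - (timestamp_s % size_s)


def windowed_count(events, size_s: int = WINDOW_SIZE_SECONDS) -> dict:
    """Count events per (window_start, key); each window has its own count."""
    counts = defaultdict(int)
    for timestamp_s, status_code in events:
        counts[(window_start(timestamp_s, size_s), status_code)] += 1
    return dict(counts)


# Hypothetical (timestamp in seconds, status_code) pairs.
events = [(0, 200), (30, 200), (59, 404), (60, 200), (119, 200), (120, 404)]

result = windowed_count(events)
for (start, status_code), count in sorted(result.items()):
    print(f"window [{start}, {start + WINDOW_SIZE_SECONDS}) status={status_code} count={count}")
```

Because the windows do not overlap, every event lands in exactly one window, so the per-window counts sum to the total number of events.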
Keep latest value per key
Use aggregationType: "latest" to build a lookup table that always holds the most recent value for each key:
{
  "id": "mv-1",
  "materializedView": {
    "groupByField": "status_code",
    "aggregationType": "latest"
  }
}
This is useful for maintaining a current-state view from a changelog stream (e.g., the latest order status per order ID).
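The order-status example can be sketched in a few lines of Python. This is an illustrative model only: reduce_latest, the order_id and status field names, and the sample records are hypothetical, and "latest" here means the most recently arrived record for a key, which is an assumption about how ties and out-of-order records are handled.

```python
def reduce_latest(changelog, key_field):
    """Keep only the most recently seen record for each key."""
    latest = {}
    for record in changelog:
        # A newer record for the same key overwrites the older one.
        latest[record[key_field]] = record
    return latest


# Hypothetical changelog of order status changes, in arrival order.
orders = [
    {"order_id": "o-1", "status": "PLACED"},
    {"order_id": "o-2", "status": "PLACED"},
    {"order_id": "o-1", "status": "SHIPPED"},
]

table = reduce_latest(orders, "order_id")
print(table["o-1"]["status"])
print(table["o-2"]["status"])
```

The resulting table has one entry per key regardless of how many updates arrived, which is what makes it suitable as a lookup table.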
Query materialized views
Once an aggregation pipeline is running, you can query its state store via the StateQueryService:
- List stores: See all queryable state stores from running jobs
- Get all values: Stream all key-value pairs from a store
- Get value: Look up a single value by key
The GUI's job detail page shows materialized view data automatically.
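The three query operations can be pictured with a small in-memory model. The Python sketch below is not the StateQueryService API: the class InMemoryStateStores, its method names, and the store name are all hypothetical stand-ins, meant only to show the shape of listing stores, streaming all entries, and looking up one key.

```python
from typing import Any, Iterator, Optional


class InMemoryStateStores:
    """Hypothetical stand-in for a set of queryable state stores."""

    def __init__(self) -> None:
        self._stores: dict[str, dict[Any, Any]] = {}

    def put(self, store: str, key: Any, value: Any) -> None:
        self._stores.setdefault(store, {})[key] = value

    def list_stores(self) -> list[str]:
        """List stores: names of all queryable stores."""
        return sorted(self._stores)

    def get_all_values(self, store: str) -> Iterator[tuple[Any, Any]]:
        """Get all values: stream every key-value pair from one store."""
        yield from self._stores[store].items()

    def get_value(self, store: str, key: Any) -> Optional[Any]:
        """Get value: look up a single key, or None if it is absent."""
        return self._stores[store].get(key)


stores = InMemoryStateStores()
stores.put("visits-by-status", 200, 2)  # hypothetical store name and counts
stores.put("visits-by-status", 404, 1)

print(stores.list_stores())
print(list(stores.get_all_values("visits-by-status")))
print(stores.get_value("visits-by-status", 200))
```

A real client would call the service over the network rather than read a local dictionary, so treat this only as a map of the operations and their return shapes.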
See also
- Node Reference: Count -- count node details
- Node Reference: WindowedCount -- windowed count details
- Node Reference: ReduceLatest -- reduce latest details
- Node Reference: Group -- group node details