Filter and Route
This guide shows how to filter records from a Kafka topic and route matching results to a new topic.
Prerequisites
- TypeStream installed and running
- Demo data generators running (started automatically with typestream local dev)
Filter by content
The simplest filter matches records containing a text string.
CLI DSL:
grep /dev/kafka/local/topics/web_visits "/products"
Bare words work too (case-insensitive):
grep /dev/kafka/local/topics/web_visits products
Config-as-Code:
{
  "name": "filter-products",
  "version": "1",
  "description": "Filter web visits to product pages",
  "graph": {
    "nodes": [
      {
        "id": "source-1",
        "kafkaSource": {
          "topicPath": "/dev/kafka/local/topics/web_visits",
          "encoding": "AVRO"
        }
      },
      {
        "id": "filter-1",
        "filter": {
          "expression": ".url_path ~= \"/products\""
        }
      },
      {
        "id": "sink-1",
        "kafkaSink": {
          "topicName": "product_visits"
        }
      }
    ],
    "edges": [
      { "fromId": "source-1", "toId": "filter-1" },
      { "fromId": "filter-1", "toId": "sink-1" }
    ]
  }
}
GUI:
- Drag a Kafka Source and select the web_visits topic
- Drag a Filter node, connect it, and set the expression to .url_path ~= "/products"
- Drag a Kafka Sink and set the output topic
- Click Create Job
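When several similar pipelines are needed, the JSON definition above could be generated rather than written by hand. The following is a minimal Python sketch of that idea. The helpers `build_filter_pipeline` and `dangling_edges` are hypothetical and not part of TypeStream; only the field names (`kafkaSource`, `filter`, `kafkaSink`, `fromId`, `toId`, and so on) are taken from the example, and the `AVRO` encoding is assumed to match the source topic.

```python
import json


def build_filter_pipeline(name, description, topic_path, expression, sink_topic):
    """Return a pipeline definition shaped like the JSON example.

    Hypothetical helper: only the field names come from the example.
    """
    nodes = [
        # Encoding is hard-coded to AVRO here, as in the example (an assumption).
        {"id": "source-1", "kafkaSource": {"topicPath": topic_path, "encoding": "AVRO"}},
        {"id": "filter-1", "filter": {"expression": expression}},
        {"id": "sink-1", "kafkaSink": {"topicName": sink_topic}},
    ]
    # Chain the nodes in order: source -> filter -> sink.
    edges = [
        {"fromId": a["id"], "toId": b["id"]}
        for a, b in zip(nodes, nodes[1:])
    ]
    return {
        "name": name,
        "version": "1",
        "description": description,
        "graph": {"nodes": nodes, "edges": edges},
    }


def dangling_edges(pipeline):
    """Return the edges that reference a node id missing from the graph."""
    ids = {node["id"] for node in pipeline["graph"]["nodes"]}
    return [
        edge for edge in pipeline["graph"]["edges"]
        if edge["fromId"] not in ids or edge["toId"] not in ids
    ]


pipeline = build_filter_pipeline(
    name="filter-products",
    description="Filter web visits to product pages",
    topic_path="/dev/kafka/local/topics/web_visits",
    expression='.url_path ~= "/products"',
    sink_topic="product_visits",
)
assert dangling_edges(pipeline) == []
print(json.dumps(pipeline, indent=2))
```

Checking for dangling edges before submitting a definition is a cheap way to catch a mistyped node id; how TypeStream itself validates the graph is not covered here.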
Filter by field
Use predicate expressions for field-based filtering:
grep /dev/kafka/local/topics/web_visits [.status_code > 399]
Combine conditions with && and ||:
grep /dev/kafka/local/topics/web_visits [ .status_code == 200 || .url_path ~= '/products' ]
Predicate operators
| Operator | Description |
|---|---|
| == | Strict equality (same type required) |
| != | Strict inequality |
| >, >=, <, <= | Numeric comparison |
| ~= | Contains (case-insensitive) |
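To make the operator descriptions concrete, here is a small Python model of how they could behave on a single record. It is an illustration only, not TypeStream's implementation: the function names are hypothetical, and treating a type mismatch under == as "not equal" (rather than as an error) is an assumption.

```python
import operator


def strict_eq(left, right):
    # "Same type required": values of different types never compare equal here.
    # Assumption: a mismatch yields False instead of raising an error.
    return type(left) is type(right) and left == right


def contains_ci(left, right):
    # "~=": case-insensitive substring test on the string forms of both sides.
    return str(right).lower() in str(left).lower()


OPERATORS = {
    "==": strict_eq,
    "!=": lambda left, right: not strict_eq(left, right),
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
    "~=": contains_ci,
}


def evaluate(record, field, op, value):
    """Apply one predicate such as `.status_code > 399` to a record (a dict)."""
    return OPERATORS[op](record[field], value)


visit = {"status_code": 404, "url_path": "/Products/42"}

print(evaluate(visit, "status_code", ">", 399))        # numeric comparison
print(evaluate(visit, "status_code", "==", "404"))     # int vs. str: not equal
print(evaluate(visit, "url_path", "~=", "/products"))  # case-insensitive contains

# Conditions combine with ordinary boolean logic, mirroring && and ||.
print(evaluate(visit, "status_code", "==", 200)
      or evaluate(visit, "url_path", "~=", "/products"))
```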
Invert matching
Use -v to select records that do not match:
grep -v /dev/kafka/local/topics/web_visits "/health"
Filter by key
Use -k to match against the record key instead of the value:
grep -k /dev/kafka/local/topics/web_visits "some-key"
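The effect of the two flags can be modeled in a few lines of Python. This is a sketch under stated assumptions, not how TypeStream evaluates grep: the function `grep` below is a hypothetical stand-in, records are simplified to (key, value) pairs of strings, and matching is assumed to be a case-insensitive substring test.

```python
def grep(records, pattern, invert=False, by_key=False):
    """Yield the (key, value) pairs selected by a simplified grep.

    invert models -v (keep records that do NOT match).
    by_key models -k (test the record key instead of the value).
    """
    needle = pattern.lower()
    for key, value in records:
        haystack = str(key if by_key else value).lower()
        matched = needle in haystack
        # Keep the record when the match result differs from the invert flag.
        if matched != invert:
            yield key, value


records = [
    ("user-1", "/health"),
    ("user-2", "/products/7"),
    ("some-key", "/cart"),
]

# Like `grep -v ... "/health"`: drop health checks, keep everything else.
print(list(grep(records, "/health", invert=True)))

# Like `grep -k ... "some-key"`: select by record key.
print(list(grep(records, "some-key", by_key=True)))
```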
Route to an output topic
Use > to write filtered results to a new Kafka topic:
grep /dev/kafka/local/topics/web_visits [.status_code > 399] > /dev/kafka/local/topics/error_visits
The output topic is created automatically. Its encoding follows the input: an Avro source produces an Avro output, because filtering does not change the schema.
See also
- Node Reference: Filter -- full config fields and schema behavior
- Data Operators: grep -- DSL syntax reference