Skip to main content

Filter and Route

This guide shows how to filter records from a Kafka topic and route matching results to a new topic.

Prerequisites

  • TypeStream installed and running
  • Demo data generators running (started automatically with typestream local dev)

Filter by content

The simplest filter matches records containing a text string.

grep /dev/kafka/local/topics/web_visits "/products"

Bare words work too (case-insensitive):

grep /dev/kafka/local/topics/web_visits products

Filter by field

Use predicate expressions for field-based filtering:

grep /dev/kafka/local/topics/web_visits [.status_code > 399]

Combine conditions with && and ||:

grep /dev/kafka/local/topics/web_visits [ .status_code == 200 || .url_path ~= '/products' ]

Predicate operators

OperatorDescription
==Strict equality (same type required)
!=Strict inequality
>, >=, <, <=Numeric comparison
~=Contains (case-insensitive)

Invert matching

Use -v to select records that do not match:

grep -v /dev/kafka/local/topics/web_visits "/health"

Filter by key

Use -k to match against the record key instead of the value:

grep -k /dev/kafka/local/topics/web_visits "some-key"

Route to an output topic

Use > to write filtered results to a new Kafka topic:

grep /dev/kafka/local/topics/web_visits [.status_code > 399] > /dev/kafka/local/topics/error_visits

The output topic is created automatically. Encoding follows the input: if the source is Avro, the output will also be Avro (since the schema is unchanged by filtering).

See also