Data Operators
Data operators transform, filter, and process records flowing through a TypeStream pipeline. They are chained together using the pipe (|) operator.
Cat
Synopsis
cat <path>
Description
The cat operator reads records from a data stream and outputs them. It is typically the first operator in a pipeline.
cat /dev/kafka/local/topics/web_visits
Cut
Synopsis
cut <field1> [<field2> ...]
Description
The cut operator selects specific fields from each record, producing a new record with only the named fields.
cat /dev/kafka/local/topics/web_visits | cut .url_path .status_code
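To make the projection concrete, here is a minimal Python sketch of what cut does to a single record. It models the behavior described above and is not TypeStream's implementation. The sample record, the function name, and the choice to silently skip missing fields are all assumptions made for illustration.

```python
def cut(record, fields):
    """Return a new record containing only the named fields.

    Assumption: a field that is absent from the record is skipped.
    """
    return {name: record[name] for name in fields if name in record}


# Hypothetical web_visits record
visit = {"ip_address": "203.0.113.7", "url_path": "/pricing", "status_code": 200}

# Models: cut .url_path .status_code
projected = cut(visit, ["url_path", "status_code"])
```

The original record is left untouched; the result holds only url_path and status_code.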
Each
Synopsis
each <block expression>
Description
The each data operator is used to execute a block expression for each record in a data stream. The block expression is required.
each must be the last operator in a pipeline.
Here's an example of using each to make an HTTP request for each record in a data stream:
cat /dev/kafka/local/topics/web_visits | each { visit -> http post https://example.com/visits "{\"path\": #{$visit.url_path}}" }
Echo
Synopsis
echo <expression>
Description
The echo data operator outputs a literal value or expression result. It can be used to output strings, variables, or field selections.
echo "hello world"
echo $v.title
Enrich
Synopsis
enrich <block expression>
Description
The enrich data operator is used to enrich data streams. The block expression is required. enrich cannot be the first operator in a pipeline.
enrich evaluates the block expression for each record in the data stream and merges the result into the original record.
cat /dev/kafka/local/topics/web_visits | enrich { visit -> http "https://api.country.is/#{$visit.ip_address}" }
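The merge step can be sketched in Python as follows. This is a model of the description above under stated assumptions, not TypeStream's code: lookup_country is a hypothetical stand-in for the HTTP call, the lookup table is invented sample data, and letting the block's fields win on a name clash is an assumption.

```python
def enrich(records, block):
    """Evaluate block for each record and merge its result into the record."""
    for record in records:
        extra = block(record)
        # Assumption: on a field name clash, the block's value wins.
        yield {**record, **extra}


def lookup_country(visit):
    """Hypothetical stand-in for the HTTP lookup in the example above."""
    table = {"203.0.113.7": "NL"}  # invented sample data
    return {"country": table.get(visit["ip_address"], "unknown")}


visits = [
    {"ip_address": "203.0.113.7", "url_path": "/pricing"},
    {"ip_address": "198.51.100.4", "url_path": "/blog"},
]
enriched = list(enrich(visits, lookup_country))
```

Each output record keeps its original fields and gains a country field.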
Grep
Synopsis
grep [-kv] [<pattern>|[<predicate>]] [<path>]
Description
The grep data operator filters data streams. By default, it takes a string pattern and filters by "content", matching the pattern against the whole record.
For more complex filter operations, grep supports predicate expressions. See the predicate expressions section for more details.
The following options are supported:
-k, --by-key - filter by key
-v, --invert-match - invert the sense of matching, to select non-matching records
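The interaction between the default content match and the two options can be modeled with a short Python sketch. Several things here are assumptions for illustration: records are represented as (key, value) pairs, the whole record is compared in its JSON text form, and the pattern is treated as a case-sensitive substring. The documentation above does not specify how patterns are interpreted.

```python
import json


def grep(records, pattern, by_key=False, invert=False):
    """Yield the (key, value) pairs selected by the pattern.

    Assumption: plain substring matching against the key (by_key=True)
    or against the JSON text of the whole record (default).
    """
    for key, value in records:
        haystack = str(key) if by_key else json.dumps(value)
        matched = pattern in haystack
        # invert=True flips the decision, like -v
        if matched != invert:
            yield key, value


# Hypothetical keyed records
visits = [("u1", {"url_path": "/pricing"}), ("u2", {"url_path": "/blog"})]

matching = list(grep(visits, "pricing"))              # models: grep pricing
others = list(grep(visits, "pricing", invert=True))   # models: grep -v pricing
keyed = list(grep(visits, "u2", by_key=True))         # models: grep -k u2
```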
Predicate expressions
While in the rest of this document the characters [ and ] indicate
optional parts of a command, here they are part of the syntax and must be
included in the expression.
[ .field <operator> <value> ]
Expressions can be combined with && and || operators and grouped with ().
Here's the list of supported operators:
== - Strict equality operator. The value must be of the same type as the field.
!= - Strict inequality operator. The value must be of the same type as the field.
>, >=, <, <= - Numeric comparison operators. The value must be a number.
~= - "Contains" operator. It matches if the field contains the value. Ignores case.
See the filter and route how-to guide for examples.
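The operator semantics listed in this section can be modeled in Python as below. This is a sketch under assumptions, not the TypeStream evaluator: the check function and the sample record are hypothetical, and raising TypeError when a value breaks the "must be of the same type" or "must be a number" rule is one possible reading of those rules.

```python
import operator

NUMERIC = {">": operator.gt, ">=": operator.ge, "<": operator.lt, "<=": operator.le}


def check(record, field, op, value):
    """Evaluate one predicate of the form [ .field <operator> <value> ]."""
    actual = record[field]
    if op in ("==", "!="):
        # Strict operators: the value must have the same type as the field.
        if type(actual) is not type(value):
            raise TypeError("value must be of the same type as the field")
        return actual == value if op == "==" else actual != value
    if op in NUMERIC:
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError("value must be a number")
        return NUMERIC[op](actual, value)
    if op == "~=":
        # "Contains", ignoring case
        return str(value).lower() in str(actual).lower()
    raise ValueError(f"unsupported operator: {op}")


# Hypothetical record
visit = {"url_path": "/Pricing/Teams", "status_code": 404}

# Roughly models two predicates combined with &&:
# one on .status_code with >= 400, one on .url_path with ~= pricing
hit = check(visit, "status_code", ">=", 400) and check(visit, "url_path", "~=", "pricing")
```

Python's and, or, and parentheses play the role of &&, ||, and () here.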
Join
Synopsis
join <path>
Description
The join operator joins two data streams by key. The first stream comes from the pipe, and the second stream is specified as the argument. Records with matching keys from both streams are merged into a single output record containing fields from both sides.
cat /dev/kafka/local/topics/orders | join /dev/kafka/local/topics/users > /dev/kafka/local/topics/orders_enriched
The output encoding defaults to JSON because the schema of the merged record differs from that of either input.
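The key-matching and merging behavior of join can be sketched in Python. This is a simplified batch model, not how TypeStream executes a join over live streams. It assumes records are (key, value) pairs, that only keys present on both sides produce output, that the latest right-hand value per key is used, and that right-hand fields win on a name clash. None of these details are stated above, and the sample data is invented.

```python
def join(left, right):
    """Merge records from two keyed collections that share a key."""
    right_by_key = dict(right)  # assumption: latest value per key wins
    for key, left_value in left:
        if key in right_by_key:
            # Output record contains fields from both sides.
            yield key, {**left_value, **right_by_key[key]}


# Hypothetical orders and users, both keyed by user id
orders = [
    ("u1", {"order_id": 1, "total": 30}),
    ("u2", {"order_id": 2, "total": 12}),
    ("u9", {"order_id": 3, "total": 99}),
]
users = [("u1", {"name": "Ada"}), ("u2", {"name": "Lin"})]

joined = list(join(orders, users))
```

The order keyed u9 has no matching user, so it does not appear in the output of this sketch.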
Wc
Synopsis
wc [-b <field>]
Description
The wc (word count) operator counts records per key. It is typically used as the last operator in a pipeline to produce aggregation counts.
The following options are supported:
-b, --by - count by the specified field instead of the record key
cat /dev/kafka/local/topics/web_visits | wc
cat /dev/kafka/local/topics/web_visits | wc -b .status_code
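The difference between the two invocations can be modeled with a small Python sketch. It counts over a finite list for clarity, whereas a streaming pipeline would presumably keep updating its counts as records arrive. The (key, value) record shape, the function signature, and the sample data are assumptions for illustration.

```python
from collections import Counter


def wc(records, by=None):
    """Count records per key, or per value of the field named by `by`."""
    counts = Counter()
    for key, value in records:
        group = value[by] if by is not None else key
        counts[group] += 1
    return dict(counts)


# Hypothetical keyed records
visits = [
    ("u1", {"status_code": 200}),
    ("u1", {"status_code": 404}),
    ("u2", {"status_code": 200}),
]

per_key = wc(visits)                       # models: wc
per_status = wc(visits, by="status_code")  # models: wc -b .status_code
```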