Pipeline File Format
TypeStream pipelines-as-code are defined in .typestream.json files. This page documents the complete file format.
Top-level structure
{
"name": "pipeline-name",
"version": "1",
"description": "Optional description",
"graph": {
"nodes": [ ... ],
"edges": [ ... ]
}
}
| Field | Required | Type | Description |
|---|---|---|---|
| name | yes | string | Unique pipeline identifier |
| version | yes | string | Version string for tracking changes |
| description | no | string | Human-readable description |
| graph | yes | object | Pipeline graph definition |
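The field table maps directly onto a structural check. Below is a minimal sketch in Python, assuming the file has already been parsed as JSON. check_top_level is a hypothetical helper for illustration, not part of TypeStream. Because the table lists version as a string, this sketch reports a bare number such as 1 as a problem.

```python
import json

# Required and optional top-level fields with their expected JSON types,
# taken from the field table above.
REQUIRED_FIELDS = {"name": str, "version": str, "graph": dict}
OPTIONAL_FIELDS = {"description": str}


def check_top_level(doc: dict) -> list[str]:
    """Return a list of problems found in the top-level fields (empty if none)."""
    problems = []
    for field, expected in REQUIRED_FIELDS.items():
        if field not in doc:
            problems.append(f"missing required field: {field}")
        elif not isinstance(doc[field], expected):
            problems.append(f"{field} must be a {expected.__name__}")
    for field, expected in OPTIONAL_FIELDS.items():
        if field in doc and not isinstance(doc[field], expected):
            problems.append(f"{field} must be a {expected.__name__}")
    return problems


# "version" is a JSON number here rather than a string.
raw = '{"name": "pipeline-name", "version": 1, "graph": {"nodes": [], "edges": []}}'
print(check_top_level(json.loads(raw)))  # → ['version must be a str']
```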
Graph
The graph contains nodes and edges forming a directed acyclic graph.
Edges
{ "fromId": "source-1", "toId": "filter-1" }
Each edge connects two nodes by their id: records flow from the node named in fromId to the node named in toId.
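Since edges refer to nodes only by id, a loader has to confirm two things: every fromId and toId names a declared node, and the edges contain no cycle. The sketch below does both with Kahn's topological sort, assuming the node ids and edges are already parsed. The function name is hypothetical, and this is not necessarily how TypeStream itself validates graphs.

```python
from collections import deque


def topological_order(node_ids: list[str], edges: list[dict]) -> list[str]:
    """Order node ids so every edge points forward (Kahn's algorithm).

    Raises ValueError if an edge names an unknown id or the graph has a cycle.
    """
    known = set(node_ids)
    successors = {node_id: [] for node_id in node_ids}
    in_degree = {node_id: 0 for node_id in node_ids}
    for edge in edges:
        src, dst = edge["fromId"], edge["toId"]
        if src not in known or dst not in known:
            raise ValueError(f"edge references unknown node: {src} -> {dst}")
        successors[src].append(dst)
        in_degree[dst] += 1

    # Start from nodes with no incoming edges (typically sources).
    ready = deque(n for n in node_ids if in_degree[n] == 0)
    order = []
    while ready:
        node_id = ready.popleft()
        order.append(node_id)
        for nxt in successors[node_id]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    # A node that never reaches in-degree 0 lies on (or behind) a cycle.
    if len(order) != len(node_ids):
        raise ValueError("graph contains a cycle")
    return order


edges = [{"fromId": "source-1", "toId": "filter-1"}]
print(topological_order(["filter-1", "source-1"], edges))  # → ['source-1', 'filter-1']
```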
Nodes
Each node has an id (string) and exactly one node type field. See the Node Reference for full details on each node type.
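The "exactly one node type field" rule can be checked mechanically. A minimal Python sketch, assuming the set of type keys is the one documented on this page; TypeStream may accept others, so treat the Node Reference as authoritative. node_type is a hypothetical helper.

```python
# Node type keys documented on this page (possibly not exhaustive).
NODE_TYPES = {
    "kafkaSource", "postgresSource", "filter", "geoIp", "textExtractor",
    "embeddingGenerator", "openAiTransformer", "materializedView",
    "kafkaSink", "elasticsearchSink", "weaviateSink", "dbSink", "inspector",
}


def node_type(node: dict) -> str:
    """Return the single node type key of a node, or raise ValueError.

    Keys that are neither "id" nor a known node type are ignored here.
    """
    if not isinstance(node.get("id"), str):
        raise ValueError("node id must be a string")
    types = [key for key in node if key in NODE_TYPES]
    if len(types) != 1:
        raise ValueError(f"node {node['id']} must have exactly one type field, found {types}")
    return types[0]


print(node_type({"id": "filter-1", "filter": {"expression": ".field == \"value\""}}))  # → filter
```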
KafkaSource
{
"id": "source-1",
"kafkaSource": {
"topicPath": "/dev/kafka/local/topics/my_topic",
"encoding": "AVRO",
"unwrapCdc": false
}
}
- topicPath: Virtual filesystem path to the Kafka topic (e.g. /dev/kafka/local/topics/my_topic)
- encoding: "AVRO" | "JSON"
- unwrapCdc: true to extract the after payload from Debezium CDC envelopes
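With unwrapCdc set to true, each record is reduced from the full Debezium change envelope to its after payload. The Python sketch below shows that transformation on one already-decoded event. The envelope values are simplified and unwrap_cdc is a hypothetical name; this illustrates the idea, not TypeStream's implementation.

```python
# Illustrative Debezium change event (simplified). Debezium envelopes carry
# "before", "after", "source", "op" and "ts_ms" fields.
envelope = {
    "before": None,
    "after": {"id": 42, "status": "shipped"},
    "source": {"table": "orders"},
    "op": "c",
    "ts_ms": 1700000000000,
}


def unwrap_cdc(event: dict):
    """Return the post-change row state carried in the "after" field.

    Debezium sets "after" to null for delete events, so this returns None
    for them; how TypeStream handles deletes is not covered here.
    """
    if "after" not in event:
        raise ValueError("not a Debezium envelope: missing 'after'")
    return event["after"]


print(unwrap_cdc(envelope))  # → {'id': 42, 'status': 'shipped'}
```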
PostgresSource
{
"id": "source-1",
"postgresSource": {
"topicPath": "/dev/kafka/local/topics/dbserver.public.orders"
}
}
topicPath: Virtual filesystem path to the Debezium CDC topic (CDC unwrapping is enabled automatically)
Filter
{
"id": "filter-1",
"filter": {
"expression": ".field == \"value\""
}
}
expression: A predicate expression (e.g. .status_code == 200, .title ~= "Station")
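Because the expression is itself a JSON string, double quotes inside it must be escaped, as in the .field == \"value\" sample above. Generating the file with a JSON serializer handles this automatically. A small illustration using only Python's standard json module:

```python
import json

# The predicate text as it should reach the expression parser.
expression = '.title ~= "Station"'

# json.dumps escapes the inner double quotes when writing the node.
node = {"id": "filter-1", "filter": {"expression": expression}}
print(json.dumps(node["filter"]))  # → {"expression": ".title ~= \"Station\""}
```

Parsing the output with json.loads returns the original, unescaped predicate text.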
GeoIp
{
"id": "geoip-1",
"geoIp": {
"ipField": "ip_address",
"outputField": "country_code"
}
}
TextExtractor
{
"id": "text-1",
"textExtractor": {
"filePathField": "file_url",
"outputField": "extracted_text"
}
}
EmbeddingGenerator
{
"id": "embed-1",
"embeddingGenerator": {
"textField": "title",
"outputField": "embedding",
"model": "text-embedding-3-small"
}
}
OpenAiTransformer
{
"id": "ai-1",
"openAiTransformer": {
"prompt": "Summarize: ${title}",
"outputField": "summary",
"model": "gpt-4o-mini"
}
}
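The prompt uses ${field} placeholders. Assuming each placeholder is filled from the matching field of the incoming record (this page does not spell out the rules, for example for missing fields), the substitution behaves like Python's string.Template, which happens to use the same syntax. The record below is made up for illustration.

```python
from string import Template

# Hypothetical incoming record.
record = {"title": "Ten stations to visit in spring", "author": "jane"}

# Assumption: ${title} is replaced by the record's "title" value.
# string.Template is a stand-in here, not TypeStream's template engine.
prompt = Template("Summarize: ${title}").substitute(record)
print(prompt)  # → Summarize: Ten stations to visit in spring
```

Template.substitute raises KeyError when a referenced field is absent, while safe_substitute leaves the placeholder in place; which behavior TypeStream follows is not stated here.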
MaterializedView
{
"id": "mv-1",
"materializedView": {
"groupByField": "status_code",
"aggregationType": "count",
"enableWindowing": false,
"windowSizeSeconds": 0
}
}
- groupByField: Field to group by
- aggregationType: "count" for counting, or "latest" for keeping the latest value per key
- enableWindowing: true to use tumbling time windows (only with "count" aggregation)
- windowSizeSeconds: Window size in seconds (when enableWindowing is true)
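To make the two aggregation types and the windowing option concrete, the sketch below computes their results in memory in Python. It illustrates tumbling-window semantics under stated assumptions (each record carries a hypothetical ts field in epoch seconds, windows align to multiples of windowSizeSeconds, and records arrive in time order); it is not TypeStream's implementation.

```python
from collections import Counter


def windowed_counts(records, group_by_field, window_size_seconds):
    """Count records per (window start, key) using tumbling windows.

    Tumbling windows are fixed-size and non-overlapping, so each record
    falls into exactly one window. "ts" is a hypothetical timestamp field.
    """
    counts = Counter()
    for record in records:
        window_start = record["ts"] - record["ts"] % window_size_seconds
        counts[(window_start, record[group_by_field])] += 1
    return dict(counts)


def latest_per_key(records, group_by_field):
    """Keep the last record seen for each key (assumes in-order arrival)."""
    latest = {}
    for record in records:
        latest[record[group_by_field]] = record
    return latest


records = [
    {"ts": 0, "status_code": 200},
    {"ts": 30, "status_code": 200},
    {"ts": 59, "status_code": 404},
    {"ts": 60, "status_code": 200},
]
print(windowed_counts(records, "status_code", 60))
# → {(0, 200): 2, (0, 404): 1, (60, 200): 1}
```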
KafkaSink
{
"id": "sink-1",
"kafkaSink": {
"topicName": "output_topic"
}
}
topicName: Name of the output Kafka topic
ElasticsearchSink
{
"id": "sink-1",
"elasticsearchSink": {
"connectionId": "my-elasticsearch",
"indexName": "documents",
"documentIdStrategy": "RECORD_KEY",
"writeMethod": "UPSERT",
"behaviorOnNullValues": "IGNORE"
}
}
WeaviateSink
{
"id": "sink-1",
"weaviateSink": {
"connectionId": "my-weaviate",
"collectionName": "documents",
"documentIdStrategy": "FieldIdStrategy",
"documentIdField": "doc_id",
"vectorStrategy": "FieldVectorStrategy",
"vectorField": "embedding",
"timestampField": "created_at"
}
}
DbSink
{
"id": "sink-1",
"dbSink": {
"connectionId": "my-postgres",
"tableName": "events",
"insertMode": "upsert",
"primaryKeyFields": "event_id"
}
}
Inspector
{
"id": "inspector-1",
"inspector": {
"label": "debug tap"
}
}
Complete example
{
"name": "webvisits-enriched",
"version": "1",
"description": "Enrich web visits with geolocation from IP address",
"graph": {
"nodes": [
{
"id": "source-1",
"kafkaSource": {
"topicPath": "/dev/kafka/local/topics/web_visits",
"encoding": "AVRO"
}
},
{
"id": "geoip-1",
"geoIp": {
"ipField": "ip_address",
"outputField": "country_code"
}
},
{
"id": "filter-1",
"filter": {
"expression": ".country_code == \"US\""
}
},
{
"id": "sink-1",
"kafkaSink": {
"topicName": "us_visits_enriched"
}
}
],
"edges": [
{ "fromId": "source-1", "toId": "geoip-1" },
{ "fromId": "geoip-1", "toId": "filter-1" },
{ "fromId": "filter-1", "toId": "sink-1" }
]
}
}
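Writing the file by hand works for small graphs; for longer straight-line pipelines, generating it avoids typos in edge ids. A minimal Python sketch that rebuilds the example above with only the standard json module. chain is a hypothetical helper that links nodes in sequence and only suits linear graphs like this one.

```python
import json


def chain(*nodes):
    """Return edges linking the given nodes in order (each feeds the next)."""
    return [{"fromId": a["id"], "toId": b["id"]} for a, b in zip(nodes, nodes[1:])]


source = {
    "id": "source-1",
    "kafkaSource": {"topicPath": "/dev/kafka/local/topics/web_visits", "encoding": "AVRO"},
}
geoip = {"id": "geoip-1", "geoIp": {"ipField": "ip_address", "outputField": "country_code"}}
us_only = {"id": "filter-1", "filter": {"expression": '.country_code == "US"'}}
sink = {"id": "sink-1", "kafkaSink": {"topicName": "us_visits_enriched"}}

pipeline = {
    "name": "webvisits-enriched",
    "version": "1",
    "description": "Enrich web visits with geolocation from IP address",
    "graph": {
        "nodes": [source, geoip, us_only, sink],
        "edges": chain(source, geoip, us_only, sink),
    },
}

# json.dumps escapes the quotes inside the filter expression automatically.
text = json.dumps(pipeline, indent=2)
print(text)
```

Saving text to a file with the .typestream.json extension yields the same pipeline as the hand-written example.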