
Pipeline-as-Code

This guide covers how to define TypeStream pipelines as .typestream.json files and manage them with the CLI, much as you would manage Terraform configurations or Kubernetes manifests.

Prerequisites

  • TypeStream installed and running
  • Demo data generators running (started automatically with typestream local dev)

File format

A pipeline file is a JSON document with four fields:

{
  "name": "my-pipeline",
  "version": "1",
  "description": "What this pipeline does",
  "graph": {
    "nodes": [ ... ],
    "edges": [ ... ]
  }
}

Field        Required  Description
name         yes       Unique identifier for the pipeline
version      yes       Version string (for tracking changes)
description  no        Human-readable description
graph        yes       A UserPipelineGraph containing nodes and edges

The graph field defines the pipeline as a directed acyclic graph. Each node has an id and exactly one node type field (e.g. kafkaSource, filter, kafkaSink). Edges connect nodes by their IDs.

See the Node Reference for all available node types and their configuration fields.
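
The structural rules above (required top-level fields, one node type field per node, edges that refer to node IDs, no cycles) can be checked locally before a file ever reaches the server. The following is a minimal sketch, not TypeStream's own validator: check_pipeline and load_and_check are hypothetical helpers, and the sketch assumes that every node key other than id is a node type field and that node IDs must be unique within a graph.

```python
import json
from collections import defaultdict, deque

# Required top-level fields, taken from the table above.
REQUIRED_FIELDS = ("name", "version", "graph")


def check_pipeline(doc: dict) -> list[str]:
    """Return a list of structural problems; an empty list means none were found."""
    errors = []
    for field in REQUIRED_FIELDS:
        if field not in doc:
            errors.append(f"missing required field: {field}")

    graph = doc.get("graph", {})
    nodes = graph.get("nodes", [])
    edges = graph.get("edges", [])

    ids = set()
    for node in nodes:
        node_id = node.get("id")
        if node_id is None:
            errors.append("node without an id")
            continue
        if node_id in ids:  # assumption: IDs are unique within a graph
            errors.append(f"duplicate node id: {node_id}")
        ids.add(node_id)
        # Assumption: any key other than "id" is a node type field.
        type_fields = [key for key in node if key != "id"]
        if len(type_fields) != 1:
            errors.append(
                f"node {node_id}: expected exactly one node type field, "
                f"found {len(type_fields)}"
            )

    # Cycle check with Kahn's algorithm: repeatedly remove nodes with no
    # incoming edges; any node left over sits on a cycle.
    indegree = {node_id: 0 for node_id in ids}
    successors = defaultdict(list)
    for edge in edges:
        src, dst = edge.get("fromId"), edge.get("toId")
        if src not in ids or dst not in ids:
            errors.append(f"edge references unknown node: {src} -> {dst}")
            continue
        successors[src].append(dst)
        indegree[dst] += 1

    queue = deque(node_id for node_id, deg in indegree.items() if deg == 0)
    visited = 0
    while queue:
        current = queue.popleft()
        visited += 1
        for nxt in successors[current]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    if visited != len(ids):
        errors.append("graph contains a cycle")
    return errors


def load_and_check(path: str) -> list[str]:
    """Parse a pipeline file and run the structural checks on it."""
    with open(path, encoding="utf-8") as handle:
        return check_pipeline(json.load(handle))
```

A check like this only covers the shape of the file. Topic existence, node configuration, and schema propagation still need the server-side validation described in the next section.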

Validate

Check a pipeline file for errors without applying it:

typestream validate my-pipeline.typestream.json

This verifies:

  • JSON syntax is valid
  • All referenced topics exist
  • Node configurations are correct
  • Schema propagation succeeds

Plan

Preview the changes that apply would make, similar to terraform plan:

typestream plan my-pipeline.typestream.json

Output shows color-coded actions:

Action              Meaning
CREATE (green)      New pipeline, will be created
UPDATE (yellow)     Pipeline exists, definition changed
UNCHANGED (dim)     Pipeline exists, no changes
DELETE (red)        Pipeline exists on server but not in files
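
The four actions amount to a comparison between the pipelines defined in local files and the pipelines known to the server. The sketch below illustrates that classification only; plan_actions is a hypothetical function, and it assumes that definitions are keyed by pipeline name and compared by plain equality of the parsed JSON, which may differ from how the CLI actually detects changes.

```python
def plan_actions(local: dict[str, dict], remote: dict[str, dict]) -> dict[str, str]:
    """Map each pipeline name to CREATE, UPDATE, UNCHANGED, or DELETE.

    local  -- definitions parsed from files, keyed by pipeline name
    remote -- definitions currently on the server, keyed by pipeline name
    """
    actions = {}
    for name, definition in local.items():
        if name not in remote:
            actions[name] = "CREATE"
        elif remote[name] != definition:  # assumption: deep equality of JSON
            actions[name] = "UPDATE"
        else:
            actions[name] = "UNCHANGED"
    # Anything on the server with no matching file is a deletion candidate.
    for name in remote:
        if name not in local:
            actions[name] = "DELETE"
    return actions
```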

Plan a directory

Scan a directory for all *.typestream.json files and show a combined diff:

typestream plan ./pipelines/

This is especially useful for CI/CD: you can see exactly what would change before applying.
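
A CI job along these lines could validate each file and then plan the whole directory. This is a rough sketch under two assumptions that the guide does not state: that the scan is not recursive, and that typestream validate and typestream plan exit with a non-zero status on failure. find_pipeline_files and ci_check are hypothetical helpers; only the validate and plan subcommands come from this guide.

```python
import subprocess
from pathlib import Path


def find_pipeline_files(directory: str) -> list[Path]:
    """Return the *.typestream.json files directly inside a directory, sorted."""
    # Assumption: only the top level is scanned, not subdirectories.
    return sorted(Path(directory).glob("*.typestream.json"))


def ci_check(directory: str, run=subprocess.run) -> int:
    """Validate every pipeline file, then plan the directory.

    Returns the first non-zero exit status, or the status of the plan step.
    The run parameter exists so the CLI call can be replaced in tests.
    """
    for path in find_pipeline_files(directory):
        result = run(["typestream", "validate", str(path)])
        if result.returncode != 0:  # assumption: failure gives non-zero status
            return result.returncode
    return run(["typestream", "plan", directory]).returncode
```

In a CI step, the return value of ci_check("./pipelines/") would typically be used as the process exit status so that a failed validation stops the job before anything is applied.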

Apply

Create or update a pipeline:

typestream apply my-pipeline.typestream.json

The server compiles the graph, starts a Kafka Streams job, and persists the pipeline definition to a compacted Kafka topic (__typestream_pipelines). If the server restarts, all pipelines are automatically recovered.
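
Recovery from a compacted topic generally works by replaying the log and keeping the latest value per key. The sketch below shows that general Kafka pattern, not TypeStream's actual recovery code: recover_pipelines is a hypothetical function, and it assumes that records are keyed by pipeline name and that a deletion is written as a tombstone (a record with a null value).

```python
from typing import Iterable, Optional


def recover_pipelines(
    records: Iterable[tuple[str, Optional[dict]]],
) -> dict[str, dict]:
    """Rebuild the set of live pipelines from (key, value) records in log order."""
    state: dict[str, dict] = {}
    for key, value in records:
        if value is None:
            # Tombstone: the pipeline was deleted (assumed convention).
            state.pop(key, None)
        else:
            # A later record for the same key supersedes earlier ones.
            state[key] = value
    return state
```

Because only the last record per key matters, the topic can be compacted without losing any pipeline that is still live.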

List and delete

List all managed pipelines:

typestream pipelines list

Delete a pipeline by name:

typestream pipelines delete my-pipeline

Example: filter pipeline

{
  "name": "webvisits-ok",
  "version": "1",
  "description": "Filter web visits to successful requests",
  "graph": {
    "nodes": [
      {
        "id": "source-1",
        "kafkaSource": {
          "topicPath": "/dev/kafka/local/topics/web_visits",
          "encoding": "AVRO"
        }
      },
      {
        "id": "filter-1",
        "filter": {
          "expression": ".status_code == 200"
        }
      },
      {
        "id": "sink-1",
        "kafkaSink": {
          "topicName": "web_visits_ok"
        }
      }
    ],
    "edges": [
      { "fromId": "source-1", "toId": "filter-1" },
      { "fromId": "filter-1", "toId": "sink-1" }
    ]
  }
}

Example: aggregation pipeline

{
  "name": "visits-by-status",
  "version": "1",
  "description": "Count web visits grouped by status code",
  "graph": {
    "nodes": [
      {
        "id": "source-1",
        "kafkaSource": {
          "topicPath": "/dev/kafka/local/topics/web_visits",
          "encoding": "AVRO"
        }
      },
      {
        "id": "mv-1",
        "materializedView": {
          "groupByField": "status_code",
          "aggregationType": "count"
        }
      }
    ],
    "edges": [
      { "fromId": "source-1", "toId": "mv-1" }
    ]
  }
}

See also