# Sink to Elasticsearch
This guide shows how to create a pipeline that writes streaming data to an Elasticsearch index for full-text search and analytics.
## Prerequisites
- TypeStream installed and running
- An Elasticsearch instance accessible from the TypeStream server
## Register an Elasticsearch connection

Before creating the pipeline, register your Elasticsearch instance. In the GUI, navigate to Connections > Elasticsearch and add:

- URL: Your Elasticsearch endpoint (e.g. http://elasticsearch:9200)
- Credentials: Username and password (if authentication is enabled)
The server monitors the connection's health, and once registered, the connection appears as a sink option in the graph builder.
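Before registering, it can save time to confirm the endpoint is reachable from where the TypeStream server runs. A minimal sketch using only the Python standard library; the URL and credentials are placeholders for your own values:

```python
import base64
import json
import urllib.error
import urllib.request


def check_elasticsearch(url: str, username: str = "", password: str = "") -> bool:
    """Return True if the Elasticsearch root endpoint answers with cluster info."""
    request = urllib.request.Request(url)
    if username:
        # Basic auth header, only needed when security is enabled.
        token = base64.b64encode(f"{username}:{password}".encode()).decode()
        request.add_header("Authorization", f"Basic {token}")
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            info = json.load(response)
            # A healthy node reports its cluster name on the root endpoint.
            return "cluster_name" in info
    except (urllib.error.URLError, OSError, ValueError):
        return False
```

If the check fails, verify that the host resolves from the TypeStream server and that the Elasticsearch port (9200 by default) is open.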
## Build the pipeline

### GUI

1. Drag a Kafka Source and select your topic.
2. Optionally add transform or enrichment nodes.
3. Drag an Elasticsearch Sink from the palette (it appears under Database Sinks after registering a connection).
4. Set the index name.
5. Configure the document ID strategy and write method.
6. Click Create Job.

### Config-as-Code
```json
{
  "name": "web-visits-to-elasticsearch",
  "version": "1",
  "description": "Index web visits into Elasticsearch",
  "graph": {
    "nodes": [
      {
        "id": "source-1",
        "kafkaSource": {
          "topicPath": "/dev/kafka/local/topics/web_visits",
          "encoding": "AVRO"
        }
      },
      {
        "id": "sink-1",
        "elasticsearchSink": {
          "connectionId": "my-elasticsearch",
          "indexName": "web_visits",
          "documentIdStrategy": "RECORD_KEY",
          "writeMethod": "UPSERT",
          "behaviorOnNullValues": "IGNORE"
        }
      }
    ],
    "edges": [
      { "fromId": "source-1", "toId": "sink-1" }
    ]
  }
}
```
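Before applying a config-as-code definition, a quick structural sanity check can catch typos in node ids. The sketch below is not part of TypeStream; it simply verifies that every edge in the definition above references a declared node:

```python
import json

# The pipeline definition from above, parsed as plain JSON.
pipeline = json.loads("""
{
  "name": "web-visits-to-elasticsearch",
  "version": "1",
  "description": "Index web visits into Elasticsearch",
  "graph": {
    "nodes": [
      {
        "id": "source-1",
        "kafkaSource": {
          "topicPath": "/dev/kafka/local/topics/web_visits",
          "encoding": "AVRO"
        }
      },
      {
        "id": "sink-1",
        "elasticsearchSink": {
          "connectionId": "my-elasticsearch",
          "indexName": "web_visits",
          "documentIdStrategy": "RECORD_KEY",
          "writeMethod": "UPSERT",
          "behaviorOnNullValues": "IGNORE"
        }
      }
    ],
    "edges": [
      { "fromId": "source-1", "toId": "sink-1" }
    ]
  }
}
""")


def validate_graph(graph: dict) -> list:
    """Return problems found: edges that point at undeclared node ids."""
    node_ids = {node["id"] for node in graph["nodes"]}
    return [
        f"edge references unknown node {endpoint!r}"
        for edge in graph["edges"]
        for endpoint in (edge["fromId"], edge["toId"])
        if endpoint not in node_ids
    ]
```

An empty result means every edge connects declared nodes; anything else should be fixed before submitting the job.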
You must register the Elasticsearch connection via the GUI or the ConnectionService gRPC API before applying the pipeline.
## Elasticsearch sink configuration
| Field | Description |
|---|---|
| `index_name` | Elasticsearch index to write to |
| `document_id_strategy` | How to derive the document `_id` from records |
| `write_method` | Write behavior: `INSERT` or `UPSERT` |
| `behavior_on_null_values` | How to handle null field values |
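To make the table concrete, here is a hedged sketch of how a sink configured with `document_id_strategy: RECORD_KEY` and `behavior_on_null_values: IGNORE` could map a Kafka record to an indexable document. This models the semantics described in the table, not TypeStream's actual implementation:

```python
def to_document(key, value, document_id_strategy="RECORD_KEY",
                behavior_on_null_values="IGNORE"):
    """Map a Kafka record (key, value dict) to (document _id, document source)."""
    if behavior_on_null_values == "IGNORE":
        # Drop fields whose value is null rather than indexing them.
        value = {field: v for field, v in value.items() if v is not None}
    # RECORD_KEY reuses the Kafka record key as the Elasticsearch _id;
    # otherwise Elasticsearch auto-generates an _id (modeled here as None).
    doc_id = key if document_id_strategy == "RECORD_KEY" else None
    return doc_id, value
```

With `RECORD_KEY` plus `UPSERT`, records sharing a Kafka key update the same document instead of accumulating duplicates.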
## How it works
Under the hood, TypeStream creates a Kafka Connect Elasticsearch sink connector. The pipeline writes processed records to an intermediate Kafka topic, and the connector forwards them to Elasticsearch. Credentials are resolved server-side from the registered connection -- they never appear in pipeline definitions.
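For reference, upserts against Elasticsearch typically go through its `_bulk` API: each action line names the target index and `_id`, and the payload line sets `doc_as_upsert` so missing documents are created. A hedged sketch of the NDJSON body a connector could produce for the `web_visits` index (the helper and its inputs are illustrative, not TypeStream APIs):

```python
import json


def bulk_upsert_lines(index, documents):
    """Build an NDJSON body for the Elasticsearch _bulk API using upserts.

    `documents` maps document _id -> source dict.
    """
    lines = []
    for doc_id, source in documents.items():
        # Action line: update the document with this _id in the target index.
        lines.append(json.dumps({"update": {"_index": index, "_id": doc_id}}))
        # Payload line: doc_as_upsert inserts the document if the _id is new.
        lines.append(json.dumps({"doc": source, "doc_as_upsert": True}))
    # The bulk API requires a trailing newline after the last line.
    return "\n".join(lines) + "\n"
```

Because the `_id` is stable per Kafka key, replaying the same records is idempotent: each replay overwrites the same documents rather than creating new ones.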
## See also
- Node Reference: Sink -- sink node configuration
- Geo-Enrich Events -- enrich data before indexing