Geo-Enrich Events
This guide shows how to use the GeoIP node to add geographic information (country, city) to events that contain an IP address field.
TypeStream bundles a MaxMind GeoLite2 database -- no external API keys or services needed.
Prerequisites
- TypeStream installed and running
- A topic with records containing an IP address field (the demo
web_visitstopic works)
Enrich with GeoIP
The GeoIP node takes two parameters:
ipField-- the name of the field containing the IP addressoutputField-- the name of the new field to add with the geographic result
- CLI DSL
- Config-as-Code
- GUI
cat /dev/kafka/local/topics/web_visits | enrich { visit -> http "https://api.country.is/#{$visit.ip_address}" }
note
The CLI DSL uses the enrich operator with a block expression for HTTP-based enrichment. The built-in GeoIP node is available in config-as-code and the GUI.
{
"name": "webvisits-enriched",
"version": "1",
"description": "Enrich web visits with geolocation from IP address",
"graph": {
"nodes": [
{
"id": "source-1",
"kafkaSource": {
"topicPath": "/dev/kafka/local/topics/web_visits",
"encoding": "AVRO"
}
},
{
"id": "geoip-1",
"geoIp": {
"ipField": "ip_address",
"outputField": "country_code"
}
},
{
"id": "sink-1",
"kafkaSink": {
"topicName": "web_visits_enriched"
}
}
],
"edges": [
{ "fromId": "source-1", "toId": "geoip-1" },
{ "fromId": "geoip-1", "toId": "sink-1" }
]
}
}
Apply it:
typestream apply webvisits-enriched.typestream.json
- Drag a Kafka Source and select the
web_visitstopic - Drag a GeoIP node and connect it to the source
- Set
ipFieldtoip_addressandoutputFieldtocountry_code - Drag a Kafka Sink and set the output topic
- Click Create Job
The GeoIP node will show the added country_code field in the schema preview.
Schema behavior
The GeoIP node validates at compile time that the ipField exists in the input schema. It adds the outputField (type: string) to the output schema, so downstream nodes can reference it.
For example, you can chain a filter after enrichment:
{
"id": "filter-1",
"filter": {
"expression": ".country_code == \"US\""
}
}
See also
- Node Reference: GeoIp -- full configuration details
- Schema Propagation -- how TypeStream tracks schema changes through the pipeline