I have a logstash input setup as
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
}
}
I need to feed the topics into two different indexes in elasticsearch. Can anyone help me with how the ouput should be setup for such a task. At this time I am only able to setup
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "my_index"
codec => "json"
document_id => "%{id}"
}
}
I need two indexes on the same elasticsearch instance say index1
and index2
which will be fed by messages coming in on topic1
and topic2
First, you need to add decorate_events
to your kafka
input in order to know from which topic the message is coming
input {
kafka {
bootstrap_servers => "zookeper_address"
topics => ["topic1","topic2"]
decorate_events => true
}
}
Then, you have two options, both involving conditional logic. The first is by introducing a filter for adding the correct index name depending on the topic name. For this you need to add
filter {
if [kafka][topic] == "topic1" {
mutate {
add_field => {"[@metadata][index]" => "index1"}
}
} else {
mutate {
add_field => {"[@metadata][index]" => "index2"}
}
}
# remove the field containing the decorations, unless you want them to land into ES
mutate {
remove_field => ["kafka"]
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[@metadata][index]}"
codec => "json"
document_id => "%{id}"
}
}
Then second option is to do the if/else directly in the output section, like this (but the additional kafka
field will land into ES):
output {
if [@metadata][kafka][topic] == "topic1" {
elasticsearch {
hosts => ["localhost:9200"]
index => "index1"
codec => "json"
document_id => "%{id}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "index2"
codec => "json"
document_id => "%{id}"
}
}
}