Logstash 5.0.1: setting up multiple Elasticsearch indexes

Posted 2020-07-24 04:04

I have a Logstash input set up as follows:

input {
  kafka {
    bootstrap_servers => "zookeper_address"
    topics => ["topic1","topic2"]
  }
}

I need to feed the two topics into two different indexes in Elasticsearch. Can anyone help me with how the output should be set up for this? So far I have only been able to set up a single index:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "my_index"
    codec => "json"
    document_id => "%{id}"
  }
}

I need two indexes on the same Elasticsearch instance, say index1 and index2, fed by the messages coming in on topic1 and topic2 respectively.

1 answer
欢心
#2 · 2020-07-24 04:41

First, you need to add decorate_events to your kafka input so that each event records which topic it came from. In this Logstash version the setting adds a kafka field (including the topic name) to the event:

input {
  kafka {
    bootstrap_servers => "zookeper_address"
    topics => ["topic1","topic2"]
    decorate_events => true
  }
}

Then you have two options, both involving conditional logic. The first is to add a filter that stores the correct index name in the event metadata depending on the topic name, and to reference that metadata field in the output. For this you need to add:

filter {
   if [kafka][topic] == "topic1" {
      mutate {
         add_field => {"[@metadata][index]" => "index1"}
      }
   } else {
      mutate {
         add_field => {"[@metadata][index]" => "index2"}
      }
   }
   # remove the field containing the decorations, unless you want them to land into ES
   mutate {
      remove_field => ["kafka"]
   }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "%{[@metadata][index]}"
    codec => "json"
    document_id => "%{id}"
  }
}
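
Note that the plain else branch above sends every event that is not from topic1 to index2, including events that carry no kafka field at all. If more topics are added later, the same filter can be extended with else if branches. The following is only a sketch under that assumption; the catch-all index name "unrouted" is hypothetical and not part of the original setup:

filter {
   if [kafka][topic] == "topic1" {
      mutate {
         add_field => {"[@metadata][index]" => "index1"}
      }
   } else if [kafka][topic] == "topic2" {
      mutate {
         add_field => {"[@metadata][index]" => "index2"}
      }
   } else {
      # hypothetical catch-all index for events from any other topic
      mutate {
         add_field => {"[@metadata][index]" => "unrouted"}
      }
   }
}

The output section stays the same, since it only reads [@metadata][index].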

The second option is to do the if/else directly in the output section, like this (but the additional kafka field will end up in ES):

output {
   if [kafka][topic] == "topic1" {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index1"
       codec => "json"
       document_id => "%{id}"
     }
   } else {
     elasticsearch {
       hosts => ["localhost:9200"]
       index => "index2"
       codec => "json"
       document_id => "%{id}"
     }
   }
}
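
If the conditionals never seem to match, it can help to check where the topic name actually lands on the event. As far as I know, later versions of the kafka input plugin put the decorations under [@metadata][kafka] instead of a top-level kafka field, so the field reference may need adjusting for your version. A minimal sketch of a temporary debug output, to be removed once the routing is confirmed:

output {
  stdout {
    # metadata => true also prints @metadata, which rubydebug hides by default
    codec => rubydebug { metadata => true }
  }
}

Each printed event should then show either a kafka field or a kafka entry under @metadata, and the if conditions can be written against whichever one is present.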