How to write Logstash filter to filter kafka topic

Posted 2019-06-12 08:10

Question:

I want to use Kafka as input and Logstash as output. I will feed several topics into Logstash and want to filter according to topic. I tried to write the code like this:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["test", "payment"]
      }
}

filter {
    if [topic] = "test" {
       //do something
    } else {
       //do something
    }
}

But it doesn't seem to work.

Answer 1:

You should add decorate_events to add the kafka field.

Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the Logstash event containing the following attributes:

- topic: the topic this message is associated with
- consumer_group: the consumer group used to read in this event
- partition: the partition this message is associated with
- offset: the offset from the partition this message is associated with
- key: a ByteBuffer containing the message key

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events

and then update the config like this:

input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["test", "payment"]
    decorate_events => true
  }
}

filter {
  if [kafka][topic] == "test" {
    # do something
  } else {
    # do something
  }
}


Answer 2:

Change the input part by adding decorate_events, which adds the kafka metadata field:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["test", "payment"]
        decorate_events => true
    }
}

Change the filter part as follows:

filter {
    if [@metadata][kafka][topic] == "test" {
        # do something
    } else {
        # do something
    }
}
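For completeness, the decorated metadata can also be used in the output stage, not just the filter — for example, to route each topic to its own Elasticsearch index. A minimal sketch (the hosts value and index pattern here are illustrative, not from the original answers):

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        # reuse the Kafka topic name as the index name, one index per topic per day
        index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"
    }
}

Because [@metadata] fields are never written to the event itself, the topic name is available for routing like this without polluting the stored documents.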