I want to use Kafka as the input to Logstash. I will feed several topics into Logstash and want to filter events according to their topic. I tried writing the configuration like this:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test", "payment"]
}
}
filter {
if [topic] = "test" {
//do something
} else {
//do something
}
}
But it doesn't seem to work.
You should add decorate_events to add the kafka field.
Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the logstash event containing the following attributes:
topic: The topic this message is associated with
consumer_group: The consumer group used to read in this event
partition: The partition this message is associated with
offset: The offset from the partition this message is associated with
key: A ByteBuffer containing the message key
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events
Then update the configuration like this:
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test", "payment"]
decorate_events => true
}
}
filter {
if [kafka][topic] == "test" {
# do something
} else {
# do something
}
}
Change the input part by adding decorate_events to add the kafka field.
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["test", "payment"]
decorate_events => true
}
}
Change the filter part as follows:
filter {
if [@metadata][kafka][topic] == "test" {
# do something
} else {
# do something
}
}
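Fields under [@metadata] are not written out by Logstash outputs, so the topic is available for conditionals but will not appear in the stored event. If you also want the topic in the event itself, one possible sketch is to copy it into a regular field. The field name kafka_topic is only an illustrative choice, and the stdout output is there just for inspecting events while testing:

```
filter {
  # Copy the topic out of @metadata so that it survives to the output.
  # "kafka_topic" is a hypothetical field name; pick whatever suits you.
  mutate {
    add_field => { "kafka_topic" => "%{[@metadata][kafka][topic]}" }
  }
}
output {
  # Debug output: metadata => true also prints the @metadata fields,
  # which makes it easy to check what decorate_events actually added.
  stdout {
    codec => rubydebug { metadata => true }
  }
}
```

If the printed event shows no kafka entry under @metadata, decorate_events is probably not enabled in the input.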