Kafka: only subscribe to the latest message?

Posted 2019-03-04 04:50

Question:

Sometimes (it seems very random) Kafka delivers old messages. I only want the latest message per key, so a newer message should overwrite older messages with the same key. Currently it looks like I have multiple messages with the same key; the topic doesn't get compacted.

I use this setting on the topic:

cleanup.policy=compact

I'm using Java/Kotlin and the Apache Kafka 1.1.1 client.

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

val props = Properties().apply {
    // JAAS configuration for SCRAM authentication
    val jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";"
    val jaasCfg = String.format(jaasTemplate, Configuration.kafkaUsername, Configuration.kafkaPassword)
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    put(ConsumerConfig.GROUP_ID_CONFIG, "ApiKafkaKotlinConsumer${Configuration.kafkaGroupId}")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)

    put("security.protocol", "SASL_SSL")
    put("sasl.mechanism", "SCRAM-SHA-256")
    put("sasl.jaas.config", jaasCfg)
    put("max.poll.records", 100)
    put("receive.buffer.bytes", 1000000)
}

Have I missed some settings?

Answer 1:

If you want to have only one value for each key, you have to use the KTable<K,V> abstraction: StreamsBuilder::table(final String topic) from Kafka Streams. The topic used here should have its cleanup policy set to compact.
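A minimal Kotlin sketch of that approach (the application id, topic name, and String serdes are assumptions, not from the question):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.StreamsConfig

val streamsProps = Properties().apply {
    put(StreamsConfig.APPLICATION_ID_CONFIG, "latest-value-app")       // hypothetical app id
    put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
    put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
    put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().javaClass)
}

val builder = StreamsBuilder()
// table() reads the topic as a changelog: each key resolves to its latest value only.
val table = builder.table<String, String>("my-compacted-topic")        // hypothetical topic name
table.toStream().foreach { key, value -> println("$key -> $value") }

KafkaStreams(builder.build(), streamsProps).start()

Because the KTable materializes the topic as a changelog, any lookup or downstream processing always sees only the most recent value per key, regardless of whether the broker has compacted the log yet.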

If you use a plain KafkaConsumer, you just pull data from the brokers. It doesn't give you any mechanism that performs deduplication. Depending on whether compaction has run, you can get anywhere from one to n messages for the same key.
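If Kafka Streams is too heavy for your case, you can approximate the same behaviour on the consumer side by collapsing each poll batch into a map keyed by message key, so newer values overwrite older ones. A sketch, assuming the props from the question and a hypothetical topic name:

import org.apache.kafka.clients.consumer.KafkaConsumer

val consumer = KafkaConsumer<String, String>(props)
consumer.subscribe(listOf("my-compacted-topic"))    // hypothetical topic name

val latest = HashMap<String, String>()
while (true) {
    val records = consumer.poll(1000)               // Kafka 1.1.1 API: poll(timeoutMs)
    for (record in records) {
        latest[record.key()] = record.value()       // later records win per key
    }
    // process the deduplicated snapshot in `latest` here
}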

Regarding compaction

Compaction doesn't mean that all old values for the same key are removed immediately. When an old message for a given key is removed depends on several broker properties (see the sketch after this list for adjusting their topic-level counterparts). The most important are:

  • log.cleaner.min.cleanable.ratio

The minimum ratio of dirty log to total log for a log to be eligible for cleaning

  • log.cleaner.min.compaction.lag.ms

The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted.

  • log.cleaner.enable

Enable the log cleaner process to run on the server. Should be enabled if using any topics with cleanup.policy=compact, including the internal offsets topic. If disabled, those topics will not be compacted and will continually grow in size.
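The first two broker settings have topic-level counterparts (min.cleanable.dirty.ratio and min.compaction.lag.ms) that you can change per topic to make compaction kick in sooner. A sketch using the AdminClient shipped with the 1.1.1 client; the topic name and the chosen values are assumptions:

import java.util.Properties
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.Config
import org.apache.kafka.clients.admin.ConfigEntry
import org.apache.kafka.common.config.ConfigResource

val admin = AdminClient.create(Properties().apply {
    put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS)
})

// Topic-level counterparts of the broker-wide log.cleaner.* settings above.
val topic = ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic")  // hypothetical name
val config = Config(listOf(
        ConfigEntry("cleanup.policy", "compact"),
        ConfigEntry("min.cleanable.dirty.ratio", "0.01"),  // make the log eligible for cleaning sooner
        ConfigEntry("min.compaction.lag.ms", "0")          // don't delay compaction of new messages
))
admin.alterConfigs(mapOf(topic to config)).all().get()
admin.close()

Note that even with aggressive settings, the active segment is never cleaned, so you can still see more than one message per key immediately after writing.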

More detail about compaction can be found in the official documentation: https://kafka.apache.org/documentation/#compaction