Kafka Streams: use the same `application.id` to co

Published 2019-04-24 13:12

Question:

I have an application that needs to listen to multiple different topics; each topic has separate logic for how the messages are handled. I had thought to use the same kafka properties for each KafkaStreams instance, but I get an error like the one below.

Error

java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic

Code (kotlin)

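import java.util.Properties
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.slf4j.LoggerFactory

class KafkaSetup {
    companion object {
        // Name the logger after KafkaSetup rather than its Companion object.
        private val LOG = LoggerFactory.getLogger(KafkaSetup::class.java)
    }

    // Shared by both listeners below, so both use the same application.id.
    // bootstrap.servers and serdes must also be set for this to run; they are not shown here.
    fun getProperties(): Properties {
        val properties = Properties()
        properties[StreamsConfig.APPLICATION_ID_CONFIG] = "my-app"
        return properties
    }

    // First topology: reads only "my-topic".
    private fun listenOnMyTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")

        kStream.foreach { key, value -> LOG.info("do stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }

    // Second topology: reads only "my-other-topic", but under the same application.id.
    private fun listenOnMyOtherTopic() {
        val kStreamBuilder = KStreamBuilder()
        val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")

        kStream.foreach { key, value -> LOG.info("do other stuff") }

        val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
        kafkaStreams.start()
    }
}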

I found this reference that suggests that you cannot use the same application.id for multiple topics; however, I am finding it hard to find reference documentation to support that. The documentation for application.id states:

An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.

Questions

  1. What does this error mean, and what causes it?
  2. Given that you can have multiple instances of your app running with the same id to consume from multiple topic partitions, what does "Must be unique within the Kafka cluster" mean?
  3. Can you use the same Kafka Streams application.id to start two KafkaStreams instances that are listening on different topics? And if so, how?

Details: Kafka 0.11.0.2

Answer 1:

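Kafka Streams scales via partitions, not topics. Thus, if you start multiple applications with the same application.id, they must be identical with regard to the input topics they subscribe to and their processing logic. The application forms a consumer group using the application.id as the group.id, and thus different partitions of the input topic(s) are assigned to different instances. In the question's setup, both KafkaStreams instances join the same group, so a partition of my-topic can end up assigned to the instance that subscribed only to my-other-topic, which is what the exception reports.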

If you have different topics with the same logic, you can subscribe to all topics at once (in each instance you start). Scaling is still based on partitions, though. (It's basically a "merge" of your input topics.)
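
A minimal sketch of such a merged subscription, assuming the question's topic names and the Kafka 0.11 `KStreamBuilder` API. The helper `mergedProperties`, the broker address `localhost:9092` and the String serdes are placeholders for illustration, not something stated in the question.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder

// Hypothetical helper: one shared configuration for every instance of the app.
fun mergedProperties(): Properties {
    val properties = Properties()
    properties[StreamsConfig.APPLICATION_ID_CONFIG] = "my-app"
    // Assumed broker address; replace with your cluster's.
    properties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    // Assumed String keys and values, as in the question's KStream<String, String>.
    properties[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
    properties[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
    return properties
}

fun main() {
    val builder = KStreamBuilder()
    // One stream over both topics: records from either topic reach the same logic.
    val merged: KStream<String, String> = builder.stream("my-topic", "my-other-topic")
    merged.foreach { key, value -> println("key=$key value=$value") }

    val streams = KafkaStreams(builder, mergedProperties())
    streams.start()
    // Close the client cleanly when the JVM shuts down.
    Runtime.getRuntime().addShutdownHook(Thread { streams.close() })
}
```

Every instance started with this code runs the same topology, so the group can hand any partition of either topic to any instance.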

If you want to scale via topics and/or have different processing logic, you must use a different application.id for each of the Kafka Streams applications.
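
As a rough sketch of that last option, assuming the Kafka 0.11 `KStreamBuilder` API, the question's two listeners could each get their own application.id. The helpers `propertiesFor` and `startListener`, the ids `my-app-my-topic` and `my-app-my-other-topic`, the broker address and the String serdes are all hypothetical choices made for illustration.

```kotlin
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.KStream
import org.apache.kafka.streams.kstream.KStreamBuilder

// Hypothetical helper: same base settings, but a distinct application.id per topology.
fun propertiesFor(applicationId: String): Properties {
    val properties = Properties()
    properties[StreamsConfig.APPLICATION_ID_CONFIG] = applicationId
    // Assumed broker address; replace with your cluster's.
    properties[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    properties[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
    properties[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
    return properties
}

// Hypothetical helper: build and start one independent Kafka Streams application.
fun startListener(
    applicationId: String,
    topic: String,
    handle: (String?, String?) -> Unit
): KafkaStreams {
    val builder = KStreamBuilder()
    val stream: KStream<String, String> = builder.stream(topic)
    stream.foreach { key, value -> handle(key, value) }

    val streams = KafkaStreams(builder, propertiesFor(applicationId))
    streams.start()
    return streams
}

fun main() {
    // Each listener has its own application.id and therefore its own consumer group.
    val first = startListener("my-app-my-topic", "my-topic") { _, _ -> println("do stuff") }
    val second = startListener("my-app-my-other-topic", "my-other-topic") { _, _ -> println("do other stuff") }

    Runtime.getRuntime().addShutdownHook(Thread {
        first.close()
        second.close()
    })
}
```

Because the two ids form separate consumer groups, each listener is only ever assigned partitions of the topic it subscribed to.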