I have an application that needs to listen to multiple different topics; each topic has separate logic for how the messages are handled. I had thought to use the same kafka properties for each KafkaStreams instance, but I get an error like the one below.
Error
java.lang.IllegalArgumentException: Assigned partition my-topic-1 for non-subscribed topic regex pattern; subscription pattern is my-other-topic
Code (kotlin)
class KafkaSetup() {
companion object {
private val LOG = LoggerFactory.getLogger(this::class.java)
}
fun getProperties(): Properties {
val properties = Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app")
return properties
}
private fun listenOnMyTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-topic")
kStream.foreach { key, value -> LOG.info("do stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
private fun listenOnMyOtherTopic() {
val kStreamBuilder = KStreamBuilder()
val kStream: KStream<String, String> = kStreamBuilder.stream("my-other-topic")
kStream.foreach { key, value -> LOG.info("do other stuff") }
val kafkaStreams = KafkaStreams(kStreamBuilder, getProperties())
kafkaStreams.start()
}
}
I found this reference that suggest that you can not use application.id
for multiple topics, however I am finding it hard to find reference documentation to support that. The documentation for application.id
states:
An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix.
Questions
- What does this error mean, and what causes it.
- Given that you can have multiple instance of you app running with the same id to consume from multiple topic partitions, what does "Must be unique within the Kafka cluster" mean?
- Can you use the same Kafka streams
application.id
to start twoKafkaStreams
that are listing on different topics? and if so, how?
Details: kafka 0.11.0.2