Multiple StreamListeners with Spring Cloud Stream

Posted 2019-08-20 16:46

In a Spring Boot app using Spring Cloud Stream connecting to Kafka, I'm trying to set up two separate stream listener methods:

  • One reads from topics "t1" and "t2" as KTables, re-partitions one of them by a different key, then joins the result to the other.
  • The other reads from an unrelated topic, "t3", as a KStream.

Because the first listener does some joining and aggregating, some topics are created automatically, e.g. "test-1-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition-0". (Not sure if this is related to the problem or not.)

When I set up the code by having two separate methods annotated with @StreamListener, I get the error below when the Spring Boot app starts:

Exception in thread "test-d44cb424-7575-4f5f-b148-afad034c93f4-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)

I think the important part is: "Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3". These are the two unrelated topics, so as far as I can see nothing related to t3 should be subscribing to anything related to t1. The exact topic which causes the problem also changes intermittently: sometimes it's one of the automatically generated topics which is mentioned, rather than t1 itself.

Here is how the two stream listeners are set up (in Kotlin):

@StreamListener
fun listenerForT1AndT2(
        @Input("t1") t1KTable: KTable<String, T1Obj>,
        @Input("t2") t2KTable: KTable<String, T2Obj>) {

    t2KTable
        .groupBy(...)
        .aggregate(
                { ... },
                { ... },
                { ... },
                Materialized.with(Serdes.String(), JsonSerde(SomeObj::class.java)))
        .join(t1KTable,
                { ... },
                Materialized.`as`<String, SomeObj, KeyValueStore<Bytes, ByteArray>>("test")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(JsonSerde(SomeObj::class.java)))
}

@StreamListener
fun listenerForT3(@Input("t3") t3KStream: KStream<String, T3Obj>) {
    t3KStream.map { ... }
}

However, when I set up my code with just one method annotated with @StreamListener that takes parameters for all three topics, everything works fine, e.g.

@StreamListener
fun compositeListener(
        @Input("t1") t1KTable: KTable<String, T1Obj>,
        @Input("t2") t2KTable: KTable<String, T2Obj>,
        @Input("t3") t3KStream: KStream<String, T3Obj>) {
    ...
}

But I don't think it's right that I can only have one @StreamListener method.

I know that there is content-based routing for adding conditions to the StreamListener annotation, but if the methods define the input channels then I'm not sure if I need to be using this here - I'd have thought the use of the @Input annotations on the method parameters would be enough to tell the system which channels (and therefore which Kafka topics) to bind to? If I do need to use content-based routing, how can I apply it here to have each method receive only the items from the relevant topic(s)?

I've also tried separating out the two listener methods into two separate classes, each of which has @EnableBinding for only the interface it's interested in (i.e. one interface for t1 and t2, and a separate interface for t3), but that doesn't help.

Everything else I've found related to this error message, e.g. here, is about having multiple app instances, but in my case there's only one Spring Boot app instance.

1 answer
劫难
#2 · 2019-08-20 17:45

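You need a separate application id for each StreamListener method. Kafka Streams uses the application id as the consumer group id, so processors that share one id join the same consumer group, which is consistent with a partition of t1 being assigned to the consumer that is subscribed only to t3. Here is an example: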

spring.cloud.stream.kafka.streams.bindings.t1.consumer.application-id=processor1-application-id
spring.cloud.stream.kafka.streams.bindings.t2.consumer.application-id=processor1-application-id
spring.cloud.stream.kafka.streams.bindings.t3.consumer.application-id=processor2-application-id
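
If the app is configured through application.yml rather than application.properties, the same three settings could be written roughly as in the sketch below. It assumes the binding names t1, t2 and t3 from the question and reuses the example ids from the properties above; the nesting simply mirrors the dotted property keys.

```yaml
# Sketch only: YAML form of the three properties above.
# t1 and t2 feed the same StreamListener method, so they share one application id;
# t3 feeds the other method and gets its own.
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            t1:
              consumer:
                application-id: processor1-application-id
            t2:
              consumer:
                application-id: processor1-application-id
            t3:
              consumer:
                application-id: processor2-application-id
```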

You probably want to test with the latest snapshot (2.1.0), as there were some recent changes to the way the application id is processed by the binder.

Please see the update here for more details. Here is a working sample of multiple StreamListener methods which are Kafka Streams processors.
