I just upgraded to spring-boot 2.1.3.RELEASE and I cannot have more than one StreamsBuilderFactoryBean because of this new class/ method (kafkaStreamsFactoryBeanConfigurer
requires exactly one factoryBean
):
@Configuration
@ConditionalOnClass(StreamsBuilder.class)
@ConditionalOnBean(name =
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
class KafkaStreamsAnnotationDrivenConfiguration {
//...
@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
StreamsBuilderFactoryBean factoryBean) {
return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}
}
I am getting this error:
Parameter 0 of method kafkaStreamsFactoryBeanConfigurer in org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration required a single bean, but 2 were found:
- &defaultKafkaStreamsBuilder: defined by method 'defaultKafkaStreamsBuilder' in class path resource [com/elsevier/q2c/transaction/snapshot/builder/config/KafkaStreamsConfig.class]
- &snapshotKafkaStreamsBuilder: defined by method 'snapshotKafkaStreamsBuilder' in class path resource [com/elsevier/q2c/transaction/snapshot/builder/config/KafkaStreamsConfig.class]
I would expect to overcome this issue by marking one of the two StreamsBuilderFactoryBean(s) as @Primary
(as proposed here). But even if I do:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
@Primary
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
@Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration streamsConfiguration) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfiguration);
return streamsBuilderFactoryBean;
}
@Bean(name = SNAPSHOT_STREAMS_BUILDER_BEAN_NAME)
public StreamsBuilderFactoryBean snapshotKafkaStreamsBuilder(
@Qualifier(SNAPSHOT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration streamsConfiguration) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(streamsConfiguration);
return streamsBuilderFactoryBean;
}
Still I am getting exactly the same error.
I am thinking that maybe the @Primary
makes the constructed beans a Primary. Any help more than appreciated!
EDIT: I circumvented the issue, by removing @EnableKafkaStreams
and DEFAULT_STREAMS_BUILDER_BEAN_NAME
. As a result, KafkaStreamsFactoryBeanConfigurer
does not kick in.
I am not sure why
@Primary
didn't work. I opened a GitHub issue.In my case I circumventent the issue by removing @EnableKafkaStreams (i.e. no stream beans are created automatically) and I create manually all needed ones. Looks even better if you have more than one streams as naming can be more destriptive as well.