I'm trying to use Spring Cloud Stream to publish and consume Kafka messages. I've been working off of the documentation here on Accessing Bound Channels. I'm trying to use a custom name on the channel for my topic, so I have a @Qualifier when I'm trying to inject it, but spring can't find the relevant bean. It says "For each bound interface, Spring Cloud Stream will generate a bean that implements the interface", but the auto-wiring isn't working.
The error I'm getting is "Parameter 0 of constructor in com...MessagingManager required a bean of type 'org.springframework.messaging.MessageChannel' that could not be found."
I tried using @Autowired before the MessagingManager constructor like in the example, but then got a similar error in bean factory about there being 2 of them, so I took it out, and got the current error.
It's probably complicated by my trying to use a Processor.
Here are my components. I'm running it with spring boot and trying to test it with this :
@Component
public class StartupTester implements ApplicationListener<ContextRefreshedEvent> {
MessagingManager messagingManager;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
messagingManager.sendThingCreatedMessage(new ThingCreated("12345", "667788"));
}
}
@Component
public class MessagingManager {
private MessageChannel thingCreatedChannel;
public MessagingManager(@Qualifier(ThingChannelProcessor.THING_CREATED) MessageChannel output) {
thingCreatedChannel = output;
}
public void sendThingCreatedMessage(ThingCreated thingCreated) {
thingCreatedChannel.send(MessageBuilder.withPayload(thingCreated).build());
}
}
@Component
public interface ThingsChannelProcessor extends Processor {
String THING_REQUEST = "thing-request";
String THING_CREATED = "thing-created";
@Input(THING_REQUEST )
SubscribableChannel thingsRequest();
@Output(THING_CREATED )
MessageChannel thingCreated();
}
And I also have @EnableBinding(ThingsMessagingManager.class) on my main class which is annotated with @SpringBootApplication.