Spring Cloud Stream dynamic channels

Posted 2020-07-13 11:57

Question:

I am using Spring Cloud Stream and want to create and bind channels programmatically. My use case is that during application startup I receive a dynamic list of Kafka topics to subscribe to. How can I then create a channel for each topic?

Answer 1:

I ran into a similar scenario recently; below is my sample of creating SubscribableChannels dynamically.

    // Consumer-level settings for the new binding
    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1);
    // Binding properties: the destination (Kafka topic) and the consumer group
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    // Register the binding under a unique name, create the input channel,
    // expose it as a bean, bind it to the broker, and attach the message handler
    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel) bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel) beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);
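
For context, here is a minimal sketch of how the collaborators used above (bindingServiceProperties, bindingTargetFactory, beanFactory, bindingService) could be wired together with constructor injection. The class name, binding-name convention, and method signature are my own illustrative choices, not part of the original answer:

    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.cloud.stream.binder.ConsumerProperties;
    import org.springframework.cloud.stream.binding.BindingService;
    import org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory;
    import org.springframework.cloud.stream.config.BindingProperties;
    import org.springframework.cloud.stream.config.BindingServiceProperties;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.messaging.SubscribableChannel;
    import org.springframework.stereotype.Component;

    @Component
    public class DynamicConsumerBinder {

        private final BindingServiceProperties bindingServiceProperties;
        private final SubscribableChannelBindingTargetFactory bindingTargetFactory;
        private final BindingService bindingService;
        private final ConfigurableListableBeanFactory beanFactory;

        public DynamicConsumerBinder(BindingServiceProperties bindingServiceProperties,
                                     SubscribableChannelBindingTargetFactory bindingTargetFactory,
                                     BindingService bindingService,
                                     ConfigurableListableBeanFactory beanFactory) {
            this.bindingServiceProperties = bindingServiceProperties;
            this.bindingTargetFactory = bindingTargetFactory;
            this.bindingService = bindingService;
            this.beanFactory = beanFactory;
        }

        // Call once per topic discovered at startup, e.g. from an ApplicationRunner
        public SubscribableChannel bindTopic(String topic, String group, MessageHandler handler) {
            String bindingName = topic + ".input"; // any unique, stable name works
            ConsumerProperties consumerProperties = new ConsumerProperties();
            consumerProperties.setMaxAttempts(1);
            BindingProperties bindingProperties = new BindingProperties();
            bindingProperties.setConsumer(consumerProperties);
            bindingProperties.setDestination(topic);
            bindingProperties.setGroup(group);
            bindingServiceProperties.getBindings().put(bindingName, bindingProperties);

            SubscribableChannel channel = bindingTargetFactory.createInput(bindingName);
            beanFactory.registerSingleton(bindingName, channel);
            channel = (SubscribableChannel) beanFactory.initializeBean(channel, bindingName);
            bindingService.bindConsumer(channel, bindingName);
            channel.subscribe(handler);
            return channel;
        }
    }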


Answer 2:

I had to do something similar for the Camel Spring Cloud Stream component. Perhaps the consumer code that binds a destination (really just a String naming the channel) will be useful to you.

In my case I only bind a single destination, but I don't imagine it being much different conceptually for multiple destinations; a sketch of the multi-destination case follows the code below.

Below is the gist of it:

    @Override
    protected void doStart() throws Exception {
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> {
            // have your way with the received incoming message
        });

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    }

    /**
     * Create a {@link SubscribableChannel} and register it in the
     * {@link org.springframework.context.ApplicationContext}
     */
    private SubscribableChannel createInputBindingTarget() {
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    }

See here for the full source and more context.
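
For the multi-destination case, the same steps would presumably just run once per destination. A rough sketch, assuming a hypothetical variant of the answer's createInputBindingTarget() that takes the destination name as a parameter, and destinations given as a comma-separated string:

    // Rough sketch only: run the single-destination steps once per destination.
    for (String destination : endpoint.getDestination().split(",")) {
        SubscribableChannel bindingTarget = createInputBindingTarget(destination); // hypothetical overload
        bindingTarget.subscribe(message -> {
            // have your way with messages from this particular destination
        });
        endpoint.getBindingService().bindConsumer(bindingTarget, destination);
    }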



Answer 3:

To route incoming messages onward to destinations that are only known at runtime, you can explicitly use BinderAwareChannelResolver to resolve the output destination dynamically. You can check the router sink application as an example; it uses the binder-aware channel resolver for exactly this.
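
A minimal sketch of sending through BinderAwareChannelResolver; the service class here is hypothetical, and the destination name can be anything computed at runtime:

    import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;

    @Service
    public class DynamicSender {

        private final BinderAwareChannelResolver resolver;

        public DynamicSender(BinderAwareChannelResolver resolver) {
            this.resolver = resolver;
        }

        public void send(String destination, Object payload) {
            // Resolves (and, if necessary, creates and binds) an output channel
            // for the given destination name, then sends the message to it
            resolver.resolveDestination(destination)
                    .send(MessageBuilder.withPayload(payload).build());
        }
    }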



Answer 4:

I had a task where I did not know the topics in advance. I solved it by having one input channel which listens to all the topics I need.

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_configuration_options.html

Destination

The target destination of a channel on the bound middleware (e.g., the RabbitMQ exchange or Kafka topic). If the channel is bound as a consumer, it could be bound to multiple destinations and the destination names can be specified as comma-separated String values. If not set, the channel name is used instead.

So my configuration is:

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Then I defined one consumer which handles messages from all these topics.

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}
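
With the Kafka binder, the originating topic is available in the message headers, which is what makes per-topic dispatch in a single consumer possible. A small variant of consumeMessage above, where the webhook-mapping lookup is hypothetical:

// KafkaHeaders is org.springframework.kafka.support.KafkaHeaders
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message) {
    // With the Kafka binder, the originating topic is carried in a message header
    String topic = (String) message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    // e.g. look up the webhook URI configured for this topic (the mapping is hypothetical)
    // String webhookUri = topicToWebhookUri.get(topic);
}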

Note that in your case this may not be a solution. I needed to forward messages to webhooks, so I could keep a configuration mapping (topic name -> webhook URI).

I also thought about other ideas. 1) Use a plain Kafka client consumer without Spring Cloud.

2) Create a predefined number of inputs, for example 50.

input-1
input-2
...
input-50

And then have a configuration for some of these inputs.
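
A rough sketch of what such a predefined-inputs binding interface might look like (the interface and channel names are illustrative); it would be enabled with @EnableBinding(FixedInputs.class), and each declared input can then get its own destination via the usual spring.cloud.stream.bindings.<name>.* properties:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface FixedInputs {

    @Input("input-1")
    SubscribableChannel input1();

    @Input("input-2")
    SubscribableChannel input2();

    // ...declare as many reserved inputs as you expect to ever need, e.g. up to input-50
}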

Related discussions

  • Spring cloud stream to support routing messages dynamically
  • https://github.com/spring-cloud/spring-cloud-stream/issues/690
  • https://github.com/spring-cloud/spring-cloud-stream/issues/1089

We use Spring Cloud 2.1.1.RELEASE.