JMSListener - dynamic selector

Posted 2019-08-03 09:06

Question:

I currently have a @JmsListener as shown below. Its selector takes its value from a properties file. This works fine.

 @JmsListener(destination = "myQueueDest",
     selector = MyHeaders.SELECTOR_KEY + " = '${myapp.selector_val}'")
 private void consumeData(MyCustomObj mycustomObj) { }

I now need a dynamic selector whose value is held in memory rather than in a Spring property. Is there a way to use @JmsListener (or some other listener mechanism) to select messages off the ActiveMQ queue this way?

Update:

It may be possible to assign an ID to my @JmsListener and then look up its container through my JmsListenerEndpointRegistry bean: get the listener container by ID, cast it to DefaultMessageListenerContainer, and call setMessageSelector(). I'm not yet sure whether this will work.

This requires setting my DefaultJmsListenerContainerFactory bean to have the cache level of CACHE_SESSION.

But this doesn't seem to work, as the listener picks up all messages, regardless of what I set the message selector to be.

Answer 1:

The JMS specification says the selector string must be supplied when the consumer is created, so the answer is no. To receive messages that match different selection criteria, the consumer must be closed and recreated with a different selector string.

If using the JMS API is not a must for your project, you could explore ActiveMQ's native APIs. The native API may offer a way to specify a different selector string each time a receive is called; IBM MQ's native API provides such functionality.



Answer 2:

As stated in one of the comments:

the javadoc for setMessageSelector says it can be set at runtime. http://docs.spring.io/spring-framework/docs/2.5.x/api/org/springframework/jms/listener/AbstractMessageListenerContainer.html#setMessageSelector(java.lang.String)

This example shows how to set the selector at startup; doing it dynamically should be possible with a few more tricks:

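import javax.jms.ConnectionFactory; // jakarta.jms.ConnectionFactory on Spring 6+

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@EnableJms
@Configuration
public class JmsConfiguration {

    // Threshold used in the selector; defaults to 100 when the property is unset.
    @Value("${my.int.param:100}")
    private int config;

    @Bean
    public MessageConverter messageConverter() {
        final MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    // The injected ConnectionFactory is an assumption: the original snippet did not
    // show one, but the listener container cannot start without it.
    @Bean
    public JmsListenerContainerFactory<?> specialQueueListenerFactory(ConnectionFactory connectionFactory) {
        // The selector is built once, when this bean is created at startup.
        final String selector = "foo > " + config;
        final DefaultJmsListenerContainerFactory factory = new CustomJmsListenerContainerFactory(selector);
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(messageConverter());
        return factory;
    }
}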

And the CustomJmsListenerContainerFactory:

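import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpoint;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class CustomJmsListenerContainerFactory extends DefaultJmsListenerContainerFactory {

    private final String selector;

    public CustomJmsListenerContainerFactory(String jmsSelector) {
        this.selector = jmsSelector;
    }

    @Override
    public DefaultMessageListenerContainer createListenerContainer(JmsListenerEndpoint endpoint) {
        final DefaultMessageListenerContainer instance = super.createListenerContainer(endpoint);
        // Overrides any selector declared on the @JmsListener endpoint itself.
        instance.setMessageSelector(selector);
        return instance;
    }
}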
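
The factory above fixes the selector when the container is created. For the in-memory case from the question, one possible building block is a small holder that rebuilds the selector string whenever the value changes and hands it to whatever applies it. The sketch below is plain Java with no Spring or JMS dependency; DynamicSelector, equalsSelector, update and the header name myHeader are hypothetical names chosen for illustration. It doubles embedded single quotes, which is how JMS selector string literals escape them.

```java
import java.util.Objects;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Spring or JMS): rebuilds a selector of the
 * form  header = 'value'  whenever the in-memory value changes and passes the
 * new selector string to a sink.
 */
final class DynamicSelector {

    private final String headerName;
    private final Consumer<String> selectorSink;
    private String current;

    DynamicSelector(String headerName, Consumer<String> selectorSink) {
        this.headerName = Objects.requireNonNull(headerName);
        this.selectorSink = Objects.requireNonNull(selectorSink);
    }

    /** Builds header = 'value', doubling single quotes inside the value. */
    static String equalsSelector(String headerName, String value) {
        return headerName + " = '" + value.replace("'", "''") + "'";
    }

    /** Rebuilds the selector from the new value and pushes it to the sink. */
    synchronized String update(String newValue) {
        current = equalsSelector(headerName, newValue);
        selectorSink.accept(current);
        return current;
    }

    /** Returns the last selector pushed to the sink, or null before the first update. */
    synchronized String current() {
        return current;
    }
}
```

In a Spring application the sink could be a method reference such as container::setMessageSelector on the DefaultMessageListenerContainer. This is an assumption about wiring, not something tested here. The Javadoc for setMessageSelector notes that a runtime change is only picked up while the container's cache level is below CACHE_CONSUMER, which matches the CACHE_SESSION setting mentioned in the question.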