ActiveMQ Override scheduled message

Posted 2019-07-01 19:13

Question:

I am trying to implement a delayed queue with message overriding using ActiveMQ.

Each message is scheduled to be delivered after a delay of x (say 60 seconds).

If the same message is received again in the meantime, it should override the previous message.

So even if I receive, say, 10 messages within x seconds, only one message should be processed.

Is there a clean way to accomplish this?

Answer 1:

The question has two parts that need to be addressed separately:

Can a message be delayed in ActiveMQ?

Yes - see Delay and Schedule Message Delivery. You need to set <broker ... schedulerSupport="true"> in your ActiveMQ config, and set the AMQ_SCHEDULED_DELAY property on the JMS message to the delay you want in milliseconds (60000 for the 60-second delay in your example).
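As a rough sketch of the delay property described above: the value is given in milliseconds, so a delay expressed in seconds needs converting first. The ScheduledDelay class and its millis method are hypothetical names used only for illustration; the property name is the one mentioned in this answer, and the JMS call appears only as a comment because it needs a live message object.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: holds the scheduler property name and converts a delay to milliseconds. */
final class ScheduledDelay {
    // Property name read by the ActiveMQ scheduler, as named in the answer above.
    static final String PROPERTY = "AMQ_SCHEDULED_DELAY";

    private ScheduledDelay() {
    }

    /** Converts a delay in seconds to the millisecond value the property expects. */
    static long millis(long seconds) {
        return TimeUnit.SECONDS.toMillis(seconds);
    }
}

// With a JMS message in hand, this would be applied roughly as:
//   message.setLongProperty(ScheduledDelay.PROPERTY, ScheduledDelay.millis(60));
```

The property only takes effect when the broker has schedulerSupport enabled; otherwise the message is delivered without the delay.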

Is there any way to prevent the same message being consumed more than once?

Yes, but that's an application concern rather than an ActiveMQ one. It's often referred to as de-duplication or idempotent consumption. If you only have one consumer, the simplest way is to keep track of the messages received in a map and check that map whenever you receive a message. If the message has already been seen, discard it.
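The single-consumer map approach could look something like the sketch below. It assumes each message carries some string key that identifies "the same message" (for example a business ID), and that a key counts as a duplicate only if it was already accepted within a time window. SeenMessageFilter and shouldProcess are hypothetical names, not part of ActiveMQ or JMS.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Minimal in-memory de-duplication sketch for a single consumer. */
final class SeenMessageFilter {
    // Maps a message key to the time (ms) at which it was last accepted.
    private final Map<String, Long> acceptedAt = new ConcurrentHashMap<>();
    private final long windowMillis;

    SeenMessageFilter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /**
     * Returns true if the key has not been accepted within the window
     * (process the message), false if it is a duplicate (discard it).
     */
    boolean shouldProcess(String messageKey, long nowMillis) {
        boolean[] accepted = {false};
        // compute() is atomic per key on ConcurrentHashMap.
        acceptedAt.compute(messageKey, (key, previous) -> {
            if (previous == null || nowMillis - previous >= windowMillis) {
                accepted[0] = true;
                return nowMillis; // remember this acceptance
            }
            return previous; // duplicate inside the window: keep the old timestamp
        });
        return accepted[0];
    }
}
```

A consumer would call shouldProcess(key, System.currentTimeMillis()) for each incoming message and skip the ones that return false. Note that this keeps the first message in a window rather than the last, never evicts old keys, and loses its state on restart.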

For more complex use cases, where you have multiple consumers on different machines or you want that state to survive an application restart, you will need to keep a table of the messages seen in a database and query it each time a message arrives.




Answer 2:

Also, according to the isSchedulerSupport() method of the ActiveMQ BrokerService class, you need to configure persistence (or a job scheduler store) to be able to use the scheduler functionality.

public boolean isSchedulerSupport() {
    return this.schedulerSupport && (isPersistent() || jobSchedulerStore != null);
}


Answer 3:

You can configure the ActiveMQ broker to enable "schedulerSupport" with the following entry in the activemq.xml file, located in the conf directory of your ActiveMQ home directory.

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">


Answer 4:

You can override the BrokerService bean in your Spring configuration:

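    import org.apache.activemq.broker.BrokerService;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jms.annotation.EnableJms;

    @Configuration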
    @EnableJms
    public class JMSConfiguration {

        @Bean
        public BrokerService brokerService() throws Exception {
            BrokerService brokerService = new BrokerService();
            brokerService.setSchedulerSupport(true);
            return brokerService;
        }
    }