Getting a simple Spring JMS client acknowledge to

2019-03-25 03:19发布

问题:

Just starting to get my head around getting JMS ActiveMQ Acknowledgements working in Spring. So far I have a consumer working perfectly, with the exception that when I don't acknowledge the message, it's still taken from the queue (I expect it to stay there or end in a dead letter queue).

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- A JMS connection factory for ActiveMQ -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="failover://(tcp://jms1:61616,tcp://jms2:61616)?randomize=false&amp;jms.redeliveryPolicy.maximumRedeliveries=5" />

    <!-- A POJO that implements the JMS message listener -->
    <bean id="simpleMessageListener" class="com.company.ConsumerClass" />

    <!-- A JMS namespace aware Spring configuration for the message listener container -->
    <jms:listener-container
            container-type="default"
            connection-factory="connectionFactory"
            acknowledge="client"
            concurrency="10-50"
            cache="consumer">
        <jms:listener destination="someQueue" ref="simpleMessageListener" method="onMessage" />
    </jms:listener-container>
</beans>

In the ConsumerClass, my simple consumer looks something like this:

@Override public final void onMessage(Message message) {
    Object postedMessage = null;
    try {
        postedMessage = ((ObjectMessage) message).getObject();

        if (postedMessage.getClass() == SomeMessageType.class) {
            try {
                //Some logic here

                message.acknowledge();
                return; //Success Here
            } catch (MyException e) {
                logger.error("Could not process message, but as I didn't call acknowledge I expect it to end up in the dead message queue");
            }
        }
    } catch (JMSException e) {
        logger.error("Error occurred pulling Message from Queue", e);
    }

    //Also worth noting, if I throw new RuntimeException("Aww Noos"); here then it won't take it from the queue, but it won't get consumed (or end up as dead letter)...
}

回答1:

Read this documentation: Spring JMS container does not use the message.acknowledge()

The listener container offers the following message acknowledgment options:

"sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.
"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.
"sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown.
"sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of exception thrown.



回答2:

I found the answer at http://ourcraft.wordpress.com/2008/07/21/simple-jms-transaction-rollbacks-work/

It appears it works well if you change acknowledge="transacted" and make sure you throw new RuntimeException("Message could not be consumed. Roll back transaction"); at the end of the OnMessage() routine.

Still no idea what acknowledge="client" achieves though



回答3:

Nowadays Spring provides good wrapper over plain JMS message listeners.

See JavaDocs of AbstractMessageListenerContainer.

"sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; best-effort redelivery in case of a user exception thrown as well as in case of other listener execution interruptions (such as the JVM dying).

So, when you define a @JmsListener method, the acknowledgement is sent automatically when it's completed successfully, but you can throw an exception to receive the message again.



回答4:

In my practice client acknowledgement rarely if ever used. Typical setup is auto acknowledge so if your code returns from onMessage() normally (w/o exception) the message is automatically acknowledged. If your code throws exception from onMessage() there is no acknowledgement and message typically would be re-delivered up to pre-configured number of times, after which it would typically be discarded or put into dead message queue.

In your case, from JMS server point of view, it looks like client asked for message but never acknowledged so it's still 'being processed' by the client. In such case message would be invisible for other consumers on the same Queue so you might get the impression that message was 'take off the queue' while as the matter of fact it's still there. Obviously you won't see such message in dead message queue either.

I suggest you read JMS specification to get clear understanding of different acknowledgement modes.



回答5:

Call acknowledge() method on message in your consumer.

Consider the following scenario: An application receives but does not acknowledge a message. The application receives a subsequent message and acknowledges it. What happens to the former message? The former message is also considered acknowledged. Generally, acknowledging a particular message acknowledges all prior messages the session receives. In the above output, only message 5 is explicitly acknowledged. All the messages before message 5 are implicitly acknowledged. Messages after message 5 are not acknowledged.

For more details refer this article

Also Check this article Sun Java System Message Queue 4.3 Developer's Guide for Java Clients



回答6:

Use following code it will work.

<bean id="{containerName}"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref={connectionFactoryBean} />
    <property name="destinationName" ref="{queue}" />
    <property name="messageListener" ref="{listner}" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
</bean>