Query regarding Spring message-driven-channel-adap

2019-08-01 07:10发布

I am using Spring's message-driven-channel-adapter. My component is consuming message from Tibco Topic and Publishing to RabbitMQ topic

So The message flow is as follows: Tibco-> (subscribed by )Component (Published to)-> RabbitMQ

The service activator is shown below: as we see there is a input-channel and an output-channel. The bean storeAndForwardActivator will have the business logic (within the method createIssueOfInterestOratorRecord)

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

I also have a message=driven-channel-adapter. This adapter will be invoked before the service adapter is invoked.

<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

i.e. specifically the container (shown below) will hold the Topic name to be used - this is the DefaultMessageListenerContainer

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

This set up works perfectly fine. However in some cases my consumer/component receives a 'rogue' message. i.e. an empty payload or a message type of HashMap (instead of plain TextMessage) - when we get this - what I observe is - an exception is caught at the DefaultMessageListener level (i.e. I don't go as far as my business bean i.e. storeAndForwardActivator), because of this my component is not sending ACK back - and since this is a durable Topic - there is a build of messages at the Topic - which is undesirable. Is there a way for me to ACK the message straight away irrespective of weather an exception is caught at the DefaultMessageListener level?

Or should I introduce an error handler at the DefaultMessageListener? What's the best way to handle this, any suggestions?

regards D

Update:

I tried adding a errorHandler to the org.springframework.jms.listener.DefaultMessageListenerContainer as shown below

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler is a bean as shpwn below

<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErroHandler implements ErrorHandler

 @Service
 public class MyErrorHandler implements ErrorHandler{

private static Log log = LogFactory.getLog(MyErrorHandler.class);

@Override
   public void handleError(Throwable t) {

        if (t instanceof MessageHandlingException) {
            MessageHandlingException exception = (MessageHandlingException) t;
            if (exception != null) {
                org.springframework.messaging.Message<?> message = exception.getFailedMessage();
                Object payloadObject = message.getPayload();
                if (null != payloadObject) {
                    log.info("Payload  is not null, type is: " + payloadObject.getClass());
                }
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
}

}

What I notice is that the exception is caught (when the subscriber consumes a rogue message). I keep on seeing this log in a loop

    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 

i.e. since the transaction is not committed - the same message from durable topic is consumed again and again. My aim is to send an ACK back to the broker after consuming the message (irrespective of weather an exception is caught or not).

I will try the error-channel tomorrow.

regards D

1条回答
beautiful°
2楼-- · 2019-08-01 07:49

Add an error-channel to the message-driven adapter; the ErrorMessage will contain a MessagingException payload that has two fields; the cause (exception) and failedMessage.

If you use the default error-channel="errorChannel", the exception is logged.

If you want to do more than that you can configure your own error channel and add some flow to it.

EDIT:

Further to your comments below...

payload must not be null is not a stack trace; it's a message.

That said, payload must not be null looks like a Spring Integration message; it is probably thrown in the message listener adapter during message conversion, which is before we get to a point where the failure can go to the error-channel; such an exception will be thrown back to the container.

Turn on DEBUG logging and look for this log entry:

logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");

Also, provide a FULL stack trace.

EDIT#2

So, I reproduced your issue by forcing the converted payload to null in a custom MessageConverter.

The DMLC error handler is called by the container after the transaction is rolled back so there's no way to stop the rollback.

We can add an option to the adapter to handle such errors differently but that will take some work.

In the meantime, a work-around would be to write a custom MessageConverter; something like the one in this Gist.

Then, your service will have to deal with handling the "Bad Message Received" payload.

You then provide the custom converter like this...

<jms:message-driven-channel-adapter id="jmsIn"
        destination="requestQueue" acknowledge="transacted"
        message-converter="converter"
        channel="jmsInChannel" />

<beans:bean id="converter" class="foo.MyMessageConverter" />
查看更多
登录 后发表回答