I am using Spring Integration's message-driven-channel-adapter. My component consumes messages from a Tibco topic and publishes them to a RabbitMQ topic.
So the message flow is as follows: Tibco -> (subscribed to by) Component -> (published to) RabbitMQ
The service activator is shown below. As we can see, there is an input-channel and an output-channel. The bean storeAndForwardActivator holds the business logic (in the method createIssueOfInterestOratorRecord).
<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />
I also have a message-driven-channel-adapter. This adapter is invoked before the service activator.
<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />
Specifically, the container (shown below) holds the topic name to be used. It is a DefaultMessageListenerContainer:
<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>
This setup works perfectly fine. However, in some cases my component receives a 'rogue' message, i.e. one with an empty payload or with a message type of HashMap (instead of a plain TextMessage). When that happens, I observe that an exception is caught at the DefaultMessageListenerContainer level, i.e. the message never gets as far as my business bean storeAndForwardActivator. Because of this, my component does not send an ACK back, and since this is a durable topic subscription, messages build up on the topic, which is undesirable.
Is there a way for me to ACK the message straight away, irrespective of whether an exception is caught at the DefaultMessageListenerContainer level? Or should I introduce an error handler on the DefaultMessageListenerContainer? What's the best way to handle this? Any suggestions?
regards D
Update:
I tried adding an errorHandler to the org.springframework.jms.listener.DefaultMessageListenerContainer, as shown below:
<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
    <property name="errorHandler" ref="myErrorHandler"/>
</bean>
myErrorHandler is a bean, as shown below:
<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />
MyErrorHandler implements ErrorHandler:
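import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.stereotype.Service;
import org.springframework.util.ErrorHandler;

@Service
public class MyErrorHandler implements ErrorHandler {

    private static final Log log = LogFactory.getLog(MyErrorHandler.class);

    @Override
    public void handleError(Throwable t) {
        if (t instanceof MessageHandlingException) {
            // instanceof already rules out null, so no extra null check is needed
            MessageHandlingException exception = (MessageHandlingException) t;
            // getFailedMessage() may return null, so guard before reading the payload
            Message<?> message = exception.getFailedMessage();
            if (message != null) {
                log.info("Payload is not null, type is: " + message.getPayload().getClass());
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
    }
}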
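Since the handler above only tells me that the exception is not a MessageHandlingException, a small helper that walks the cause chain would at least show what is actually being thrown. This is only a sketch in plain Java: ExceptionChain and describe are names I made up, and the exception in main is an invented example, not the one from my logs.

```java
import java.util.ArrayList;
import java.util.List;

public class ExceptionChain {

    /** Returns the class names of t and each of its causes, outermost first. */
    public static List<String> describe(Throwable t) {
        List<String> names = new ArrayList<>();
        // Cap the depth to guard against a cyclic cause chain.
        for (int depth = 0; t != null && depth < 20; depth++) {
            names.add(t.getClass().getName());
            t = t.getCause();
        }
        return names;
    }

    public static void main(String[] args) {
        // Invented example: a wrapper exception with a nested cause.
        Throwable wrapped = new RuntimeException("listener failed",
                new IllegalArgumentException("payload must not be null"));
        System.out.println(describe(wrapped));
    }
}
```

Calling something like log.info(ExceptionChain.describe(t)) in the else branch of handleError would replace the uninformative log line with the real exception types.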
What I notice is that the exception is caught (when the subscriber consumes a rogue message). I keep seeing this log in a loop:
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
i.e. since the transaction is not committed, the same message from the durable topic is consumed again and again. My aim is to send an ACK back to the broker after consuming the message (irrespective of whether an exception is caught or not).
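To make the loop concrete, here is a toy model of what I think is happening. It is plain Java, not the JMS API: RedeliveryModel and run are made-up names, and the model simply assumes that a transacted delivery is committed when the listener returns normally and rolled back when it throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

public class RedeliveryModel {

    /**
     * Delivers queued messages to the listener under "transacted" rules:
     * a normal return commits (message removed), an exception rolls back
     * (message stays at the head and is delivered again).
     * Stops after maxDeliveries so the demo terminates; in this model an
     * unbounded loop would redeliver the failing message forever.
     */
    public static int run(Deque<String> topic, Consumer<String> listener, int maxDeliveries) {
        int deliveries = 0;
        while (!topic.isEmpty() && deliveries < maxDeliveries) {
            String message = topic.peekFirst();
            deliveries++;
            try {
                listener.accept(message);
                topic.removeFirst(); // commit: the message is gone for good
            } catch (RuntimeException e) {
                // rollback: nothing is removed, so the same message comes back
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        Deque<String> topic = new ArrayDeque<>();
        topic.add("");     // the rogue message (empty payload)
        topic.add("good"); // a normal message stuck behind it
        Consumer<String> listener = m -> {
            if (m.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        };
        int deliveries = run(topic, listener, 5);
        System.out.println(deliveries + " deliveries, " + topic.size() + " messages still pending");
    }
}
```

In this model the rogue message is delivered on every iteration and the good message behind it is never reached, which matches the repeated log line and the build-up I see on the topic.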
I will try the error-channel tomorrow.
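The idea behind trying the error-channel, as I understand it, is that the failure gets routed somewhere else and the listener returns normally, so the transacted session has nothing to roll back. A plain-Java sketch of that idea follows. There is no Spring in it, ErrorDivertingListener and divertErrors are made-up names, and it is only an analogy for what an error channel would do, not the adapter's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class ErrorDivertingListener {

    /**
     * Wraps a listener so that any RuntimeException is handed to errorSink
     * instead of propagating to the caller.
     */
    public static <T> Consumer<T> divertErrors(Consumer<T> delegate,
                                               BiConsumer<T, RuntimeException> errorSink) {
        return message -> {
            try {
                delegate.accept(message);
            } catch (RuntimeException e) {
                errorSink.accept(message, e); // e.g. log it or park it somewhere
            }
        };
    }

    public static void main(String[] args) {
        List<String> parked = new ArrayList<>();
        Consumer<String> strict = m -> {
            if (m.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
        };
        Consumer<String> safe = divertErrors(strict, (m, e) -> parked.add(e.getMessage()));
        safe.accept("");     // returns normally; the failure is recorded
        safe.accept("good"); // processed as usual
        System.out.println(parked);
    }
}
```

If the error flow itself throws, the exception would still reach the caller, so whatever sits on the error side has to complete without failing for the rogue message to be consumed.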
regards D