我使用Spring的消息驱动通道适配器。 我的部件消耗来自Tibco的主题和发布消息的RabbitMQ话题
所以该消息流程如下:Tibco->(由订阅)组分(发布到) - >的RabbitMQ
服务活化剂如下所示:正如我们看到有一个输入通道和一个输出通道。 这个bean storeAndForwardActivator将业务逻辑(方法createIssueOfInterestOratorRecord内)
<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />
我也有一个消息=从动通道适配器。 在调用服务适配器之前该适配器将被调用。
<int-jms:message-driven-channel-adapter
id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
container="oratorIssueOfInterestmessageListenerContainer" />
即具体地,容器(如下所示)将持有主题名称中使用 - 这是使用DefaultMessageListenerContainer
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
</bean>
这种设置工作完全正常。 然而,在某些情况下,我的消费/组件接收一个“流氓”的消息。 即空的净荷或HashMap中的消息类型(而不是纯的TextMessage的) - 当我们得到这个 - 我观察的是 - 一个例外是在DefaultMessageListener一级抓(即我没有去尽可能我的生意豆即storeAndForwardActivator ),因为这是我的分量不发送ACK回来 - 而且由于这是一个持久的话题 - 有在主题消息的构建 - 这是不可取的。 有没有办法对我来说,ACK消息马上不论天气的异常是在DefaultMessageListener一级抓的?
或者我应该推出在DefaultMessageListener一个错误处理程序? 什么是处理这个问题,任何建议的最佳方式?
d长相
更新:
我尝试添加一个的ErrorHandler到org.springframework.jms.listener.DefaultMessageListenerContainer如下所示
<bean id="oratorIssueOfInterestmessageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
<property name="destination" ref="oratorTibcojmsDestination" />
<property name="sessionTransacted" value="true" />
<property name="maxConcurrentConsumers" value="1" />
<property name="concurrentConsumers" value="1" />
<property name="receiveTimeout" value="5000" />
<property name="recoveryInterval" value="60000" />
<property name="autoStartup" value="true" />
<property name="exposeListenerSession" value="false" />
<property name="subscriptionDurable" value="true" />
<property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
<property name="messageSelector" value="${topic.orator.selector}" />
<property name="errorHandler" ref="myErrorHandler"/>
</bean>
myErrorHandler是豆如下shpwn
<bean id="myErrorHandler"
class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />
MyErroHandler实现的ErrorHandler
@Service
public class MyErrorHandler implements ErrorHandler{
private static Log log = LogFactory.getLog(MyErrorHandler.class);
@Override
public void handleError(Throwable t) {
if (t instanceof MessageHandlingException) {
MessageHandlingException exception = (MessageHandlingException) t;
if (exception != null) {
org.springframework.messaging.Message<?> message = exception.getFailedMessage();
Object payloadObject = message.getPayload();
if (null != payloadObject) {
log.info("Payload is not null, type is: " + payloadObject.getClass());
}
}
} else {
log.info("Exception is not of type: MessageHandlingException ");
}
}
}
我注意到的是异常被捕获(当用户消耗了流氓消息)。 我一直在循环中看到该日志
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
Exception is not of type: MessageHandlingException
即因为事务未提交 - 从持久主题相同的消息被一次又一次的消耗。 我的目标是耗时的消息后,发送一个ACK返回给代理(不管天气的异常被捕获或没有)。
我会尝试错误通道的明天。
d长相