查询有关春的消息驱动通道适配器(Query regarding Spring message-dri

2019-10-22 03:38发布

我使用Spring的消息驱动通道适配器。 我的部件消耗来自Tibco的主题和发布消息的RabbitMQ话题

所以该消息流程如下:Tibco->(由订阅)组分(发布到) - >的RabbitMQ

服务活化剂如下所示:正如我们看到有一个输入通道和一个输出通道。 这个bean storeAndForwardActivator将业务逻辑(方法createIssueOfInterestOratorRecord内)

<int:service-activator input-channel="inboundOratorIssueOfInterestJmsInputChannel"
    ref="storeAndForwardActivator" method="createIssueOfInterestOratorRecord"
    output-channel="outboundIssueOfInterestRabbitmqOratorJmsOutputChannel" />

我也有一个消息=从动通道适配器。 在调用服务适配器之前该适配器将被调用。

<int-jms:message-driven-channel-adapter
    id="oratorIssueOfInterestInboundChannel" channel="inboundOratorIssueOfInterestJmsInputChannel"
    container="oratorIssueOfInterestmessageListenerContainer" />

即具体地,容器(如下所示)将持有主题名称中使用 - 这是使用DefaultMessageListenerContainer

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />
</bean>

这种设置工作完全正常。 然而,在某些情况下,我的消费/组件接收一个“流氓”的消息。 即空的净荷或HashMap中的消息类型(而不是纯的TextMessage的) - 当我们得到这个 - 我观察的是 - 一个例外是在DefaultMessageListener一级抓(即我没有去尽可能我的生意豆即storeAndForwardActivator ),因为这是我的分量不发送ACK回来 - 而且由于这是一个持久的话题 - 有在主题消息的构建 - 这是不可取的。 有没有办法对我来说,ACK消息马上不论天气的异常是在DefaultMessageListener一级抓的?

或者我应该推出在DefaultMessageListener一个错误处理程序? 什么是处理这个问题,任何建议的最佳方式?

d长相

更新:

我尝试添加一个的ErrorHandler到org.springframework.jms.listener.DefaultMessageListenerContainer如下所示

<bean id="oratorIssueOfInterestmessageListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="oratorIssueOfInterestTibcoConnectionFactory" />
    <property name="destination" ref="oratorTibcojmsDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="1" />
    <property name="concurrentConsumers" value="1" />
    <property name="receiveTimeout" value="5000" />
    <property name="recoveryInterval" value="60000" />
    <property name="autoStartup" value="true" />
    <property name="exposeListenerSession" value="false" />
    <property name="subscriptionDurable" value="true" />
    <property name="durableSubscriptionName" value="${topic.orator.durable-subscription-name}" />
    <property name="messageSelector" value="${topic.orator.selector}" />

    <property name="errorHandler" ref="myErrorHandler"/>
</bean>

myErrorHandler是豆如下shpwn

<bean id="myErrorHandler"
    class="com.igate.firds.icmf.activators.concentrator.MyErrorHandler" />

MyErroHandler实现的ErrorHandler

 @Service
 public class MyErrorHandler implements ErrorHandler{

private static Log log = LogFactory.getLog(MyErrorHandler.class);

@Override
   public void handleError(Throwable t) {

        if (t instanceof MessageHandlingException) {
            MessageHandlingException exception = (MessageHandlingException) t;
            if (exception != null) {
                org.springframework.messaging.Message<?> message = exception.getFailedMessage();
                Object payloadObject = message.getPayload();
                if (null != payloadObject) {
                    log.info("Payload  is not null, type is: " + payloadObject.getClass());
                }
            }
        } else {
            log.info("Exception is not of type: MessageHandlingException ");
        }
}

}

我注意到的是异常被捕获(当用户消耗了流氓消息)。 我一直在循环中看到该日志

    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 
    Exception is not of type: MessageHandlingException 

即因为事务未提交 - 从持久主题相同的消息被一次又一次的消耗。 我的目标是耗时的消息后,发送一个ACK返回给代理(不管天气的异常被捕获或没有)。

我会尝试错误通道的明天。

d长相

Answer 1:

添加error-channel的消息驱动的适配器; 所述ErrorMessage将包含MessagingException具有两个字段的有效载荷; 的cause (例外)和failedMessage

如果您使用默认的error-channel="errorChannel"中,记录异常。

如果你想要做的比您可以配置自己的错误渠道和增加一些流量更多。

编辑:

另外下面您的意见...

payload must not be null不是堆叠迹线; 这是一个消息。

这就是说, payload must not be null看起来像Spring集成消息; 它可能抛出的消息转换,这是在我们到达一个点,失败可以去在消息监听器适配器error-channel ; 这样的会抛出异常回容器。

打开调试日志,查找此日志条目:

logger.debug("converted JMS Message [" + jmsMessage + "] to integration Message payload [" + result + "]");

另外,提供了一个完整的堆栈跟踪。

编辑#2

所以,我通过强制转换的有效载荷为空在自定义转载您的问题MessageConverter

该DMLC错误处理程序由容器事务回滚回来后,所以没有办法阻止回滚调用。

我们可以添加一个选项,适配器以不同方式处理这样的错误,但是这将需要一些工作。

在此期间,一个变通是写一个定制MessageConverter ; 有点像一个在这个要点 。

然后,您的服务将不得不应对处理负载的“接收到错误信息”。

然后您提供这样的定制转换器...

<jms:message-driven-channel-adapter id="jmsIn"
        destination="requestQueue" acknowledge="transacted"
        message-converter="converter"
        channel="jmsInChannel" />

<beans:bean id="converter" class="foo.MyMessageConverter" />


文章来源: Query regarding Spring message-driven-channel-adapter