message-driven-channel-adapter drops first message

2020-04-23 03:00发布

问题:

I have an integration test for my Spring Integration config, which consumes messages from a JMS topic with durable subscription. For testing, I am using ActiveMQ instead of Tibco EMS. The issue I have is that I have to delay sending the first message to the endpoint using a sleep call at the beginning of our test method. Otherwise the message is dropped. If I remove the setting for durable subscription and selector, then the first message can be sent right away without delay. I'd like to get rid of the sleep, which is unreliable. Is there a way to check if the endpoint is completely setup before I send the message? Below is the configuration.

Thanks for your help!

    <int-jms:message-driven-channel-adapter
        id="myConsumer" connection-factory="myCachedConnectionFactory"
        destination="myTopic" channel="myChannel" error-channel="errorChannel"
        pub-sub-domain="true" subscription-durable="true"
        durable-subscription-name="testDurable"
        selector="..."
        transaction-manager="emsTransactionManager" auto-startup="false"/>

回答1:

If you are using a clean embedded activemq for the test, the durability of the subscription is irrelevant until the subscription is established. So you have no choice but to wait until that happens.

You could avoid the sleep by sending a series of startup messages and only start the real test when the last one is received.

EDIT

I forgot that there is a methodisRegisteredWithDestination() on the DefaultMessageListenerContainer.

Javadocs...

/**
 * Return whether at least one consumer has entered a fixed registration with the
 * target destination. This is particularly interesting for the pub-sub case where
 * it might be important to have an actual consumer registered that is guaranteed
 * not to miss any messages that are just about to be published.
 * <p>This method may be polled after a {@link #start()} call, until asynchronous
 * registration of consumers has happened which is when the method will start returning
 * {@code true} &ndash; provided that the listener container ever actually establishes
 * a fixed registration. It will then keep returning {@code true} until shutdown,
 * since the container will hold on to at least one consumer registration thereafter.
 * <p>Note that a listener container is not bound to having a fixed registration in
 * the first place. It may also keep recreating consumers for every invoker execution.
 * This particularly depends on the {@link #setCacheLevel cache level} setting:
 * only {@link #CACHE_CONSUMER} will lead to a fixed registration.
 */

We use it in some channel tests, where we get the container using reflection and then poll the method until we are subscribed to the topic.

/**
 * Blocks until the listener container has subscribed; if the container does not support
 * this test, or the caching mode is incompatible, true is returned. Otherwise blocks
 * until timeout milliseconds have passed, or the consumer has registered.
 * @see DefaultMessageListenerContainer#isRegisteredWithDestination()
 * @param timeout Timeout in milliseconds.
 * @return True if a subscriber has connected or the container/attributes does not support
 * the test. False if a valid container does not have a registered consumer within
 * timeout milliseconds.
 */
private static boolean waitUntilRegisteredWithDestination(SubscribableJmsChannel channel, long timeout) {
    AbstractMessageListenerContainer container =
            (AbstractMessageListenerContainer) new DirectFieldAccessor(channel).getPropertyValue("container");
    if (container instanceof DefaultMessageListenerContainer) {
        DefaultMessageListenerContainer listenerContainer =
            (DefaultMessageListenerContainer) container;
        if (listenerContainer.getCacheLevel() != DefaultMessageListenerContainer.CACHE_CONSUMER) {
            return true;
        }
        while (timeout > 0) {
            if (listenerContainer.isRegisteredWithDestination()) {
                return true;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) { }
            timeout -= 100;
        }
        return false;
    }
    return true;
}