Spring AMQP Integration - Consumer Manual Acknowle

2019-07-06 21:19发布

问题:

I am testing Spring-AMQP with Spring-Integration support, I've following configuration and test:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

In this configuration it was evident that as soon as message arrives at consumingChannel they are Acked and hence removed from queue. I validated this by placing a high sleep after receive and check queue-size. There are no further control on it.

Now if I set acknowledge-mode=MANUAL, there are no ways seems to do manual ack via spring integration.

My need is to process message and after processing do a manual-ack so till ack message remains persisted at durableQ.

Is there any way to handle MANUAL ack with spring-amqp-integration? I want to avoid passing ChannelAwareMessageListener to inbound-channel-adapter since I want to have control of consumer's receive.

Update:

It even doesn't seems to be possible when using own listener-container with inbound-channel-adapter:

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

Above configuration throws error as messageListener property is not allowed, see inline comment on tag. So purpose of using listner-container got defeated (for exposing channel via ChannelAwareMessageListener).

To me spring-integration cannot be used for manual-acknowledgement (I know, this is a hard saying!), Can anyone help me in validating this or Is there any specific approach/configuration required for this which I am missing?

回答1:

The problem is because you are using async handoff using a QueueChannel. It is generally better to control the concurrency in the container (concurrent-consumers="2") and don't do any async handoffs in your flow (use DirectChannels). That way, AUTO ack will work just fine. Instead of receiving from the PollableChannel subscribe a new MessageHandler() to a SubscribableChannel.

Update:

You normally don't need to deal with Messages in an SI application, but the equivalent of your test with a DirectChannel would be...

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });


回答2:

MANUAL Ack is allowed only via Channel.basicAck(). So, you should have an access to the Channel, on which your message was received.

Try to play with advice-chain of <int-amqp:inbound-channel-adapter>:

  1. Implement some Advice as MethodBeforeAdvice
  2. The advice-chain on Container is applied for ContainerDelegate#invokeListener
  3. The first argument of that method is exactly a Channel
  4. Suppose you can place to the MessageProperties.headers that Channel within that Advice
  5. And configure <int-amqp:inbound-channel-adapter> with mapped-request-headers to that Channel.
  6. And in the end try to invoke basicAck() on that Channel header from Spring Integration Message in the any place of your downstream flow.