Spring Integration Bridge with poller not working

Posted 2019-07-19 08:10

Question:

I am using spring-integration 5.0.7 to throttle the bridging of messages between two JMS queues.

The docs at: https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/messaging-channels-section.html#bridge-namespace

suggest:

<int:bridge input-channel="pollable" output-channel="subscribable">
    <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
</int:bridge>

But the schema validator complains "no nested poller allowed for subscribable input channel" on the bridge element.

But if I put the poller on the inbound-channel-adapter, as in:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
 ">
    <int:channel id="inChannel" />
    <int:channel id="outChannel" />

    <int-jms:inbound-channel-adapter id="jmsIn" connection-factory="jmsConnectionFactory" destination-name="_dev.inQueue" channel="inChannel">
       <int:poller fixed-delay="5000" max-messages-per-poll="2"/>
    </int-jms:inbound-channel-adapter>

    <int-jms:outbound-channel-adapter id="jmsOut" connection-factory="jmsConnectionFactory" destination-name="_dev.outQueue" channel="outChannel"/>
    <int:bridge input-channel="inChannel" output-channel="outChannel">
    </int:bridge>
</beans:beans>

Nothing is ever moved from input to output.

How can I bridge from one JMS queue to another with a rate-limit?

Update:

Turning on debug logging confirms that nothing is being picked up from the input channel, but is otherwise not helpful:

2018-08-10 15:36:33.345 DEBUG 112066 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2018-08-10 15:36:38.113 DEBUG 112066 --- [ask-scheduler-2] o.s.integration.jms.DynamicJmsTemplate   : Executing callback on JMS Session: ActiveMQSession {id=ID:whitechapel-35247-1533940593148-3:2:1,started=true} java.lang.Object@5c278302
2018-08-10 15:36:38.116 DEBUG 112066 --- [ask-scheduler-2] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'
2018-08-10 15:36:43.115 DEBUG 112066 --- [ask-scheduler-1] o.s.integration.jms.DynamicJmsTemplate   : Executing callback on JMS Session: ActiveMQSession {id=ID:whitechapel-35247-1533940593148-3:3:1,started=true} java.lang.Object@1c09a81e
2018-08-10 15:36:43.118 DEBUG 112066 --- [ask-scheduler-1] o.s.i.e.SourcePollingChannelAdapter      : Received no Message during the poll, returning 'false'

Answer 1:

Here is a Spring Boot app using Java DSL configuration that is the exact equivalent of what you have in XML (minus the bridge); it works fine.

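import javax.jms.ConnectionFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication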
public class So51792909Application {

    private static final Logger logger = LoggerFactory.getLogger(So51792909Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So51792909Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            for (int i = 0; i < 10; i++) {
                template.convertAndSend("foo", "test");
            }
        };
    }

    @Bean
    public IntegrationFlow flow(ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
                        .destination("foo"), e -> e
                            .poller(Pollers
                                    .fixedDelay(5000)
                                    .maxMessagesPerPoll(2)))
                .handle(Jms.outboundAdapter(connectionFactory)
                        .destination("bar"))
                .get();
    }

    @JmsListener(destination = "bar")
    public void listen(String in) {
        logger.info(in);
    }

}

and the resulting log:

2018-08-10 19:38:52.534  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:52.543  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:57.566  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:38:57.582  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:02.608  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:02.622  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:07.640  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:07.653  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:12.672  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test
2018-08-10 19:39:12.687  INFO 13408 --- [enerContainer-1] com.example.So51792909Application        : test

As you can see, the consumer gets 2 messages every 5 seconds.
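
The batching seen in that log can be sketched in plain Java. This is only an illustration of the idea of "at most N messages per poll", not Spring Integration's actual implementation; the class name PollerSketch and the method pollOnce are made up for this example, and an in-memory queue stands in for the JMS destination.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java sketch of poller batching; hypothetical names, no Spring classes involved.
class PollerSketch {

    // One poll: take at most maxMessagesPerPoll messages,
    // stopping early as soon as the source has nothing more to give.
    static List<String> pollOnce(Queue<String> source, int maxMessagesPerPoll) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            String message = source.poll(); // returns null when the queue is empty
            if (message == null) {
                break;
            }
            batch.add(message);
        }
        return batch;
    }

    public static void main(String[] args) {
        // Same shape as the test above: 10 messages, 2 per poll.
        Queue<String> source = new ArrayDeque<>();
        for (int i = 0; i < 10; i++) {
            source.add("test");
        }
        int polls = 0;
        while (!source.isEmpty()) {
            List<String> batch = pollOnce(source, 2);
            polls++;
            System.out.println("poll " + polls + " delivered " + batch.size());
        }
    }
}
```

With a 5 second delay between polls, 10 messages at 2 per poll need 5 polls, which matches the five pairs of log lines above.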

Your debug log implies there are no messages in the queue.

EDIT

I figured it out; the XML parser sets the JmsTemplate receiveTimeout to no-wait (-1). Since you are not using a caching connection factory, you never get a message, because the ActiveMQ client returns immediately if a message is not already present in the client (see this answer). With no caching, every poll creates a new consumer and does a no-wait receive on it, before the broker has had a chance to deliver anything to that consumer.

The DSL leaves the JmsTemplate's default (infinite wait), which is actually wrong as well, since it blocks the poller thread indefinitely if there are no messages.

To fix the XML version, add receive-timeout="1000" to the <int-jms:inbound-channel-adapter> element.
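
Why a timed receive helps where a no-wait receive does not can be shown with a plain-Java analogy. This is not the ActiveMQ client: a BlockingQueue stands in for a new consumer's local buffer, a background thread stands in for the broker delivering a message a little later, and all names (ReceiveTimeoutSketch, newConsumer, and so on) are made up for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Analogy only: no JMS or ActiveMQ classes are used here.
class ReceiveTimeoutSketch {

    // Analogue of a no-wait receive: null unless a message is already buffered.
    static String receiveNoWait(BlockingQueue<String> buffer) {
        return buffer.poll();
    }

    // Analogue of a timed receive: waits up to timeoutMillis for a message to arrive.
    static String receiveWithTimeout(BlockingQueue<String> buffer, long timeoutMillis) {
        try {
            return buffer.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // A fresh "consumer" whose first message only arrives after delayMillis.
    static BlockingQueue<String> newConsumer(long delayMillis) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        Thread broker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                buffer.offer("test");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        broker.setDaemon(true);
        broker.start();
        return buffer;
    }

    public static void main(String[] args) {
        // New consumer per call, as happens on every poll without a caching factory.
        System.out.println("no-wait: " + receiveNoWait(newConsumer(200)));
        System.out.println("timed:   " + receiveWithTimeout(newConsumer(200), 1000));
    }
}
```

In this sketch the no-wait call looks at the buffer before anything has arrived and comes back empty-handed, while the timed call gives the delivery time to happen. The 200 ms delay and 1000 ms timeout are arbitrary illustration values.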

However, it's better to use a CachingConnectionFactory to avoid creating a new connection/session/consumer on each poll.

Unfortunately, configuring a CachingConnectionFactory turns off Spring Boot's auto-configuration. This is fixed in Boot 2.1.

I have opened an issue to resolve the inconsistency between the DSL and XML.

If you stick with the DSL, I would recommend setting the receive timeout to something reasonable, rather than indefinite:

@Bean
public IntegrationFlow flow(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Jms.inboundAdapter(connectionFactory)
                    .configureJmsTemplate(t -> t.receiveTimeout(1000))
                    .destination("foo"), e -> e
                        .poller(Pollers
                                .fixedDelay(5000)
                                .maxMessagesPerPoll(2)))
            .handle(Jms.outboundAdapter(connectionFactory)
                    .destination("bar"))
            .get();
}

But the best solution is to use a CachingConnectionFactory.
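
A minimal sketch of such a configuration, assuming ActiveMQ and a javax.jms-based Spring version like the one in the question. The broker URL and the session cache size are placeholders, and the class name is made up; the bean name jmsConnectionFactory is chosen only to match the connection-factory attribute in the XML above. Because declaring this bean replaces Boot's auto-configured factory (before Boot 2.1), the broker settings have to be supplied here.

```java
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;

@Configuration
public class CachingJmsConfig {

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        // Placeholder broker URL; replace with the real one.
        ActiveMQConnectionFactory target =
                new ActiveMQConnectionFactory("tcp://localhost:61616");

        // Wrap the vendor factory so the connection, sessions and consumers are reused
        // across polls instead of being recreated each time.
        CachingConnectionFactory caching = new CachingConnectionFactory(target);
        caching.setCacheConsumers(true);  // keep the consumer open between polls
        caching.setSessionCacheSize(10);  // arbitrary illustration value
        return caching;
    }
}
```

With a cached consumer, the broker can deliver messages to the client between polls, so a short receive timeout is more likely to find one waiting.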