Spring integration inbound-gateway Fire an event w

Posted 2019-07-11 21:13

Question:

I'm a newbie around here, but I'll try to be concise.

{INPUT QUEUE}->[INBOUND-GATEWAY-1]-->[ROUTER]----------->(ACTIVATOR)<---------------
                                        \                                          /
                                         \-->{HOLD QUEUE}--->[INBOUND-GATEWAY-2]--^

I have a scenario in which I have to dynamically change routing conditions in a flow like the one above. Messages coming from a queue are sent either to an activator to be processed, or to another queue to be put on hold. At a certain time, I have to close INBOUND-GATEWAY-1 so no new messages come into the flow, and open INBOUND-GATEWAY-2 to let all messages from HOLD QUEUE be processed. Once all messages from HOLD QUEUE have been consumed, both gateways must be closed/opened as they were before. The question is: how can I know when HOLD QUEUE is empty, so that I can trigger a method that starts gateway-1 again?

I'd be grateful if somebody could help me.

Thanks in advance

Answer 1:

After some debugging and reading, I finally found a solution for this issue. An inbound-gateway is a JmsMessageDrivenEndpoint, built on two inner components: a MessageListenerContainer and a MessageListener. The MessageListenerContainer is the one in charge of scheduling the MessageListener's behaviour, so by overriding noMessageReceived and messageReceived, and adding some attributes to control the desired behaviour, I was able to do the "magic".

My MessageListenerContainer implementation ended up like this:

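import javax.jms.Session; // jakarta.jms.Session on Spring Framework 6+

import org.springframework.integration.jms.JmsMessageDrivenEndpoint;
import org.springframework.jms.JmsException;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

public class ControlMessageListenerContainer extends DefaultMessageListenerContainer {

    /* Gateway on the main input queue; started again once the hold queue goes idle. */
    private JmsMessageDrivenEndpoint mainInputGateway;

    /* Idle time in milliseconds after which the hold queue is considered empty. */
    private long timeOut;

    /* Volatile because the container's consumer threads read and write it. */
    private volatile long lastTimeReceived;

    public ControlMessageListenerContainer() {
        this.setAutoStartup(false);
    }

    /* Setters are required for the <property> injection in the bean config. */
    public void setMainInputGateway(JmsMessageDrivenEndpoint mainInputGateway) {
        this.mainInputGateway = mainInputGateway;
    }

    public void setTimeOut(long timeOut) {
        this.timeOut = timeOut;
    }

    @Override
    public void start() throws JmsException {
        /* When the container is started, lastTimeReceived is set to the current time. */
        lastTimeReceived = System.currentTimeMillis();
        super.start();
    }

    @Override
    protected void noMessageReceived(Object invoker, Session session) {
        long currentTime = System.currentTimeMillis();

        /* No message for at least timeOut ms: treat the hold queue as empty. */
        if ((currentTime - lastTimeReceived) >= timeOut
                && mainInputGateway != null && !mainInputGateway.isRunning()) {
            mainInputGateway.start();
        }
        super.noMessageReceived(invoker, session);
    }

    @Override
    protected void messageReceived(Object invoker, Session session) {
        /* lastTimeReceived is reset to the current time whenever a new message arrives. */
        lastTimeReceived = System.currentTimeMillis();
        super.messageReceived(invoker, session);
    }
}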

And finally, the Spring bean config looks like this:

<bean id="listenerContainer" 
    class="org.merol.ControlMessageListenerContainer">
    <property name="mainInputGateway" ref="mainGateway" />
    <property name="destination" ref="onHoldQueue" />
    <property name="timeOut" value="10000"/>
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

<bean id="messageListener" 
    class="org.springframework.integration.jms.ChannelPublishingJmsMessageListener">
    <property name="requestChannel" ref="outputChannel" />
</bean>

<bean id="inboundGateway" 
    class="org.springframework.integration.jms.JmsMessageDrivenEndpoint">
    <constructor-arg name="listenerContainer" ref="listenerContainer" />
    <constructor-arg name="listener" ref="messageListener" />
</bean>
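
The timing rule used in noMessageReceived can also be pulled out of the container so it can be checked without a JMS broker. The following is only a minimal sketch under assumptions: IdleTimeoutTracker and its methods are hypothetical names that are not part of Spring, and the clock is injected so a test can control time.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper (not a Spring class): tracks time since the last message. */
class IdleTimeoutTracker {

    private final long timeoutMillis;
    private final LongSupplier clock;   // e.g. System::currentTimeMillis in production
    private volatile long lastReceived;

    IdleTimeoutTracker(long timeoutMillis, LongSupplier clock) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        this.lastReceived = clock.getAsLong();
    }

    /** Intended to be called from start() and messageReceived(). */
    void reset() {
        lastReceived = clock.getAsLong();
    }

    /** Intended to be called from noMessageReceived(); true once the timeout has elapsed. */
    boolean isIdle() {
        return clock.getAsLong() - lastReceived >= timeoutMillis;
    }
}
```

With this in place, the container's overrides would reduce to calling reset() and checking isIdle() before starting the main gateway.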

I hope this is helpful for someone else.

Thanks to @Nicholas for the clues.



Answer 2:

I would put this functionality into the inbound gateway processors. For example:

Gateway1Processor:

  • start(): Start consumer off the main queue and process.
  • stop(): Stop consumer.

Gateway2Processor:

  • start(): Start consumer off the HOLD queue. Specify an appropriate timeout. When the timeout fires (meaning the HOLD queue is empty), call stop().
  • stop(): Start Gateway1Processor and stop this consumer.

Therefore, the operating sequence would be:

  1. Start Gateway1Processor
  2. At a certain time, call Gateway1Processor.stop() and Gateway2Processor.start()
  3. Gateway2Processor will drain the HOLD queue, restart Gateway1Processor and then stop.
  4. Go to step 2.
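
The sequence above can be sketched in plain Java with an in-memory queue standing in for the HOLD queue. This is only an illustration under assumptions: SwitchableGateway and HoldQueueDrainer are hypothetical names, not Spring Integration classes, and a poll timeout with no message is taken to mean the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for a gateway that can be opened and closed. */
class SwitchableGateway {
    private volatile boolean running;
    void start() { running = true; }
    void stop() { running = false; }
    boolean isRunning() { return running; }
}

/** Hypothetical Gateway2Processor: drains the hold queue, then hands control back. */
class HoldQueueDrainer {
    private final BlockingQueue<String> holdQueue;
    private final Consumer<String> activator;
    private final SwitchableGateway mainGateway;
    private final long idleTimeoutMillis;

    HoldQueueDrainer(BlockingQueue<String> holdQueue, Consumer<String> activator,
                     SwitchableGateway mainGateway, long idleTimeoutMillis) {
        this.holdQueue = holdQueue;
        this.activator = activator;
        this.mainGateway = mainGateway;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Stops the main gateway, drains the hold queue, restarts the main gateway. */
    int drainThenResume() {
        mainGateway.stop();
        int processed = 0;
        try {
            String message;
            // poll() returns null when nothing arrives within the timeout
            while ((message = holdQueue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                activator.accept(message);
                processed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop draining
        } finally {
            mainGateway.start();
        }
        return processed;
    }
}
```

Calling drainThenResume() at the scheduled time covers steps 2 and 3. The timeout has to be long enough that a slow producer is not mistaken for an empty queue.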