How to make an aggregator wait for responses from all service activators

Posted 2019-09-14 18:26

Question:

I use the Spring configuration below to combine results from a publish-subscribe channel with an aggregator. However, the aggregator only receives the response from the first service activator on the publish-subscribe channel and does not wait for the responses from the others. How should I modify my configuration so that the aggregator waits for the responses from all 4 service activators?

<int:bridge id="ValidationsBridge" input-channel="RequestChannel" output-channel="bridgeOutputChannel"></int:bridge>

    <int:publish-subscribe-channel id="bridgeOutputChannel" apply-sequence="true" />

    <int:service-activator input-channel="bridgeOutputChannel" output-channel="aggregatorInput"
        method="populateResponse1" ref="WebServiceImpl">
    </int:service-activator>
    <int:service-activator input-channel="bridgeOutputChannel" method="populateResponse2" ref="WebServiceImpl"
        output-channel="aggregatorInput">
    </int:service-activator>
    <int:service-activator input-channel="bridgeOutputChannel" method="populateResponse3" ref="WebServiceImpl"
        output-channel="aggregatorInput">
    </int:service-activator>
    <int:service-activator input-channel="bridgeOutputChannel" method="populateResponse4" ref="WebServiceImpl"
        output-channel="aggregatorInput">
    </int:service-activator>

    <task:executor id="executor" pool-size="4" keep-alive="20"/>

    <int:aggregator input-channel="aggregatorInput" output-channel="aggregatorOutput"
        ref="vehicleAggregator" method="populateResponse">
    </int:aggregator>

    <int:service-activator id="processorServiceActivator" input-channel="aggregatorOutput" ref="Processor" method="mapResponse" output-channel="ResponseChannel"/>

    <int:channel id="bridgeOutputChannel" />
    <int:channel id="aggregatorInput" />
    <int:channel id="aggregatorOutput" />

  </beans>

Below is a snippet from my aggregator:

public Message<?> populateResponse(Collection<Message<?>> messages) {
    // Start from the payload of the first message in the released group.
    MessageBuilder<?> msgBuilder = MessageBuilder.withPayload(messages.iterator().next().getPayload());
    for (Message<?> message : messages) {
        // Copy each response header that this message carries onto the combined message.
        if (null != message.getHeaders().get(Constants.RESPONSE1)) {
            msgBuilder.setHeader(Constants.RESPONSE1, message.getHeaders().get(Constants.RESPONSE1));
        }
        if (null != message.getHeaders().get(Constants.RESPONSE2)) {
            msgBuilder.setHeader(Constants.RESPONSE2, message.getHeaders().get(Constants.RESPONSE2));
        }
    }
    return msgBuilder.build();
}

Answer 1:

You should use apply-sequence="true" on that publish-subscribe-channel (your configuration already sets it; the attribute is false by default) and not use any correlation options on the aggregator. Just rely on the default correlationId header populated by apply-sequence.

A correlation strategy based on the message id (your code) makes the aggregator create a new group for every message, because each message id is unique.
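To illustrate the point about message ids, here is a minimal plain-Java sketch. It does not use any Spring Integration classes; `Reply`, `group`, and `fourReplies` are hypothetical names made up for this example, and the map only stands in for the aggregator's message store. It groups four replies to one request twice, first keyed by each reply's own id and then keyed by the shared correlation id.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;

class CorrelationDemo {
    // Hypothetical stand-in for a message: only the two headers relevant here.
    static final class Reply {
        final String id;            // unique per message
        final String correlationId; // shared by all replies to one request

        Reply(String id, String correlationId) {
            this.id = id;
            this.correlationId = correlationId;
        }
    }

    // Groups replies by whatever key the given strategy extracts from each reply.
    static Map<String, List<Reply>> group(List<Reply> replies, Function<Reply, String> strategy) {
        Map<String, List<Reply>> groups = new LinkedHashMap<>();
        for (Reply reply : replies) {
            groups.computeIfAbsent(strategy.apply(reply), key -> new ArrayList<>()).add(reply);
        }
        return groups;
    }

    // Builds four replies that share one correlation id but have distinct message ids.
    static List<Reply> fourReplies() {
        String shared = UUID.randomUUID().toString();
        List<Reply> replies = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            replies.add(new Reply(UUID.randomUUID().toString(), shared));
        }
        return replies;
    }

    public static void main(String[] args) {
        List<Reply> replies = fourReplies();
        System.out.println("keyed by message id:     " + group(replies, r -> r.id).size() + " group(s)");
        System.out.println("keyed by correlation id: " + group(replies, r -> r.correlationId).size() + " group(s)");
    }
}
```

Keyed by message id, the four replies land in four separate single-message groups, so no group ever collects all of them. Keyed by the shared correlation id, they land in one group of four.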

By the way, you don't need a release strategy either. The aggregator handles that on its own using the populated sequence headers (sequenceNumber and sequenceSize).
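The release-by-sequence idea can be sketched in plain Java as follows. This is only an approximation of the behavior described above, not Spring Integration's actual implementation; `SequenceReleaseDemo`, `Reply`, and `onReply` are hypothetical names, and the sketch assumes every reply carries the sequence size stamped by the publish-subscribe channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SequenceReleaseDemo {
    // Hypothetical stand-in for a reply carrying the sequence headers.
    static final class Reply {
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;

        Reply(String correlationId, int sequenceNumber, int sequenceSize) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
        }
    }

    // Pending groups, keyed by correlation id.
    private final Map<String, List<Reply>> groups = new HashMap<>();

    // Stores the reply and returns the whole group once sequenceSize replies
    // have arrived; returns null while the group is still incomplete.
    List<Reply> onReply(Reply reply) {
        List<Reply> group = groups.computeIfAbsent(reply.correlationId, key -> new ArrayList<>());
        group.add(reply);
        if (group.size() < reply.sequenceSize) {
            return null;
        }
        groups.remove(reply.correlationId);
        return group;
    }

    public static void main(String[] args) {
        SequenceReleaseDemo aggregator = new SequenceReleaseDemo();
        for (int n = 1; n <= 4; n++) {
            List<Reply> released = aggregator.onReply(new Reply("request-1", n, 4));
            System.out.println("reply " + n + ": "
                    + (released == null ? "waiting" : "released " + released.size() + " replies"));
        }
    }
}
```

With four subscribers, the first three replies leave the group waiting and the fourth completes it, which is why no custom release strategy is needed for this use case.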

I'm also not sure that you need group-timeout.

In other words, all your use case needs is to rely on the out-of-the-box sequence details feature:

http://docs.spring.io/spring-integration/docs/4.3.9.RELEASE/reference/html/messaging-channels-section.html#channel-configuration-pubsubchannel

http://docs.spring.io/spring-integration/docs/4.3.9.RELEASE/reference/html/messaging-routing-chapter.html#aggregator