Message not available at end of Aggregator

Posted 2019-09-06 17:54

Question:

I have built a Spring Integration application that passes some messages around and tries to bring them back together with an Aggregator. The flow reaches the Aggregator, but it does not do what I want: the group is not released and the flow does not move on to the next step.

My problem, however, is that my aggregator doesn't have the original message (from before the Splitter). My aggregator is defined as follows:

<int:aggregator input-channel="deirBoxProcessorToAggregatorChannel" 
                 ref="loggingAggregator" method="logAggregation"
                 output-channel="aggregatorToTransformer" 
                 expire-groups-upon-completion="true"/>

And the code inside it is as follows:

public class LoggingAggregator {

    private static final Logger LOGGER = Logger.getLogger(LoggingAggregator.class);

    public void logAggregation(Message<File> message) {
        LOGGER.info("Have aggregated messages. Will archive");
    }
}

The method is invoked, but the message argument it receives is always null.


Application Context/XML Spring Integration definition

<int:splitter input-channel="transformerToSplitterChannel" 
              ref="fileToMessageSplitter"
              output-channel="shippedSplitterToRouterChannel" 
              method="split" apply-sequence="true"/>

<!-- Now use a router to determine which Message builder these messages are sent onto -->
<int:router input-channel="shippedSplitterToRouterChannel" 
              ref="shippedToTypeRouter" />

<int:transformer input-channel="deirShippedBoxToTransformerChannel" 
               ref="shippedBoxTransformer" method="transform"
               output-channel="deirShippedTransformerToProcessorChannel"/>

<int:service-activator id="wellFormedShippedBoxProcess" 
                input-channel="deirShippedTransformerToProcessorChannel"
                output-channel="deirBoxProcessorToAggregatorChannel"
                ref="deirShippedFileProcessor" method="processBox" />

<int:service-activator id="malformedShippedBoxProcess"
                input-channel="deirMalformedShippedTransformerToProcessorChannel"
                output-channel="deirBoxProcessorToAggregatorChannel"
                ref="deirShippedFileProcessor" 
                method="processMalformedBox" />

<int:aggregator input-channel="deirBoxProcessorToAggregatorChannel" 
                ref="loggingAggregator" method="logAggregation"
                output-channel="aggregatorToTransformer" 
                expire-groups-upon-completion="true"/>

<int:transformer expression="headers.file_originalFile" 
                input-channel="aggregatorToTransformer"
                output-channel="transformerToArchiver" />

<int-file:outbound-channel-adapter id="deirArchiver" 
                channel="transformerToArchiver"
                directory="${dataexhange.springintg.refactor.archive.dir}"
                delete-source-files="true"/>

The process gets all the way to the Aggregator but does not seem to make it past it to the Transformer or the OutboundChannelAdapter archiver.

Thank you in advance.

Answer 1:

Your LoggingAggregator isn't correct; I recommend reading the Reference Manual. Your logAggregation method should look like this:

public File logAggregation(List<String> lines) {
     LOGGER.info("Have aggregated messages. Will archive");
     // Create Files from lines
     return file;
}

That is the main job of an Aggregator: to take a list of objects and return a single object.
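
As a minimal sketch of that shape, assuming the messages reaching the aggregator carry String payloads (the List<String> signature suggests this, but the question does not confirm it): the method takes the whole group and returns one object. The class name, the temp-file prefix and the use of java.util.logging are illustrative choices, not the asker's actual code.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical POJO aggregator; a bean like this could be referenced
// from <int:aggregator ref="..." method="logAggregation"/>.
class LoggingAggregatorSketch {

    private static final Logger LOGGER =
            Logger.getLogger(LoggingAggregatorSketch.class.getName());

    // Receives the payloads of the whole group and returns ONE object.
    public File logAggregation(List<String> lines) {
        LOGGER.info("Have aggregated " + lines.size() + " messages. Will archive");
        try {
            // Assumption: a temp file is an acceptable home for the combined lines.
            Path combined = Files.createTempFile("aggregated-", ".txt");
            Files.write(combined, lines);
            return combined.toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because this method returns a value, the aggregator has a payload to send on to its output-channel. A method declared void returns nothing, which would be consistent with the flow in the question stopping at the aggregator.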



Answer 2:

Artem's answer is correct. I mistakenly thought that the objects arriving at the aggregator would be of the type sent off by the splitter. The comments on Artem's answer show how I came to that realisation through debugging.

I did see somewhere, probably in the manual, that you can in fact return a type that can be cast from the channel that feeds into the aggregator.

With that understanding, I could in fact return Object and cast it back to the required type for use in the logging object, which I would use either after the aggregator or as part of it.
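
A rough sketch of that idea, assuming the aggregation method is declared to return Object and a later step casts the payload back. Both class names and the choice of a List as the concrete return type are hypothetical; the original post does not show this code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical aggregator whose method is declared to return Object.
class ObjectReturningAggregator {
    public Object aggregate(List<?> payloads) {
        // The concrete type is a List copy; callers only see Object.
        return new ArrayList<Object>(payloads);
    }
}

// Hypothetical downstream logging step that casts back to the expected type.
class AggregationLogger {
    public int countAggregated(Object aggregated) {
        if (!(aggregated instanceof List)) {
            throw new IllegalArgumentException("Expected a List but got: " + aggregated);
        }
        List<?> items = (List<?>) aggregated;
        return items.size();
    }
}
```

The instanceof check is there so that an unexpected payload type fails with a clear message instead of a ClassCastException further down the flow.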