An aggregator that can release when all records ar

Posted 2019-08-13 23:38

Question:

I am building a system with Spring Integration that processes all lines in a file as records. Because some of the String records are malformed, I have multiple paths through the application via a Splitter and Aggregator combination (I'm building the Aggregator as we speak).

Further, some of the records are so malformed that they are effectively errors. However, I have a requirement that all records must be processed, so I must identify and log gross malformation errors separately and finish processing the file. In other words, I cannot fail to process the file; I must only log errors.


Aggregator
I intend to achieve the goal of processing grossly malformed records by modifying the headers on the incoming message and passing the message onward to the Aggregator, which can check for the existence of such a header. I'll effectively be hand-coding some error handling into my processors and aggregator.

My Release Strategy for the Aggregator will be when all messages are processed.


Code Extract
This code comes from a blog entry by Matt Vickery. He constructs an entirely new message (using MessageBuilder and transferring headers) whereas I will just add something to the Message headers. He includes this code in a gateway which subsequently transfers the Message onto the Aggregator.

public Message<AvsResponse> service(Message<AvsRequest> message) {

    Assert.notNull(message, MISSING_MANDATORY_ARG);
    Assert.notNull(message.getPayload(), MISSING_MANDATORY_ARG);

    MessageHeaders requestMessageHeaders = message.getHeaders();
    Message<AvsResponse> responseMessage = null;
    try {
        logger.debug("Entering AVS Gateway");
        responseMessage = avsGateway.send(message);
        if (responseMessage == null)
            responseMessage = buildNewResponse(requestMessageHeaders,
                    AvsResponseType.NULL_RESULT);
        logger.debug("Exited AVS Gateway");

        return responseMessage;
    } 
    catch (Exception e) {
        return buildNewResponse(responseMessage, requestMessageHeaders,
                AvsResponseType.EXCEPTION_RESULT, e);
    }
}


Confusion (...at least, that which I know about)
My questions are as follows:

  • When I have such a release strategy (all messages processed), is that the best way to ensure all messages get through to the Aggregator?
  • When using an Aggregator it seems like in practical cases, it would be very common to need access to the Message in some previous step, as opposed to just passing and processing simple POJOs. Would that be true or is there something I should be doing to simplify my design so I can avoid Message

I came across a blog entry by Matt Vickery showing how he achieves what seems to be similar with an Aggregator. I'm using his work as a guide.

P.S. Per Artem Bilan's advice, I'm avoiding creating my own messages and letting SI turn them into Messages

Answer 1:

The Aggregator makes no distinction between valid and invalid payloads. Its general purpose is to combine the payloads of several messages into one Message whose payload is (by default) a List, and it does that using the sequenceDetails in the MessageHeaders. That is the first point.

If you use a Splitter, it is responsible for enriching each Message it produces with the default sequenceDetails. So, if you have this configuration:

<splitter/>

<aggregator/>

And if your inbound payload is a List, you end up with a List after the aggregator as well.

I assume that your Splitter just produces String payloads from the lines of the File.
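To make the role of the sequence details concrete, here is a minimal plain-Java model of the idea. It is a sketch, not Spring Integration's implementation: the names `SequencedLine`, `SequenceModel`, `split` and `isComplete` are hypothetical. It assumes only that the splitter stamps every message with a shared correlation id, a sequence number and a sequence size, and that the default release check waits until the group holds as many messages as the sequence size.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one message produced by a splitter: the payload plus
// the three "sequence details" that the framework keeps in the headers.
final class SequencedLine {
    final String payload;
    final String correlationId;
    final int sequenceNumber;
    final int sequenceSize;

    SequencedLine(String payload, String correlationId, int sequenceNumber, int sequenceSize) {
        this.payload = payload;
        this.correlationId = correlationId;
        this.sequenceNumber = sequenceNumber;
        this.sequenceSize = sequenceSize;
    }
}

final class SequenceModel {
    // Splitter side: one message per line, all sharing one correlation id.
    static List<SequencedLine> split(String correlationId, List<String> lines) {
        List<SequencedLine> out = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            out.add(new SequencedLine(lines.get(i), correlationId, i + 1, lines.size()));
        }
        return out;
    }

    // Aggregator side: the group is complete once it holds sequenceSize messages.
    static boolean isComplete(List<SequencedLine> group) {
        return !group.isEmpty() && group.size() == group.get(0).sequenceSize;
    }
}
```

In this model a file of three lines yields three messages with sequence size 3, and `isComplete` only returns true once all three have reached the group. Nothing in the check looks at the payload, which is why a malformed record does not need a custom release rule as long as it still produces a reply.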

Then you pass each Message to some service/transformer.

The result of that you may pass to the Aggregator.

But as you say, some of the payloads are not valid and your processor fails with an Exception.

So how about a simple try...catch within that POJO method, returning a payload with an error indicator, e.g. the plain String "Oops!"?

As I described before, the framework wraps the result of the POJO method as the payload of a Message. And the magic is that the sequenceDetails will be in that Message's MessageHeaders too.

I don't see a reason to write a custom ReleaseStrategy for this task, or to customize any of the Aggregator's other strategies.

Let me know what you don't understand.
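A minimal sketch of such a POJO method, assuming a hypothetical `LineProcessor` class whose `parse` step expects `key=value` lines (both the class and the parsing rule are illustrative stand-ins, not part of the original question):

```java
// Hypothetical line processor; parse() and the "Oops!" marker are illustrative.
final class LineProcessor {
    static final String ERROR_MARKER = "Oops!";

    // Called once per split line. It never throws, so every line yields a
    // reply and the aggregator still receives the full sequence.
    String process(String line) {
        try {
            return parse(line);
        } catch (Exception e) {
            // Log the gross malformation and carry on with the rest of the file.
            System.err.println("Malformed record: " + line + " (" + e.getMessage() + ")");
            return ERROR_MARKER;
        }
    }

    // Stand-in for the real parsing logic: expects "key=value".
    private String parse(String line) {
        int idx = line.indexOf('=');
        if (idx <= 0) {
            throw new IllegalArgumentException("expected key=value");
        }
        return line.substring(0, idx).trim() + " -> " + line.substring(idx + 1).trim();
    }
}
```

With this shape, a well-formed line comes back transformed and a malformed one comes back as the marker String, so downstream code can filter on the marker instead of handling exceptions.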

UPDATE

To add an error indicator to the message headers without throwing an Exception, it really is simpler to build a new Message in code rather than via some error-channel flow:

try {
   return [GOOD_RESULT];
}
catch(Exception e) {
   return MessageBuilder.withPayload(payload).setHeader("ERROR", e.getMessage()).build();
}

But in this case you should use <service-activator> instead of <transformer>, because the latter doesn't copy headers from the inbound Message. And you really need them: the aggregator relies on the sequenceDetails carried in those headers.
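The header-copying behaviour can be pictured with plain maps. This is a sketch of the idea only, not framework code: `HeaderCopyModel` and `copyIfAbsent` are hypothetical names, the header keys are illustrative strings, and it assumes that the reply keeps its own header values and inherits only the request headers it does not already define.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of merging inbound headers into a reply built in code.
final class HeaderCopyModel {
    // Copy request headers into the reply without overwriting reply values.
    static Map<String, Object> copyIfAbsent(Map<String, Object> replyHeaders,
                                            Map<String, Object> requestHeaders) {
        Map<String, Object> merged = new HashMap<>(replyHeaders);
        for (Map.Entry<String, Object> e : requestHeaders.entrySet()) {
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        return merged;
    }
}
```

Under that assumption, a reply that carries only the "ERROR" header ends up with the inbound sequence details as well, which is what lets the aggregator still count it towards the group. If the headers were not copied, the reply would arrive without them.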