How to increment a message header

Posted 2019-07-15 05:02

Question:

Is there a way in Spring Integration Java DSL to modify an existing message header?

I am reimplementing a download retry mechanism using SI Java DSL, and want to increment a message header holding the download attempts when a failure occurs, before routing the message based on the number of attempts compared to a limit.

I have the routing working nicely based upon the RouterTests included with SI. With HeaderEnrichers I can easily add a header, but I can't see a way to modify an existing header.

Thanks

/**
 * Unit test of {@link RetryRouter}.
 * 
 * Based on {@link RouterTests#testMethodInvokingRouter2()}.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {

    /** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting( ) } */
    @Autowired
    @Qualifier("failed")
    private MessageChannel failed;

    /** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("retry-channel")
    private PollableChannel retryChannel;

    /** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting( ) }*/
    @Autowired
    @Qualifier("exhausted-channel")
    private PollableChannel exhaustedChannel;

    /**
     * Unit test of {@link ContextConfiguration#failedDownloadRouting( ) } and {@link RetryRouter}.
     */
    @Test
    public void retryRouting() {

        final int limit = 2;

        for ( int attempt = 0 ; attempt <= limit + 1 ; attempt++ ){

            this.failed.send( failed( attempt, limit) );

            if ( attempt < limit){

                assertEquals( payload( attempt ) , this.retryChannel.receive( ).getPayload( ) );
                assertNull(this.exhaustedChannel.receive( 0 ) );

            }else{

                assertEquals( payload( attempt ) , this.exhaustedChannel.receive( ).getPayload( ) );
                assertNull( this.retryChannel.receive( 0 ) );
            }
        }

    }

    private Message<String> failed( int retry , int limit ) {

        return MessageBuilder
            .withPayload(  payload( retry ) )
            .setHeader("retries", new AtomicInteger( retry ) )
            .setHeader("limit", limit)
            .build();
    }

    private String payload (int retry){
        return "retry attempt "+retry;
    }


    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public MessageChannel loggerChannel() {
            return MessageChannels.direct().get();
        }

        @Bean(name = "retry-channel")
        public MessageChannel retryChannel() {
            return new QueueChannel();
        }

        @Bean(name = "exhausted-channel")
        public MessageChannel exhaustedChannel() {
            return new QueueChannel();
        }


        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
         * <p>
         * The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download, 
         * and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as
         * a header by {@link DownloadDispatcher} from retry configuration.
         * <p>
         * Messages for failed download attempts are listened to on channel {@link #failed}. 
         * Retry attempts are routed to {@link #retryChannel()}
         *  
         * @return
         */
        @Bean
        public IntegrationFlow failedDownloadRouting() {

            return IntegrationFlows.from( "failed" )

                .handle( "headers.retries.getAndIncrement()" )
                .handle( logMessage ( "failed" ) )
                .route(new RetryRouter())
                .get();
        }

        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
         * and the limit to the number of attempts that may be made. 
         * <p>
         */
        private static class RetryRouter {

            @Router
            public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {

                if (attempts.intValue() < limit.intValue()){
                    return "retry-channel";
                }
                return "exhausted-channel";
            }

            /** This method is not used but is required by Spring Integration otherwise application context doesn't load because of
             * {@code Caused by: java.lang.IllegalArgumentException: Target object of type 
             * [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
             * 
             * @throws UnsupportedOperationException if called
             */
            @SuppressWarnings("unused")
            public String routeMessage(Message<?> message) {

                throw new UnsupportedOperationException( "should not be used." );
            }
        }
    }

Answer 1:

There is a way to do what you need without replacing the header. Add a mutable AtomicInteger as the header value:

.enrichHeaders(h -> h.header("downloadRetries", new AtomicInteger()))

Then, when you need to increment it, do just this:

.handle(m -> m.getHeaders().get("downloadRetries", AtomicInteger.class).getAndIncrement())

Place this one-way handle as the first subscriber on the publish-subscribe channel for the retry service.
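To see why this works even though the headers of a built message cannot be changed, here is a minimal plain-Java sketch. It assumes an unmodifiable map as a stand-in for the read-only message headers; the class and method names are hypothetical and nothing here is Spring Integration API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: an unmodifiable map plays the role of the read-only message headers.
class MutableHeaderSketch {

    /** Builds a read-only header map whose "downloadRetries" entry is a mutable counter. */
    static Map<String, Object> headersWithCounter() {
        Map<String, Object> headers = new HashMap<>();
        headers.put("downloadRetries", new AtomicInteger());
        return Collections.unmodifiableMap(headers);
    }

    /** Increments the counter held in the header and returns the new value. */
    static int recordFailure(Map<String, Object> headers) {
        AtomicInteger retries = (AtomicInteger) headers.get("downloadRetries");
        return retries.incrementAndGet();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = headersWithCounter();
        recordFailure(headers);
        recordFailure(headers);
        System.out.println("downloadRetries = " + headers.get("downloadRetries"));

        try {
            // Replacing the entry is rejected: only the object it points to can change.
            headers.put("downloadRetries", new AtomicInteger());
        } catch (UnsupportedOperationException e) {
            System.out.println("the header map itself cannot be modified");
        }
    }
}
```

One consequence of this design: the counter is shared by reference, so any copy of the message that carries the same header object sees the incremented value as well.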

UPDATE

[...] is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.

Thank you for sharing the config: now I see the issue and your misunderstanding. The solution should look like this:

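        return IntegrationFlows.from( "failed" )

            // First subscriber: a one-way handler that increments the counter header in place.
            .publishSubscribeChannel(s -> s
                .subscribe(f -> f
                    .handle(m -> m.getHeaders().get("downloadRetries",
                              AtomicInteger.class).getAndIncrement())))
            // Second subscriber: the rest of the main flow, which runs after the sub-flow returns.
            .handle( logMessage ( "failed" ) )
            .route(new RetryRouter())
            .get();
    }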

Here the flow starts with a PublishSubscribeChannel: the .subscribe() sub-flow is its first subscriber, and .handle( logMessage ( "failed" ) ) in the main flow is its second. The second subscriber is not called until the first one has finished its work.
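The ordering guarantee can be modelled in plain Java. This is only a sketch under the assumption that the channel has no executor configured, so subscribers are invoked one after another on the sender's thread; the class below is hypothetical and is not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of a synchronous publish-subscribe channel (not Spring's implementation).
class OrderedPubSubSketch {

    private final List<Consumer<AtomicInteger>> subscribers = new ArrayList<>();

    void subscribe(Consumer<AtomicInteger> subscriber) {
        subscribers.add(subscriber);
    }

    /** Calls each subscriber in subscription order on the caller's thread. */
    void send(AtomicInteger retries) {
        for (Consumer<AtomicInteger> subscriber : subscribers) {
            subscriber.accept(retries);
        }
    }

    /** Returns the counter value the second subscriber observes after the first has run. */
    static int valueSeenBySecondSubscriber(int initialRetries) {
        OrderedPubSubSketch channel = new OrderedPubSubSketch();
        int[] seen = new int[1];

        // First subscriber: the one-way increment, like the .subscribe() sub-flow.
        channel.subscribe(r -> r.getAndIncrement());
        // Second subscriber: stands in for the logging and routing steps of the main flow.
        channel.subscribe(r -> seen[0] = r.get());

        channel.send(new AtomicInteger(initialRetries));
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(valueSeenBySecondSubscriber(0));
    }
}
```

Because dispatch is sequential in this model, the second subscriber always sees the counter after the increment. With an executor on the channel that ordering would no longer be guaranteed.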

See the Spring Integration Reference Manual and Java DSL Manual for more info.



Answer 2:

The following code does work:

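.handle( new GenericHandler<Message<String>>() {

    @Override
    public Object handle( Message<String> message , Map<String,Object> headers ) {

        // The header map is read-only, but the AtomicInteger it holds can be incremented in place.
        ((AtomicInteger)headers.get( "retries" )).getAndIncrement();
        // Returning the message passes it on to the next step of the flow.
        return message;
    }})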
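For completeness, here is a small plain-Java sketch of how the increment interacts with the routing decision. The comparison mirrors RetryRouter from the question; the class and the failAndRoute helper are hypothetical names added for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: increment first, then route on the updated count.
class RetryDecisionSketch {

    /** Same comparison as RetryRouter in the question: retry while below the limit. */
    static String route(AtomicInteger retries, int limit) {
        return retries.get() < limit ? "retry-channel" : "exhausted-channel";
    }

    /** Records one failed attempt, then decides where the message goes next. */
    static String failAndRoute(AtomicInteger retries, int limit) {
        retries.getAndIncrement();
        return route(retries, limit);
    }

    public static void main(String[] args) {
        AtomicInteger retries = new AtomicInteger();
        int limit = 2;
        for (int failure = 1; failure <= 3; failure++) {
            System.out.println("failure " + failure + " -> " + failAndRoute(retries, limit));
        }
    }
}
```

Note that because the count is incremented before the comparison, a limit of 2 allows one retry after the first failure and sends the second failure to the exhausted channel. Whether that matches the intended limit semantics is a choice to make deliberately.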