Is there a way in Spring Integration Java DSL to modify an existing message header?
I am reimplementing a download retry mechanism using SI Java DSL, and want to increment a message header holding the download attempts when a failure occurs, before routing the message based on the number of attempts compared to a limit.
I have the routing working nicely based upon the RouterTests included with SI. With HeaderEnrichers I can easily add a header, but I can't see a way to modify an existing header.
Thanks
/**
 * Unit test of {@link RetryRouter}.
 *
 * Based on {@link RouterTests#testMethodInvokingRouter2()}.
 */
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {

    /** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting()}. */
    @Autowired
    @Qualifier("failed")
    private MessageChannel failed;

    /** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting()}. */
    @Autowired
    @Qualifier("retry-channel")
    private PollableChannel retryChannel;

    /** Failed download attempts which will not be retried are sent to this channel by {@link ContextConfiguration#failedDownloadRouting()}. */
    @Autowired
    @Qualifier("exhausted-channel")
    private PollableChannel exhaustedChannel;

    /**
     * Unit test of {@link ContextConfiguration#failedDownloadRouting()} and {@link RetryRouter}.
     */
    @Test
    public void retryRouting() {
        final int limit = 2;
        for ( int attempt = 0 ; attempt <= limit + 1 ; attempt++ ){
            this.failed.send( failed( attempt, limit) );
            if ( attempt < limit){
                assertEquals( payload( attempt ) , this.retryChannel.receive( ).getPayload( ) );
                assertNull(this.exhaustedChannel.receive( 0 ) );
            }else{
                assertEquals( payload( attempt ) , this.exhaustedChannel.receive( ).getPayload( ) );
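                // A second blocking receive() on the exhausted channel would wait forever,
                // so check instead that nothing was routed to the retry channel.
                assertNull( this.retryChannel.receive( 0 ) );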
            }
        }
    }

    private Message<String> failed( int retry , int limit ) {
        return MessageBuilder
                .withPayload( payload( retry ) )
                .setHeader("retries", new AtomicInteger( retry ) )
                .setHeader("limit", limit)
                .build();
    }

    private String payload (int retry){
        return "retry attempt "+retry;
    }

    @Configuration
    @EnableIntegration
    public static class ContextConfiguration {

        @Bean
        public MessageChannel loggerChannel() {
            return MessageChannels.direct().get();
        }

        @Bean(name = "retry-channel")
        public MessageChannel retryChannel() {
            return new QueueChannel();
        }

        @Bean(name = "exhausted-channel")
        public MessageChannel exhaustedChannel() {
            return new QueueChannel();
        }

        /**
         * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made
         * and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
         * <p>
         * The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download,
         * and the limit to the number of attempts is another header {@link #retryLimit} which is originally set up as
         * a header by {@link DownloadDispatcher} from retry configuration.
         * <p>
         * Messages for failed download attempts are listened to on channel {@link #failed}.
         * Retry attempts are routed to {@link #retryChannel()}.
         *
         * @return the flow that routes failed download attempts
         */
        @Bean
        public IntegrationFlow failedDownloadRouting() {
            return IntegrationFlows.from( "failed" )
                    .handle( "headers.retries.getAndIncrement()" )
                    .handle( logMessage ( "failed" ) )
                    .route(new RetryRouter())
                    .get();
        }
    }

    /**
     * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made
     * and the limit to the number of attempts that may be made.
     */
    private static class RetryRouter {

        @Router
        public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {
            if (attempts.intValue() < limit.intValue()){
                return "retry-channel";
            }
            return "exhausted-channel";
        }

        /**
         * This method is not used but is required by Spring Integration, otherwise the application context doesn't load because of
         * {@code Caused by: java.lang.IllegalArgumentException: Target object of type
         * [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
         *
         * @throws UnsupportedOperationException if called
         */
        @SuppressWarnings("unused")
        public String routeMessage(Message<?> message) {
            throw new UnsupportedOperationException( "should not be used." );
        }
    }
}
The following code does work
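There is a way to do what you need without modifying the headers: keep the attempt count in a mutable header value, such as the AtomicInteger your test already puts into the "retries" header.
Then, when you need to increment it, just call getAndIncrement() on that header value in a .handle(),
and make this handle the first, one-way subscriber on the publish-subscribe channel for the retry service.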
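To illustrate why this works even though message headers cannot be changed after the message is built, here is a minimal plain-Java sketch. It does not use Spring Integration at all: the unmodifiable Map only stands in for the message headers, and the class and method names (MutableHeaderSketch, headers, recordFailure, route) are made up for this example. The routing rule is the same comparison as in the question's RetryRouter.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class MutableHeaderSketch {

    /** Builds a read-only map that stands in for a message's immutable headers. */
    public static Map<String, Object> headers(int retries, int limit) {
        Map<String, Object> h = new HashMap<>();
        h.put("retries", new AtomicInteger(retries)); // mutable counter object
        h.put("limit", limit);
        return Collections.unmodifiableMap(h);
    }

    /** Increments the counter in place; the header map itself is never modified. */
    public static void recordFailure(Map<String, Object> headers) {
        ((AtomicInteger) headers.get("retries")).getAndIncrement();
    }

    /** Same decision as the question's RetryRouter: retry while attempts < limit. */
    public static String route(Map<String, Object> headers) {
        int attempts = ((AtomicInteger) headers.get("retries")).get();
        int limit = (Integer) headers.get("limit");
        return attempts < limit ? "retry-channel" : "exhausted-channel";
    }

    public static void main(String[] args) {
        Map<String, Object> h = headers(0, 2);
        recordFailure(h);
        System.out.println(route(h)); // prints "retry-channel" (1 < 2)
        recordFailure(h);
        System.out.println(route(h)); // prints "exhausted-channel" (2 is not < 2)
    }
}
```

The header entry always points at the same AtomicInteger, so every later step that reads the "retries" header sees the incremented value without the message being rebuilt.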
UPDATE
Thank you for sharing the config: now I see the issue and your misunderstanding. The solution is to start the main flow with a PublishSubscribeChannel and do the increment in a sub-flow subscribed to it.
With the PublishSubscribeChannel, the .subscribe() in the sub-flow is its first subscriber, and .handle( logMessage ( "failed" ) ) in the main flow is the second subscriber. The second one won't be called until the first subscriber has finished its work, so the counter is already incremented by then.
See the Spring Integration Reference Manual and Java DSL Manual for more info.
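A sketch of what such a flow could look like is shown below. It assumes a Spring Integration Java DSL version that still provides the IntegrationFlows factory (as the question's code does), that the "retries" header holds an AtomicInteger, and that the RetryRouter class from the question is accessible from this configuration. The class name RetryFlowSketch is made up, and the logging step is left out because logMessage(...) is not shown in the question.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

@Configuration
@EnableIntegration
public class RetryFlowSketch {

    @Bean
    public IntegrationFlow failedDownloadRouting() {
        return IntegrationFlows.from("failed")
                .publishSubscribeChannel(s -> s
                        // First subscriber: a one-way handler that increments the
                        // AtomicInteger stored in the "retries" header, in place.
                        .subscribe(f -> f.handle(m ->
                                m.getHeaders().get("retries", AtomicInteger.class).getAndIncrement())))
                // Second subscriber: the rest of the main flow. Without an executor on the
                // channel it runs after the first subscriber, so the router reads the
                // already incremented counter.
                .route(new RetryRouter()) // RetryRouter is the class from the question
                .get();
    }
}
```

Because the increment happens before routing, a message that arrives with retries = 1 and limit = 2 is routed with a count of 2, so the expectations in the test may need adjusting to match.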