Timeout waiting for connection from pool while pol

2019-08-28 03:36发布

问题:

I am working on a backend service which polls S3 bucket periodically using spring aws integration and processes the polled object from S3. Below is the implementation for it

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {

    //private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);

    @Value("${amazonProperties.bucketName}")
    private String bucketName;

    @Bean
    @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
    public MessageSource<InputStream> s3InboundStreamingMessageSource() {    
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
        messageSource.setRemoteDirectory(bucketName);   
        return messageSource;
    }

    @Bean
    public S3RemoteFileTemplate template() {
        return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
    }

    @Bean
    public PollableChannel s3FilesChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow fileReadingFlow() throws IOException {
        return IntegrationFlows
                .from(s3InboundStreamingMessageSource(),
                        e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
                .handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
                .get();
    }
}

I am getting the messages from S3 on object upload and I am able to process it using the input stream received as part of message payload. But the problem I face here is that I get 'Time out waiting for connection from pool' exception after receiving few messages

2019-01-06 02:19:06.156 ERROR 11322 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
    at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:405)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:194)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:180)
    at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:70)
    at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:153)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:155)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:236)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)

I know that the issue is related to not closing the opened S3Object like stated here https://github.com/aws/aws-sdk-java/issues/1405 so I have implemented closing the input stream of the S3Object received as part of message payload. But that does not solve the issue and I keep getting the exceptions. Can someone help me to fix this issue ?

回答1:

Your problem that you still mix Messaging Annotations declarations with Java DSL in your configuration.

Looks like in the fileReadingFlow you close those InputStreams in your code processS3Object() method, but you do nothing with InputStreams produced by the @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")). Why do you have it in fist place at all? What makes you to keep that code if you don't use it?

This S3StreamingMessageSource is polled all the time twice: by the @InboundChannelAdapter and IntegrationFlows.from().

You just have to remove that @InboundChannelAdapter from the S3StreamingMessageSource bean definition and that's all.

Please, read more Reference Manual to determine the reason of such an annotation and how you don't need it when you use Java DSL:

https://docs.spring.io/spring-integration/reference/html/configuration.html#_using_the_literal_inboundchanneladapter_literal_annotation

https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-inbound-adapters