spring-integration: Error handling on inbound adap

2019-08-07 00:33发布

Basics:

  • Using spring 4.1.1 with integration, boot and 1.0.0 of the DSL.
  • Multiple inbound SFTP adapters on different schedules fetching files from different vendors.
  • Each integration flow attaches headers to the message after the files are downloaded to identify the vendor source.
  • Use a MessagePublishingErrorHandler to handle exceptions.
  • Standard message flow notify an external monitoring solution when a message is successfully processed or when a message fails to complete. Uses the message headers to identify which flow failed.

Both the success and error flows work great, after we have a message. However, when we encounter an error that occurs before a message is produced (i.e. could not connect to sftp server), we don't have a chance to attach the headers.

I've been looking at the samples and documentation, and I cannot find a good way to attach the same headers from the error message produced by the MessagePublishingErrorHandler without overriding the error handler on the Poller, or creating a custom error flow for every inbound integration. What options do I have here?

This is an example of one of the configs:

        IntegrationFlows
        .from(
            Sftp.inboundAdapter(sessionFactory)
                    .autoCreateLocalDirectory(true)
                    .localDirectory(configProperties.getLocalDirectory())
                    .preserveTimestamp(true)
                    .remoteDirectory(configProperties.getRemoteDirectory()),
            c -> c.poller(Pollers
                    .fixedRate(properties.getSftpInterval(), properties.getSftpIntervalUnit())
                    .errorHandler(errorHandler)) // custom error handler?
        )
        .enrichHeaders(
                    MapBuilder
                            .with("vendorName", "vendor1")
                            .get()
        )
        .channel("fileChannelHandler")
        .get();

1条回答
淡お忘
2楼-- · 2019-08-07 01:17

Yeah - the problem is that, with polled channel adapters, if an exception is thrown in the MessageSource.receive(), there's not yet any message to enhance with custom headers.

The framework currently doesn't allow modifying the headers of the ErrorMessage itself (which has the raw exception as its payload for exceptions like this).

One work-around (aside from your custom EH) would be to put a header enricher on each error-flow to identify the source adapter.

In future, the framework could add a header to the ErrorMessage, containing the id of the channel adapter so the source can be identified in a common error flow.

Feel free to open an 'Improvement' JIRA Issue and we'll take a look.

查看更多
登录 后发表回答