I am working on a simple Kafka-based project using Spring Integration. We require that when the broker is down, messages are routed to the errorChannel so that we can deal with them (save them as 'dead letters', etc.).
What we are getting instead is an endless stream of exceptions:
2017-09-19 17:14:19.651 DEBUG 12171 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Connection with localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_131]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_131]
But the error channel is never invoked :-/
I have tried to hook it up, but to no avail. Here is part of my app context:
<bean id="channelExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1"/>
    <property name="maxPoolSize" value="10"/>
    <property name="queueCapacity" value="1000"/>
</bean>
<int:channel id="producingChannel">
    <int:dispatcher task-executor="channelExecutor"/>
</int:channel>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
        kafka-template="kafkaTemplate"
        auto-startup="true"
        channel="producingChannel"
        topic="${kafka.topic}">
</int-kafka:outbound-channel-adapter>
<int:service-activator input-channel="errorChannel" ref="errorLogger" method="logError"/>
<bean id="errorLogger" class="uk.co.sainsburys.integration.service.ErrorLogger"/>
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerConfigs"/> <!-- producerConfigs piece is NOT included! -->
</bean>
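For context, here is a minimal sketch of the kind of handler the errorLogger bean is meant to be. The real ErrorLogger class is not shown here, so everything in it is an assumption for illustration: the `logError(Throwable)` signature (which relies on the service activator passing the error message's payload as the argument), the in-memory list standing in for a real dead-letter store, and the `getDeadLetters()` accessor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch only: the actual ErrorLogger is not part of the question.
class ErrorLogger {

    // In-memory stand-in for a dead-letter store (assumption for illustration).
    private final List<String> deadLetters = new ArrayList<>();

    // Intended to be called by the service activator on errorChannel.
    // Assumes the payload of the error message (a Throwable) is the argument.
    public synchronized void logError(Throwable error) {
        Throwable root = error;
        // Walk down to the root cause, e.g. a java.net.ConnectException.
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        String entry = root.getClass().getName() + ": " + root.getMessage();
        deadLetters.add(entry);
        System.err.println("Dead letter recorded: " + entry);
    }

    // Returns a read-only snapshot of what has been recorded so far.
    public synchronized List<String> getDeadLetters() {
        return Collections.unmodifiableList(new ArrayList<>(deadLetters));
    }
}
```

With the broker down, the expectation is that `logError` is called once per failed send, but at the moment it is never called at all.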
Sadly, I am not an expert in Spring Integration. Any ideas what I am doing wrong?
Thanks for your help.