I am getting exception while consuming the messages from kafka.
org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
I have one consumer in the application context with one outbound adapter.
Consumer configuration in application context
<int-kafka:consumer-context id="consumerContext" consumer-timeout="4000" zookeeper-connect="zookeeperConnect">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration group-id="GR1"
value-decoder="valueDecoder" key-decoder="valueDecoder" max-messages="1000">
<int-kafka:topic-filter pattern="SOME_TOPIC" streams="13"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
and one outbound channel adapter
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="kafka" group-id="GR1">
<int:poller fixed-delay="100" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
I am deploying my application on glassfish 3 server
When consumer starts to work i see following exception in application logs.
2015-01-20 12:51:48.218 UTC || ERROR || [task-scheduler-3 ] || [ErrorHandler] - Error processing message Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:110) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.support.ConsumerConfiguration.receive(ConsumerConfiguration.java:86) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.support.KafkaConsumerContext.receive(KafkaConsumerContext.java:56) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.inbound.KafkaHighLevelConsumerMessageSource.receive(KafkaHighLevelConsumerMessageSource.java:41) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:192) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298) ~[spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52) [spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) [spring-core-4.1.1.RELEASE.jar:4.1.1.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49) [spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292) [spring-integration-core-4.0.4.RELEASE.jar:?]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-4.1.1.RELEASE.jar:4.1.1.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) [spring-context-4.1.1.RELEASE.jar:4.1.1.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [?:1.7.0_71]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [?:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) [?:1.7.0_71]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) [?:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [?:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [?:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [?:1.7.0_71] Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.7.0_71]
at java.util.concurrent.FutureTask.get(FutureTask.java:188) ~[?:1.7.0_71]
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:97) ~[spring-integration-kafka-1.0.0.M1.jar:?]
... 22 more Caused by: java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.utils.IteratorTemplate.next(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at kafka.consumer.ConsumerIterator.next(Unknown Source) ~[kafka_2.10-0.8.0.jar:0.8.0]
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:67) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:61) ~[spring-integration-kafka-1.0.0.M1.jar:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) ~[?:1.7.0_71]
... 3 more 2015-01-20 12:51:48.218 UTC || ERROR || [task-scheduler-3 ] || [LoggingHandler] - org.springframework.messaging.MessagingException: Consuming from Kafka failed; nested exception is java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:110)
at org.springframework.integration.kafka.support.ConsumerConfiguration.receive(ConsumerConfiguration.java:86)
at org.springframework.integration.kafka.support.KafkaConsumerContext.receive(KafkaConsumerContext.java:56)
at org.springframework.integration.kafka.inbound.KafkaHighLevelConsumerMessageSource.receive(KafkaHighLevelConsumerMessageSource.java:41)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:124)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:192)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:292)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Iterator is in failed state
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at org.springframework.integration.kafka.support.ConsumerConfiguration.executeTasks(ConsumerConfiguration.java:97)
... 22 more Caused by: java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(Unknown Source)
at kafka.utils.IteratorTemplate.next(Unknown Source)
at kafka.consumer.ConsumerIterator.next(Unknown Source)
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:67)
at org.springframework.integration.kafka.support.ConsumerConfiguration$1.call(ConsumerConfiguration.java:61)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
... 3 more
I am using follwoing version of kafka and zookeeper.
kafka_2.9.2-0.8.1.1
zookeeper-3.4.6
http://www.springframework.org/schema/integration/kafka/spring- integration-kafka-1.0.xsd
glassfish3
It was working fine but suddenly it started failing consistently.
Other consumer in other Spring XD application reading from the same kafka server and zookeper are working fine.
Group of other consumer running in other application is completely different.