We have microservices that produce and consume messages from Kafka using Spring Boot and Spring Cloud Stream.
Versions:
- spring-boot: 1.5.8.RELEASE
- spring-cloud-stream: Ditmars.RELEASE
- Kafka server: kafka_2.11-1.0.0
EDIT: We are working in a Kubernetes environment using StatefulSets: a cluster of 3 Kafka nodes and a cluster of 3 ZooKeeper nodes.
We have experienced several occurrences of old messages being reprocessed, even though those messages were already processed a few days earlier.
Several notes:
- Before this happens, the following logs are printed (there are more similar lines; this is just a summary):
Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320
- The above-mentioned revoking and reassigning of partitions happens every few hours, but only in a few of those incidents are old messages re-consumed. In most cases the reassignment doesn't trigger any message consumption.
- The reprocessed messages come from different partitions.
- There is more than one message per partition being reprocessed.
application.yml:
```yaml
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          zkNodes: zookeeper
          defaultZkPort: 2181
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT # TODO: This is a workaround. Should be security.protocol
        bindings:
          user-enrollment-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
          user-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
          enrollment-mail-output:
            producer:
              sync: true
              configuration:
                retries: 10000
          enroll-users-output:
            producer:
              sync: true
              configuration:
                retries: 10000
      default:
        binder: kafka
        contentType: application/json
        group: enrollment-service
        consumer:
          maxAttempts: 1
        producer:
          partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
      bindings:
        user-enrollment-input:
          destination: enroll-users
          consumer:
            concurrency: 10
            partitioned: true
        user-input:
          destination: user
          consumer:
            concurrency: 5
            partitioned: true
        enrollment-mail-output:
          destination: send-enrollment-mail
          producer:
            partitionCount: 10
        enroll-users-output:
          destination: enroll-users
          producer:
            partitionCount: 10
```
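For what it's worth, to narrow the problem down we have been checking the committed offsets of the consumer group around the time of a rebalance, to see whether they jump backwards (or disappear, which would make the consumer fall back to its `auto.offset.reset` behavior). A sketch of the commands, assuming the broker is reachable at `kafka:9092` from where they are run:

```shell
# Describe the group's committed offset, log-end offset, and lag per partition.
# A sudden large lag (or a missing offset) right after a rebalance would point
# at offsets being lost or reset rather than at the application re-sending.
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group enrollment-service
```

Comparing the output before and after one of the "Revoking previously assigned partitions" incidents is what we plan to try next.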
Is there any configuration that I might be missing? What can cause this behavior?