Kafka messages are reprocessed

Posted on 2019-08-30 23:52

Question:

We have a microservice that produces and consumes messages from Kafka using spring-boot and spring-cloud-stream.
Versions:
spring-boot: 1.5.8.RELEASE
spring-cloud-stream: Ditmars.RELEASE
Kafka server: kafka_2.11-1.0.0

EDIT: We are running in a Kubernetes environment, with a StatefulSet cluster of 3 Kafka nodes and a cluster of 3 ZooKeeper nodes.

We have seen several occurrences of old messages being reprocessed even though they had already been processed a few days earlier.
Several notes:

  1. Before the reprocessing happened, the following logs were printed (there are more similar lines; this is only a summary):

Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320

  2. These revocations and reassignments of partitions happen every few hours, but old messages are re-consumed in only a few of those incidents. In most cases the reassignment doesn't trigger any message consumption.
  3. The reprocessed messages come from different partitions.
  4. More than one message per partition is reprocessed.

application.yml:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-enrollment-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            enrollment-mail-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
            enroll-users-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
        bindings:
          user-enrollment-input:
            destination: enroll-users
            consumer:
              concurrency: 10
              partitioned: true
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true
          enrollment-mail-output:
            destination: send-enrollment-mail
            producer:
              partitionCount: 10
          enroll-users-output:
            destination: enroll-users
            producer:
              partitionCount: 10
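Every consumer binding above inherits group: enrollment-service from spring.cloud.stream.default, and none of them sets the Kafka binder's startOffset consumer property. According to the binder documentation, startOffset is consulted only when the group has no committed offset for a partition. In that case it defaults to earliest for a consumer with an explicit group and to latest for an anonymous consumer. The Java sketch below is a simplified model of that documented rule, not the binder's actual code. The class and method names are hypothetical.

```java
/**
 * Hypothetical, simplified model of the Kafka binder's documented default for
 * startOffset. It only matters when the group has NO committed offset for a
 * partition (new group, or committed offset no longer available).
 */
public class StartOffsetModel {

    /**
     * @param group       consumer group of the binding (null/empty = anonymous)
     * @param startOffset explicit startOffset from the binding, or null if unset
     * @return "earliest" or "latest"
     */
    public static String effectiveStartOffset(String group, String startOffset) {
        if (startOffset != null) {
            // Explicitly configured, e.g. via
            // spring.cloud.stream.kafka.bindings.<binding>.consumer.startOffset
            return startOffset;
        }
        boolean anonymous = group == null || group.isEmpty();
        // Documented default: grouped consumers start from the beginning of the log,
        // anonymous consumers only see new messages.
        return anonymous ? "latest" : "earliest";
    }

    public static void main(String[] args) {
        // The configuration in the question: group set, no startOffset.
        System.out.println(effectiveStartOffset("enrollment-service", null)); // earliest
        System.out.println(effectiveStartOffset(null, null));                 // latest
        System.out.println(effectiveStartOffset("enrollment-service", "latest")); // latest
    }
}
```

Under this rule, if the committed offset for a partition ever disappears, the group re-reads that partition from the oldest message still retained. Setting startOffset: latest on a binding would avoid that replay. However, it could also skip messages that were never processed, so it is a trade-off rather than a fix.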

Is there any configuration that I might be missing? What can cause this behavior?

Answer 1:

So the actual problem turned out to be the one described in the following ticket: https://issues.apache.org/jira/browse/KAFKA-3806. Applying the workaround suggested there fixed it.
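KAFKA-3806 is about the interaction of two broker defaults in Kafka 1.0. Committed consumer offsets are deleted after offsets.retention.minutes (default 1440, one day), while log data is kept for log.retention.hours (default 168, seven days). Suppose a partition receives no new messages for more than a day, and so typically gets no new commit. Its committed offset then expires. A running consumer does not notice, because it tracks its position in memory. After the next rebalance, however, it finds no committed offset and falls back to auto.offset.reset, which is earliest for a grouped consumer under the Kafka binder's defaults. It then re-reads everything still in the log. This would match the symptoms above: reprocessing after only some rebalances, across several partitions, with several messages each. The workaround discussed in the ticket is to set offsets.retention.minutes on the brokers higher than the log retention.

The Java sketch below is a hypothetical, simplified model of that decision, not Kafka code.

```java
/**
 * Hypothetical, simplified model (not Kafka source code) of the KAFKA-3806
 * scenario: where a grouped consumer resumes a partition after a rebalance.
 */
public class OffsetExpiryModel {

    /** Kafka 1.0 broker default for offsets.retention.minutes (one day). */
    public static final long DEFAULT_OFFSETS_RETENTION_MINUTES = 1440;

    /** Kafka 1.0 broker default for log.retention.hours (168 h), in minutes. */
    public static final long DEFAULT_LOG_RETENTION_MINUTES = 168 * 60;

    /**
     * @param committedOffset        last offset the group committed for the partition
     * @param minutesSinceLastCommit time between that commit and the rebalance
     * @param retentionMinutes       broker setting offsets.retention.minutes
     * @param logStartOffset         oldest offset still retained in the partition
     * @return offset the consumer resumes from, assuming auto.offset.reset=earliest
     */
    public static long resumePosition(long committedOffset, long minutesSinceLastCommit,
                                      long retentionMinutes, long logStartOffset) {
        // Simplification: the broker removes expired offsets in a periodic cleanup
        // task, so the real cutoff can be slightly later than this comparison.
        boolean expired = minutesSinceLastCommit > retentionMinutes;
        // With the committed offset gone, auto.offset.reset=earliest sends the
        // consumer back to the oldest retained message, i.e. a replay.
        return expired ? logStartOffset : committedOffset;
    }

    public static void main(String[] args) {
        long committed = 42;
        long logStart = 0;
        long quietMinutes = 2 * 24 * 60; // partition idle for two days before a rebalance

        // Default retention: offset expired, consumer replays from offset 0.
        System.out.println(resumePosition(committed, quietMinutes,
                DEFAULT_OFFSETS_RETENTION_MINUTES, logStart)); // 0

        // Hypothetical raised retention, longer than the log retention (the workaround).
        long raisedRetention = 2 * DEFAULT_LOG_RETENTION_MINUTES;
        System.out.println(resumePosition(committed, quietMinutes,
                raisedRetention, logStart)); // 42
    }
}
```

The rationale in the ticket is that committed offsets should live at least as long as the data they point into. Kafka 2.0 later raised the default offsets.retention.minutes to 10080 (seven days).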