Kafka consumer losing state of messages after shutdown

Published 2019-09-05 14:52

Question:

Thanks for taking the time to answer the question. I am using Kafka with a Python consumer. Everything works well while the consumer is up and running: messages pushed to Kafka are read by the consumer.

However, if the consumer goes down for whatever reason, then when it comes back up it reads only the NEW messages posted to Kafka after the restart. Messages published between shutdown and restart are lost: the consumer never reads them once it is back up.

consumer = KafkaConsumer(..)

is what I use to create the consumer.
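A minimal sketch of how the `KafkaConsumer(..)` call above could be configured to resume after a restart, assuming kafka-python's `KafkaConsumer` (1.x or later). In kafka-python, a consumer created without a `group_id` does not commit offsets, and `auto_offset_reset` defaults to `'latest'`, so a missing `group_id` is one plausible cause of the behaviour described. The broker address, topic and group names, and the helpers `consumer_settings` and `run`, are hypothetical placeholders.

```python
# Minimal sketch, assuming kafka-python >= 1.x (KafkaConsumer API).
# Broker address, topic and group names are hypothetical placeholders.


def consumer_settings(group_id, from_beginning_if_new=True):
    """Build keyword arguments for kafka.KafkaConsumer.

    group_id: without one, kafka-python does not commit offsets, so a
    restarted consumer has no stored position to resume from.
    auto_offset_reset: used when the group has no committed offset
    (e.g. the very first start); 'earliest' reads the backlog,
    'latest' (the default) skips it.
    """
    return {
        "bootstrap_servers": "localhost:9092",  # hypothetical broker
        "group_id": group_id,
        "enable_auto_commit": True,  # periodically commit consumed offsets
        "auto_commit_interval_ms": 5000,
        "auto_offset_reset": "earliest" if from_beginning_if_new else "latest",
    }


def run(topic="my-topic", group_id="my-consumer-group"):
    # Imported here so consumer_settings() works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, **consumer_settings(group_id))
    try:
        # Blocks and yields records as they arrive (stop with Ctrl+C).
        for message in consumer:
            print(message.offset, message.value)
    finally:
        # With auto-commit enabled, close() attempts a final offset commit by default.
        consumer.close()
```

As long as the same `group_id` is reused, a restarted consumer resumes from the group's last committed offset; `auto_offset_reset` only decides what happens when no such offset exists yet.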

Answer 1:

What client are you using? You may need to set the starting offset for the consumer; have a look at the seek() function and the auto-commit setting. My code may help, although we may be using different consumer classes (mine: http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html):

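# Legacy kafka-python (0.9.x) API; SimpleConsumer was removed in kafka-python 2.0.
from kafka import KafkaClient, SimpleConsumer

def connect(self):
    '''Initialize Kafka Client and Consumer.'''
    try:
        print("Try to init KafkaClient:", self.Brokers)
        self.__kafka_client = KafkaClient(self.Brokers)

        print("Try to init Kafka Consumer.")
        # auto_commit stores this group's offsets in Kafka so a restart can resume from them.
        self.__consumer = SimpleConsumer(
            self.__kafka_client,
            self.GroupID,
            self.Topic,
            auto_commit=True,
            partitions=self.Partitions,
            auto_commit_every_n=100,
            auto_commit_every_t=5000,
            fetch_size_bytes=4096,
            buffer_size=4096,
            max_buffer_size=32768,
            iter_timeout=None,
            auto_offset_reset='largest')

        print("Set the starting offset.")
        # self.OffsetMode is the seek "whence" value: 0, 1 or 2 (see below).
        self.__consumer.seek(0, self.OffsetMode)
    except Exception as exc:
        print("Failed to connect to Kafka:", exc)
        raise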


self.__consumer.seek(0, 0) => start reading from the beginning of the queue (the earliest available offset).
self.__consumer.seek(0, 1) => start reading from the current offset.
self.__consumer.seek(0, 2) => skip all pending messages and read only new ones (**maybe your case**).
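To make the three `seek()` modes concrete, and to show why a restart can skip messages, here is a small pure-Python model. It is an illustration only, not Kafka client code: it assumes per-partition integer offsets, and `seek_target` and `restart_position` are hypothetical helpers. The fallback rule in `restart_position` follows the usual Kafka behaviour for group consumers (committed offset first, `auto_offset_reset` otherwise); the legacy SimpleConsumer may treat a missing commit differently.

```python
# Illustrative model only, not kafka-python code: it mimics the documented
# "whence" semantics of the legacy SimpleConsumer.seek(offset, whence) for one
# partition, and the usual rule for where a restarted group consumer starts.
from typing import Optional


def seek_target(offset: int, whence: Optional[int], current: int,
                earliest: int, latest: int) -> int:
    """Return the offset a seek(offset, whence) call would move to."""
    if whence is None:   # absolute offset
        return offset
    if whence == 0:      # relative to the earliest available message (head)
        return earliest + offset
    if whence == 1:      # relative to the current position
        return current + offset
    if whence == 2:      # relative to the latest known offset (tail): only new messages
        return latest + offset
    raise ValueError("whence must be None, 0, 1 or 2")


def restart_position(committed: Optional[int], earliest: int, latest: int,
                     reset: str) -> int:
    """Resume from the committed offset; fall back to the reset policy if none exists."""
    if committed is not None:
        return committed
    if reset in ("earliest", "smallest"):
        return earliest
    return latest  # "latest" / "largest": skip everything already in the log
```

For example, with a log holding offsets 10 to 49 (next offset 50), `restart_position(None, 10, 50, "largest")` returns 50, so every message already in the log is skipped, while `restart_position(42, 10, 50, "largest")` resumes at 42.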