Messages lost between Apache Kafka consumer stop and start

Posted 2019-08-17 02:51

I am new to Kafka and I am using the Apache Kafka consumer to read messages from a producer. But when I stop the consumer and start it again some time later, all the messages produced in between are lost. How do I handle this scenario? I am using the properties "auto.offset.reset" = "latest" and "enable.auto.commit" = "false".

This is the code I am using. Any help is appreciated.

Properties props = new Properties();
        props.put("bootstrap.servers", localhost:9092);
        props.put("group.id", "service");
        props.put("enable.auto.commit", "false"); 
        props.put("auto.offset.reset", "latest"); 
        props.put("key.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);
        props.put("value.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);

        @SuppressWarnings("resource")
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topicname));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {        
                JSONObject jsonObj = new JSONObject(record.value());
                JdbcUtilToUdm.insertdataintodb(args, jsonObj);   
            }
        }   

1 Answer
Emotional °昔
Answered 2019-08-17 03:31

You have to call consumer.commitSync() or consumer.commitAsync() explicitly, since you have disabled auto commit. You can commit synchronously or asynchronously depending on which you need or prefer; either way, this is how the consumer group's position in the log gets persisted. You should commit after the records have been processed (in your case, probably after you have finished all the inserts but before you poll again).
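For example, here is a minimal sketch of your poll loop with an explicit commit added, reusing the JSON parsing and the JdbcUtilToUdm.insertdataintodb call from your snippet (those identifiers come from the question, not from the Kafka client API):

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                JSONObject jsonObj = new JSONObject(record.value());
                JdbcUtilToUdm.insertdataintodb(args, jsonObj);
            }
            // Commit only after the whole batch has been written to the database,
            // so a restarted consumer resumes from the last committed offset
            // instead of jumping to "latest".
            if (!records.isEmpty()) {
                consumer.commitSync();
            }
        }

With a committed offset in place, "auto.offset.reset" only applies when the group has no offset at all (for example, on the very first run), so messages produced while the consumer is down are picked up on the next start.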
