我是新来卡夫卡和使用Apache卡夫卡消费者阅读生产的消息。 但是,当我停止并启动一定的时间。 所有产生的消息之间都将丢失。 如何处理这种情况。 我使用这些属性“auto.offset.reset”,“最新”和“enable.auto.commit”,“假”。
这是我using.Any帮助表示赞赏代码。
Properties props = new Properties();
props.put("bootstrap.servers", localhost:9092);
props.put("group.id", "service");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);
props.put("value.deserializer", KAFKA_DESERIALIER_STRING_KEYVALUE);
@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicname));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
JSONObject jsonObj = new JSONObject(record.value());
JdbcUtilToUdm.insertdataintodb(args, jsonObj);
}
}