Question:
I am using a Kafka producer-consumer model in my framework. Records consumed at the consumer end are then indexed into Elasticsearch. I have a use case where, if ES is down, I have to pause the Kafka consumer until ES is back up. Once it is up, I need to resume the consumer and continue consuming from where I left off. I don't think this can be achieved with @KafkaListener. Can anyone suggest a solution? I figured out that I need to write my own KafkaListenerContainer for this, but I am not able to implement it correctly. Any help would be much appreciated.
Answer 1:
There are several possible solutions. One simple way is to use the KafkaConsumer API directly. A KafkaConsumer keeps track of its position on each partition, which is the offset of the next record that poll(...) will return. Your problem is that after you get a record from Kafka, you may be unable to insert it into Elasticsearch. In that case you have to reset the position of the consumer so that the failed record is delivered again, i.e. seek to that record's offset: consumer.seek(partition, record.offset()). Note that consumer.seek(partition, consumer.position(partition)-1) is equivalent only when the failed record was the last one returned by poll (for example with max.poll.records=1). At this point a good approach is to pause the partition, so that poll(...) returns no records for it while you keep calling poll(...) to remain in the consumer group, and then check whether ES is available (by whatever mechanism you prefer). Once ES is available, call resume on the consumer and continue with your usual poll-insert cycle.
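The control flow described above can be sketched roughly as follows. This is not real Kafka client code: PartitionLog is a hypothetical in-memory stand-in for a single partition, written only so the loop can run without a broker. With the real client, the corresponding calls would be KafkaConsumer.poll, seek, pause and resume; the indexer and esIsUp callbacks are likewise assumptions standing in for your ES client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Predicate;

// Sketch only: simulates seek-back plus pause/resume on one partition.
final class PauseResumeSketch {

    // Hypothetical stand-in for one partition as seen by one consumer.
    static final class PartitionLog {
        private final List<String> records;
        private long position = 0;       // offset of the next record poll() returns
        private boolean paused = false;

        PartitionLog(List<String> records) { this.records = records; }

        // Returns up to max offsets starting at the position; nothing while paused.
        List<Long> poll(int max) {
            List<Long> offsets = new ArrayList<>();
            while (!paused && offsets.size() < max && position < records.size()) {
                offsets.add(position++);
            }
            return offsets;
        }
        void seek(long offset) { position = offset; }
        void pause() { paused = true; }
        void resume() { paused = false; }
        String value(long offset) { return records.get((int) offset); }
        boolean atEnd() { return position >= records.size(); }
    }

    // indexer returns false when the write to ES fails; esIsUp is the health probe.
    static List<String> run(PartitionLog log, Predicate<String> indexer, BooleanSupplier esIsUp) {
        List<String> indexed = new ArrayList<>();
        while (!log.atEnd()) {
            for (long offset : log.poll(3)) {
                String value = log.value(offset);
                if (indexer.test(value)) {
                    indexed.add(value);
                    continue;
                }
                log.seek(offset);        // rewind to the failed record, not position - 1
                log.pause();
                while (!esIsUp.getAsBoolean()) {
                    log.poll(3);         // returns nothing while paused
                }
                log.resume();
                break;                   // drop the rest of the batch; it is polled again
            }
        }
        return indexed;
    }
}
```

The important detail is that the rewind targets the offset of the record that failed, so the remaining records of the same batch are fetched again after resume instead of being skipped.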
EDITED AFTER DISCUSSION
Create a Spring bean with the lifecycle methods specified. In the initialization method of the bean, instantiate your KafkaConsumer (retrieve the consumer configuration from any source). From that method, start a thread that interacts with the consumer and updates ES; the rest of the design is as described above. This is a single-threaded model. For higher throughput, consider keeping the data retrieved from Kafka in a small in-memory queue, with a dispatcher thread that takes each message and hands it to a pooled thread for updating ES.
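A minimal sketch of the queue-plus-dispatcher variant, using only the JDK. The class name, the start/stop method names and the indexer callback are assumptions for illustration; in a Spring bean, start and stop would be wired as the init and destroy lifecycle methods, and the consumer thread would call submit for each polled record.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch only: small bounded buffer, one dispatcher thread, a fixed worker pool.
final class BufferedIndexerSketch {
    private final BlockingQueue<String> queue;
    private final ExecutorService workers;
    private final Consumer<String> indexer;   // hypothetical "write to ES" callback
    private final Thread dispatcher;
    private volatile boolean running = true;

    BufferedIndexerSketch(int capacity, int workerCount, Consumer<String> indexer) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.workers = Executors.newFixedThreadPool(workerCount);
        this.indexer = indexer;
        this.dispatcher = new Thread(this::dispatchLoop, "es-dispatcher");
    }

    // Intended as the bean's initialization method.
    void start() { dispatcher.start(); }

    // Called from the consumer thread; blocks while the buffer is full.
    boolean submit(String record) {
        try {
            queue.put(record);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Hands each buffered record to a pooled worker until stopped and drained.
    private void dispatchLoop() {
        try {
            while (running || !queue.isEmpty()) {
                String record = queue.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    workers.execute(() -> indexer.accept(record));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Intended as the bean's destroy method: drain the buffer, then stop the pool.
    void stop() {
        running = false;
        try {
            dispatcher.join();
            workers.shutdown();
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The bounded queue gives backpressure: when ES is slow, submit blocks and the consumer thread stops pulling more data. One trade-off to keep in mind is that pooled workers can finish out of order, so tracking which offset is safe to rewind to becomes harder than in the single-threaded loop.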
Answer 2:
Rather than pausing the consumer, I would suggest retrying the same message until it succeeds, and committing the offset only once the message has been processed successfully.
For example:
Annotate your listener method with @Retryable.
Wrap the method body in try/catch and throw a new exception from the catch block, so that the failure triggers a retry.
In the listener container factory configuration, set these properties:
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setAckOnError(false);
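The retry-then-acknowledge behaviour this configuration aims for can be illustrated with a plain-Java sketch. It does not use Spring: the Ack interface is a hypothetical stand-in for the acknowledgment handle a manually acknowledged listener receives, and handle, maxAttempts and backoffMillis are names made up for this example.

```java
import java.util.function.Predicate;

// Sketch only: retry the same record, acknowledge it only after a successful write.
final class RetryThenAckSketch {

    // Hypothetical stand-in for a manual acknowledgment handle.
    interface Ack { void acknowledge(); }

    // Returns the attempt number that succeeded, or -1 if every attempt failed
    // (in which case the record is never acknowledged and will be redelivered).
    static int handle(String record, Predicate<String> indexer, Ack ack,
                      int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (indexer.test(record)) {
                ack.acknowledge();       // commit only after ES accepted the record
                return attempt;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis);   // wait before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }
}
```

Keep in mind that retrying inside the listener blocks the consumer thread. If the total retry time exceeds max.poll.interval.ms, the consumer is removed from the group and its partitions are rebalanced, so a long ES outage is probably better handled by pausing than by retrying.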