I'm looking for a way to delete (completely remove) consumed records from a Kafka topic. I know there are several ways of doing this, for example by changing the retention time for the topic or removing the Kafka-logs folder. But what I'm looking for is a way to delete a certain number of records from a topic using the Java API, if that is possible.
I've tried the AdminClient API, specifically the adminclient.deleteRecords(recordsToDelete) method. But if I'm not mistaken, that method only changes the offsets in the topic; it doesn't actually delete the records from the hard drive.
Is there a Java API that does actually remove the records from the hard drive?
No, Kafka does not provide a feature for deleting records at a specific offset in a topic, and no API is available for this.
Kafka doesn't support removing records from topics. The way it works is by building a buffer of messages that grows as messages are pushed to it, while each client that reads the messages basically only holds an offset into that buffer. So clients in Kafka are essentially in "read-only" mode and can't alter the buffer. Think about the case where several different clients (different consumer groups) read the same topic and each saves its own offset: what would happen if someone started deleting messages from the buffer at positions those offsets still point to?
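To make the "read-only" point concrete, here is a minimal sketch of a consuming client, assuming a local broker, a topic named "my-topic", and a group id "group-a" (all placeholders). Running the same code with a different group.id keeps a completely independent offset; none of these clients can touch the log itself.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadOnlyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-a");                 // offsets are tracked per group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // assumed topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
            // Committing only advances this group's offset; the log itself is untouched.
            consumer.commitSync();
        }
    }
}
```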
Kafka topics are immutable, meaning you can only add new messages to them. There is no delete per se.
However, to avoid "running out of disk", Kafka provides two concepts for keeping the size of topics down: retention policy and compaction.
Retention: If you have a topic where you don't need the data around forever, you just set a retention policy for however long you need to keep the data, e.g. 72 hours. Kafka will then automatically delete messages older than 72 hours for you.
Compaction: If you DO need the data to stay around forever, or at least for a long time, but you only need the latest value per key, then you can set the topic to be compacted. Older messages for a key are then automatically removed once a newer message with the same key has been written (the cleanup runs periodically, not instantly).
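Both policies are just topic-level configs (retention.ms and cleanup.policy). As a sketch, they can also be set from Java through the AdminClient, assuming a local broker and a topic named "my-topic" (both placeholders); the two ops are shown together only for illustration, normally you'd pick one policy per topic.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicCleanupConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // assumed topic

            // Time-based retention: delete messages older than 72 hours
            AlterConfigOp retention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", String.valueOf(72L * 60 * 60 * 1000)),
                    AlterConfigOp.OpType.SET);

            // Log compaction: keep only the latest record per key
            AlterConfigOp compaction = new AlterConfigOp(
                    new ConfigEntry("cleanup.policy", "compact"),
                    AlterConfigOp.OpType.SET);

            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Arrays.asList(retention, compaction)))
                 .all()
                 .get();
        }
    }
}
```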
A central part of planning your Kafka architecture is to think through HOW your data is stored in a topic. If, for example, you push updates to a customer record to a Kafka topic, let's say that customer's last login date (very contrived example...), then you're only interested in the LAST entry (since all previous entries are no longer the "last" login). If the message key for this is the customer ID and log compaction is enabled, then once the user logs in and the Kafka topic receives this event, any previous message with the same key (customer ID) will automatically be removed from the topic.
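As a sketch of that scenario, the producer just needs to use the customer ID as the record key; the broker address, topic name, and values below are assumptions.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class LastLoginProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key = customer ID, value = last login timestamp.
            // With cleanup.policy=compact on the topic, Kafka eventually keeps only
            // the latest record for each customer ID.
            producer.send(new ProducerRecord<>("customer-logins", "customer-42", "2021-06-01T10:15:00Z"));
        }
    }
}
```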
This confused me a bit at first as well: the included bin/kafka-delete-records.sh was able to delete records, but I couldn't do it through the Java API.
The missing piece is that you need to call KafkaFuture.get(), since deleteRecords returns a map of futures.
Here's a minimal sketch of that code (the broker address, topic name, partition, and target offset are placeholder assumptions):
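```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;

public class DeleteRecordsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker

        try (AdminClient admin = AdminClient.create(props)) {
            // Delete everything in partition 0 of "my-topic" before offset 100 (placeholders)
            TopicPartition partition = new TopicPartition("my-topic", 0);
            Map<TopicPartition, RecordsToDelete> recordsToDelete =
                    Collections.singletonMap(partition, RecordsToDelete.beforeOffset(100L));

            DeleteRecordsResult result = admin.deleteRecords(recordsToDelete);

            // deleteRecords returns a map of KafkaFutures; the request only completes
            // once get() is called on each future.
            for (Map.Entry<TopicPartition, KafkaFuture<DeletedRecords>> entry :
                    result.lowWatermarks().entrySet()) {
                System.out.println(entry.getKey().topic() + " " + entry.getKey().partition()
                        + " " + entry.getValue().get().lowWatermark());
            }
        }
    }
}
```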
The key call in this code is entry.getValue().get().lowWatermark(): get() blocks until the delete request completes, and lowWatermark() returns the partition's new start offset.