How do I delete/clean Kafka queued messages withou

2020-05-17 09:07发布

问题:

Is there any way to delete queue messages without deleting Kafka topics?
I want to delete queue messages when activating the consumer.

I know there are several ways like:

  1. Resetting retention time

    $ ./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic MyTopic --config retention.ms=1000

  2. Deleting kafka files

    $ rm -rf /data/kafka-logs/<topic/Partition_name>

回答1:

In 0.11 or higher you can run the bin/kafka-delete-records.sh command to mark messages for deletion.

https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

For example, publish 100 messages

seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytest

then delete 90 of those 100 messages with the new kafka-delete-records.sh command line tool

./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offsetfile.json

where offsetfile.json contains

 {"partitions": [{"topic": "mytest", "partition": 0, "offset": 90}], "version":1 }

and then consume the messages from the beginning to verify that 90 of the 100 messages are indeed marked as deleted.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100


回答2:

To delete all messages in a specific topic, you can run kafka-delete-records.sh

For example, I have a topic called test, which has 4 partitions.

Create a Json file , for example j.json:

{

"partitions": [

    {

        "topic": "test",

        "partition": 0,

        "offset": -1

    }, {

        "topic": "test",

        "partition": 1,

        "offset": -1

    }, {

        "topic": "test",

        "partition": 2,

        "offset": -1

    }, {

        "topic": "test",

        "partition": 3,

        "offset": -1

    }

],

"version": 1

}

now delete all messages by this command :

/opt/kafka/confluent-4.1.1/bin/kafdelete-records --bootstrap-server 192.168.XX.XX:9092 --offset-json-file j.json

After executing the command, this message will be displayed

Records delete operation completed:
partition: test-0   low_watermark: 7
partition: test-1   low_watermark: 7
partition: test-2   low_watermark: 7
partition: test-3   low_watermark: 7