How do I delete a Kafka Consumer Group to reset of

Posted 2019-02-22 06:54

I want to delete a Kafka consumer group so that when the application creates a consumer and subscribes to a topic, it starts at the beginning of the topic data.

This is on a single-node development VM running Confluent Platform 3.1.2 (the latest release at the time of writing), which uses Kafka 0.10.1.1.

I try the normal syntax:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --delete --group my_consumer_group

I get the error:

Option [delete] is only valid with [zookeeper]. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.

If I try the zookeeper variant:

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --delete --group my_consumer_group

I get:

Delete for group my_consumer_group failed because group does not exist.

If I list using the "old" consumer, I do not see my consumer group (or any other consumer groups):

sudo /usr/bin/kafka-consumer-groups --zookeeper localhost:2181 --list

If I list using the "new" consumer, I can see my consumer group but apparently I can't delete it:

sudo /usr/bin/kafka-consumer-groups --new-consumer --bootstrap-server localhost:9092 --list

4 Answers
我只想做你的唯一
#2 · 2019-02-22 07:16

If you use the Java client, you can first get the beginning offset:

TopicPartition partition = new TopicPartition("YOUR_TOPIC", YOUR_PARTITION);
Map<TopicPartition, Long> map = consumer.beginningOffsets(Collections.singleton(partition));

Then get the offset from which the consumer would resume processing if the consumer group is not deleted. Note that consumer.committed(partition) returns null when the group has no committed offset for that partition, so check for null before calling offset():

Long committedOffset = consumer.committed(partition).offset();

If resuming from committedOffset is acceptable, just poll for records. If you want to start from the beginning instead, call consumer.seek(partition, map.get(partition)) before polling.

做自己的国王
#3 · 2019-02-22 07:24

In Kafka 0.11 (or Confluent 3.3) you can reset the offsets of any existing consumer group without having to delete the topic. In fact you can change the offsets to any absolute offset value or timestamp or any relative position as well.

These new functions are all added with the new --reset-offsets flag on the kafka-consumer-groups command line tool.

See KIP-122 details here https://cwiki.apache.org/confluence/display/KAFKA/KIP-122%3A+Add+Reset+Consumer+Group+Offsets+tooling
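
As a rough sketch of what this can look like in practice, the script below builds a reset-to-earliest command and previews it with --dry-run. It assumes Kafka 0.11 or later, a hypothetical topic named my_topic, and that all consumers in the group are stopped, since the tool only resets offsets for inactive groups. The group name and broker address are taken from the question; the check around the tool invocation is only there so the script is harmless on a machine without Kafka installed.

```shell
# Placeholders: adjust to your environment.
GROUP="my_consumer_group"      # group name from the question
TOPIC="my_topic"               # hypothetical topic name
BOOTSTRAP="localhost:9092"     # broker address from the question

# Build the base command as positional parameters so quoting stays intact.
set -- kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" \
       --group "$GROUP" --topic "$TOPIC" \
       --reset-offsets --to-earliest

PREVIEW="$* --dry-run"

if command -v kafka-consumer-groups >/dev/null 2>&1; then
  # Show the offsets that would be set, without changing anything.
  "$@" --dry-run
else
  echo "kafka-consumer-groups not found; would run: $PREVIEW"
fi
```

Once the preview looks right, replacing --dry-run with --execute applies the change. KIP-122 describes other targets as well, such as --to-latest, --to-offset, --to-datetime and --shift-by.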

放我归山
#4 · 2019-02-22 07:25

This can be done with Kafka 1.1.x. From the documentation:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

爷、活的狠高调
#5 · 2019-02-22 07:30

Upgrading to the just-released Confluent Platform 3.2 with Kafka 0.10.2 solved my underlying issue. When I delete a topic, its offset information is now correctly reset, so when I create a topic with the same name, consumers start from the beginning of the new data.

I still can't delete new-style consumer groups with the kafka-consumer-groups tool, but my underlying issue is solved.

Before Kafka 0.10.2, there were hacks, but no clean solution to this issue.
