Python kafka consumer group id issue

Posted 2019-06-10 04:29

AFAIK,

The concept of partitions and (consumer) groups in Kafka was introduced to implement parallelism. I am working with Kafka through Python. I have a certain topic, which has (say) 2 partitions. This means that if I start a consumer group with 2 consumers in it, each will be mapped (subscribed) to a different partition.

But, using kafka library in python, I came across a weird issue. I started 2 consumers with essentially the same group-ids, and started the threads for them to consume messages.

But every message in the Kafka stream is being consumed by both of them! This seems ridiculous to me, and even conceptually incorrect. Is there any way I can map the consumers to certain (distinct) partitions manually (if they are not mapped to different partitions automatically)?

Here is the code:

from kafka import KafkaConsumer
import threading

def con1(consumer):
    # Print every record this consumer receives.
    for msg in consumer:
        print(msg)

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

# One thread per consumer; non-daemon threads keep the process alive.
threading.Thread(target=con1, args=(consumer1,)).start()
threading.Thread(target=con1, args=(consumer2,)).start()

Here is the output for some messages that I produced using kafka-console-producer:

ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')

I expected each message to appear only once. BTW, the topic k-test has 2 partitions.

3 answers
#2 · 2019-06-10 04:42
from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "k-test"
PARTITION_0 = 0
PARTITION_1 = 1

# Create the consumers without a topic: subscribing to a topic in the
# constructor and calling assign() afterwards are mutually exclusive.
consumer_0 = KafkaConsumer(
    group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
)
consumer_1 = KafkaConsumer(
    group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
)
# format: topic, partition
topic_partition_0 = TopicPartition(TOPIC, PARTITION_0)
topic_partition_1 = TopicPartition(TOPIC, PARTITION_1)
# Pin each consumer to exactly one partition.
consumer_0.assign([topic_partition_0])
consumer_1.assign([topic_partition_1])

assign() might work for you, but once you use it, Kafka will no longer rebalance partitions automatically when a consumer stops working.

The star\"
#3 · 2019-06-10 04:50

Try running the bin/kafka-consumer-groups.sh command line tool to verify if the Python Kafka client you are using supports proper consumer group management. If both consumers are indeed in the same group then they should get messages from mutually exclusive partitions.
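To illustrate what "mutually exclusive partitions" means, here is a minimal pure-Python sketch of how a group coordinator might split partitions among members. The function name range_assign and the member names are made up for illustration; this only imitates a range-style strategy and is not the code Kafka or any Python client actually runs.

```python
def range_assign(num_partitions, members):
    """Split partition ids 0..num_partitions-1 across the group members.

    Illustrative only: imitates a range-style strategy, where each
    partition is owned by exactly one member of the group.
    """
    members = sorted(members)
    base, extra = divmod(num_partitions, len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members take one additional partition.
        count = base + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# Two partitions, two consumers: each consumer owns one partition.
print(range_assign(2, ["consumer1", "consumer2"]))
# One partition, two consumers: the second consumer stays idle.
print(range_assign(1, ["consumer1", "consumer2"]))
```

With working group management, no partition should ever appear under two members of the same group, so a record such as partition=0, offset=47 should be delivered to only one of them.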

淡お忘
#4 · 2019-06-10 04:55

I guess you are working with Kafka 0.8 or lower, which does not support this feature according to the documentation:

... Some features will only be enabled on newer brokers, however; for example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- requires use of 0.9+ kafka brokers ...
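The version requirement in that quote can be expressed as a small check. This is only a sketch: supports_coordinated_groups is a hypothetical helper, and it assumes you already know the broker version as a tuple such as (0, 8, 2), which is the same tuple form that kafka-python's api_version setting uses.

```python
def supports_coordinated_groups(broker_version):
    """Return True if the broker version tuple is 0.9 or newer.

    Hypothetical helper: per the quoted documentation, dynamic partition
    assignment within a consumer group needs 0.9+ brokers.
    """
    return tuple(broker_version) >= (0, 9)


# Example version tuples (assumed values, not read from a real broker).
for version in [(0, 8, 2), (0, 9), (0, 10, 1)]:
    print(version, supports_coordinated_groups(version))
```

If the check is False for your broker, two consumers sharing a group_id would each read every partition, which matches the duplicated output in the question.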
