AFAIK, the concept of partitions and (consumer) groups in Kafka was introduced to implement parallelism. I am working with Kafka through Python. I have a certain topic, which has (say) 2 partitions. This means that if I start a consumer group with 2 consumers in it, they will be mapped (subscribed) to different partitions.
But, using the kafka library in Python, I came across a weird issue. I started 2 consumers with essentially the same group ID, and started threads for them to consume messages.
But every message in the Kafka stream is being consumed by both of them! This seems ridiculous to me, and even conceptually incorrect. Is there any way I can map the consumers to certain (distinct) partitions manually (if they are not mapped to different partitions automatically)?
Here is the code:
from kafka import KafkaConsumer
import thread

def con1(consumer):
    for msg in consumer:
        print msg

consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))
Here is the output for some messages that I produced using kafka-console-producer:
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
whereas I expected one of each. BTW, this topic, k-test, has 2 partitions.
assign() might work for you, but once you use it, Kafka will not rebalance the consumers automatically when one of them stops working.
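A minimal sketch of that manual mapping, assuming the kafka-python client and the topic, group and broker address from the question. The helper names split_partitions and make_assigned_consumer are made up for illustration; KafkaConsumer.assign() and TopicPartition are the library pieces doing the actual work.

```python
def split_partitions(partition_ids, num_consumers):
    """Deal partitions out round-robin: consumer i gets ids i, i+n, i+2n, ..."""
    return [partition_ids[i::num_consumers] for i in range(num_consumers)]


def make_assigned_consumer(topic, partition_ids, servers, group_id='grp1'):
    """Build a consumer pinned to the given partitions (hypothetical helper)."""
    # Imported here so split_partitions() can be tried without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor: subscribing to a topic and
    # calling assign() are mutually exclusive ways of choosing partitions.
    consumer = KafkaConsumer(bootstrap_servers=servers, group_id=group_id)
    consumer.assign([TopicPartition(topic, p) for p in partition_ids])
    return consumer
```

With the 2-partition topic from the question, split_partitions([0, 1], 2) gives one partition per consumer, and each share can be passed to make_assigned_consumer('k-test', share, ['10.50.23.120:9092']). Because the partitions are fixed by hand, a consumer that dies leaves its partition unread until you reassign it yourself.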
Try running the bin/kafka-consumer-groups.sh command line tool to verify if the Python Kafka client you are using supports proper consumer group management. If both consumers are indeed in the same group then they should get messages from mutually exclusive partitions.
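The same check can be roughly approximated from inside Python. This is only a sketch, assuming kafka-python, where KafkaConsumer.poll() triggers the group join and KafkaConsumer.assignment() returns the partitions the consumer currently owns; the helper names assigned_partitions and overlap are hypothetical.

```python
def assigned_partitions(consumer, timeout_ms=5000):
    """Return the (topic, partition) pairs currently assigned to a consumer."""
    # poll() lets the consumer join its group; note that it may also fetch
    # records, so use this for debugging rather than in the consuming loop.
    consumer.poll(timeout_ms=timeout_ms)
    return sorted((tp.topic, tp.partition) for tp in consumer.assignment())


def overlap(assignment_a, assignment_b):
    """Partitions that appear in both assignments; empty if the group works."""
    return sorted(set(assignment_a) & set(assignment_b))
```

If overlap(assigned_partitions(consumer1), assigned_partitions(consumer2)) is not empty, both consumers own the same partition and group coordination is not taking effect.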
I guess you are working with Kafka 0.8 or lower, which does not support this feature according to the documentation: