I was exploring whether Kafka supports prioritizing any queue or message during processing.
It seems it doesn't support any such thing. I googled and found this mail-archive thread, which confirms this as well:
http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201206.mbox/%3CCAOeJiJhVHsr=d6aSTihPsqWVg6vK5xYLam6yMDcd6UAUoXf-DQ@mail.gmail.com%3E
Has anyone here configured Kafka to prioritize any topic or message?
Kafka is, by design, a fast, scalable, distributed, partitioned, and replicated commit log service, so there is no built-in priority for a topic or message.
I also faced the same problem that you have. The solution is very simple: create separate topics in Kafka, say:
1) high_priority_queue
2) medium_priority_queue
3) low_priority_queue
Publish high-priority messages to high_priority_queue and medium-priority messages to medium_priority_queue.
Now you can create a Kafka consumer and open streams for all the topics.
//this is Scala code, using the old high-level consumer API (Kafka 0.8.x)
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("group.id", groupId)
props.put("zookeeper.connect", zookeeperConnect)
val config = new ConsumerConfig(props)
val connector = Consumer.create(config)

// one stream per topic
val topicWithStreamCount = Map(
  "high_priority_queue" -> 1,
  "medium_priority_queue" -> 1,
  "low_priority_queue" -> 1)
val streamsMap = connector.createMessageStreams(topicWithStreamCount)
You get a stream for each topic. Read from the high_priority_queue topic first; if it has no messages, fall back to the medium_priority_queue topic; if that is also empty, read from low_priority_queue. A minimal sketch of this loop follows.
This trick is working fine for me. It may be helpful for you!
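Here is a minimal sketch of that fallback loop, assuming consumer.timeout.ms is set in the consumer properties so that the stream iterator throws ConsumerTimeoutException instead of blocking forever when a topic is empty; process is a hypothetical handler you would supply:

import kafka.consumer.ConsumerTimeoutException

val priorityOrder = List("high_priority_queue", "medium_priority_queue", "low_priority_queue")
val iterators = priorityOrder.map(t => t -> streamsMap(t).head.iterator()).toMap

def pollOnce(): Unit = {
  for (topic <- priorityOrder) {
    try {
      val msg = iterators(topic).next().message() // take one message from this topic
      process(topic, msg)                         // hypothetical handler
      return                                      // restart from the highest priority
    } catch {
      case _: ConsumerTimeoutException => // topic currently empty; try next priority
    }
  }
}

while (true) pollOnce()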
You need to have separate topics and consume them according to their priority.
You can check out priority-kafka-client for priority consumption from topics.
The basic idea is as follows (copy/pasting parts of the README):
In this context, priority is a positive integer N, with priority levels 0 < 1 < ... < N-1 (0 being the lowest).
PriorityKafkaProducer (implements org.apache.kafka.clients.producer.Producer): the implementation takes an additional priority-level argument, Future<RecordMetadata> send(int priority, ProducerRecord<K, V> record), which indicates that the record should be produced at that priority level. The plain Future<RecordMetadata> send(ProducerRecord<K, V> record) defaults record production to the lowest priority level, 0. For every logical topic XYZ, priority level 0 <= i < N is backed by the Kafka topic XYZ-i.
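For illustration, here is a hedged sketch of producing at different levels; the two send signatures are from the README, but the import path and constructor shown here are assumptions, so consult the project for the actual API:

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerRecord
import com.flipkart.priority.kafka.client.producer.PriorityKafkaProducer // assumed package path

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val maxPriority = 3 // N = 3 => levels 0, 1, 2 backed by topics XYZ-0, XYZ-1, XYZ-2
// assumed constructor; check the project README for the real one
val producer = new PriorityKafkaProducer[String, String](maxPriority, producerProps)

producer.send(2, new ProducerRecord[String, String]("XYZ", "key", "urgent")) // highest level
producer.send(new ProducerRecord[String, String]("XYZ", "key", "normal"))    // defaults to level 0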
CapacityBurstPriorityKafkaConsumer (implements org.apache.kafka.clients.consumer.Consumer): the implementation maintains a KafkaConsumer for every priority level 0 <= i < N. For every logical topic XYZ and logical group ID ABC, the priority level 0 <= i < N consumer binds to the Kafka topic XYZ-i with group ID ABC-i. This works in tandem with PriorityKafkaProducer.
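And a hedged sketch of the consuming side; since the class implements org.apache.kafka.clients.consumer.Consumer, subscribe/poll follow the standard Consumer contract, but the import path and constructor here are again assumptions:

import java.util.{Collections, Properties}
import com.flipkart.priority.kafka.client.consumer.CapacityBurstPriorityKafkaConsumer // assumed package path

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092")
consumerProps.put("group.id", "ABC") // logical group; level i internally uses ABC-i
consumerProps.put("max.poll.records", "50") // split across levels by maxPollRecordsDistributor
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

// assumed constructor (same maxPriority as the producer sketch); check the project README
val consumer = new CapacityBurstPriorityKafkaConsumer[String, String](maxPriority, consumerProps)
consumer.subscribe(Collections.singletonList("XYZ")) // logical topic; binds XYZ-0 .. XYZ-(N-1)

while (true) {
  val records = consumer.poll(100L)
  records.forEach(r => println(s"${r.topic()} -> ${r.value()}"))
}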
The max.poll.records property is split across the priority topic consumers based on maxPollRecordsDistributor, which defaults to ExpMaxPollRecordsDistributor. The rest of the KafkaConsumer configs are passed as-is to each of the priority topic consumers. Care has to be taken when defining max.partition.fetch.bytes, fetch.max.bytes, and max.poll.interval.ms, as these values will be used as-is across all the priority topic consumers.
The consumer works on the idea of distributing the max.poll.records property across the priority topic consumers as their reserved capacity. Records are fetched sequentially from all priority level topic consumers, each configured with its share of max.poll.records. The distribution must reserve higher capacity (a higher processing rate) for higher priorities; a sketch of one such split follows.
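For example, an exponential split where level i gets weight 2^i could look like this (illustrative only; the library's ExpMaxPollRecordsDistributor may use a different formula):

// Illustrative exponential split: level i gets weight 2^i, so higher
// priorities reserve exponentially more of max.poll.records.
def expDistribute(maxPollRecords: Int, n: Int): Map[Int, Int] = {
  val weights = (0 until n).map(i => math.pow(2, i))
  val total = weights.sum
  (0 until n).map { i =>
    i -> math.max(1, math.round(maxPollRecords * weights(i) / total).toInt)
  }.toMap
}

// expDistribute(50, 3) ~= Map(0 -> 7, 1 -> 14, 2 -> 29)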
Caution 1 - if we have skewed partitions in the priority level topics, e.g. 10K records in a priority 2 partition, 100 records in a priority 1 partition, and 10 records in a priority 0 partition, all assigned to different consumer threads, then the implementation does not synchronize across those consumers to regulate capacity, and hence it will fail to honour priority. So the producers must ensure there are no skewed partitions (e.g. by using round-robin partitioning; this "may" imply there are no message-ordering assumptions, and the consumer may choose to process records in parallel by separating out the fetching and processing concerns).
Caution 2 - if we have empty partitions in the priority level topics, e.g. no pending records in the assigned priority 2 and priority 1 partitions but 10K records in a priority 0 partition assigned to the same consumer thread, then we want the priority 0 topic consumer to burst its capacity up to the full max.poll.records rather than restrict itself to its reserved capacity from maxPollRecordsDistributor; otherwise the overall capacity will be under-utilized.
This implementation tries to address the cautions explained above. Every consumer object has individual priority level topic consumers, with each priority level consumer having reserved capacity based on maxPollRecordsDistributor. Each priority level topic consumer will try to burst into the other priority level topic consumers' capacity in the group, provided both of the conditions below are true:
It is eligible to burst - if, in the last max.poll.history.window.size attempts of poll(), it received its full assigned max.poll.records (as distributed by maxPollRecordsDistributor) at least min.poll.window.maxout.threshold times. This is an indication that the partition has more incoming records to be processed.
No higher priority level is eligible to burst - there is no higher priority level topic consumer that is eligible to burst based on the above logic. Basically, give way to higher priorities.
If both of the above are true, then the priority level topic consumer bursts into all the other priority level topic consumers' capacities. The amount of burst per priority level topic consumer is equal to the least unused capacity in the last max.poll.history.window.size attempts of poll(), as sketched below.
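A compact sketch of the eligibility check and burst computation described above (illustrative Scala, not the library's actual code; LevelState is a hypothetical holder of per-level poll history):

// pollHistory holds the record counts returned by the last
// max.poll.history.window.size poll() calls for one priority level.
case class LevelState(reservedMaxPollRecords: Int, pollHistory: Seq[Int])

// Maxed out: received the full reserved capacity at least `threshold`
// (min.poll.window.maxout.threshold) times within the history window.
def maxedOut(s: LevelState, threshold: Int): Boolean =
  s.pollHistory.count(_ >= s.reservedMaxPollRecords) >= threshold

// Eligible to burst only if this level is maxed out AND no higher level is.
def eligibleToBurst(levels: Map[Int, LevelState], level: Int, threshold: Int): Boolean =
  maxedOut(levels(level), threshold) &&
    levels.keys.filter(_ > level).forall(l => !maxedOut(levels(l), threshold))

// The burst granted from each other level is its least unused capacity
// over the history window.
def burstFrom(other: LevelState): Int =
  other.pollHistory.map(c => math.max(0, other.reservedMaxPollRecords - c)).min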