It seems like my kafka-node consumer:
```javascript
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var consumer = new Consumer(client, [], {
  ...
});
```
is fetching far more messages than I can handle in certain cases. Is there a way to limit it (for example, accept no more than 1000 messages per second), possibly using the `pause()` API?
- I'm using kafka-node, which seems to have a more limited API compared to the Java version
As far as I know the API does not offer any kind of throttling. But both consumers (`Consumer` and `HighLevelConsumer`) have a `pause()` function, so you could stop consuming if you get too many messages. Maybe that already offers what you need.
Please keep in mind what's happening: you send a fetch request to the broker and get a batch of messages back. You can configure the minimum and maximum size of a fetch (according to the documentation this is in bytes, not a number of messages).
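In kafka-node these limits are the `fetchMinBytes` / `fetchMaxBytes` consumer options. A sketch, in which the topic name and broker address are placeholders:

```javascript
var kafka = require('kafka-node');

var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });

// fetchMinBytes/fetchMaxBytes bound each fetch in *bytes*, not in messages.
// A smaller fetchMaxBytes means smaller batches delivered to your handler.
var consumer = new kafka.Consumer(client, [{ topic: 'my-topic' }], {
  fetchMinBytes: 1,          // broker answers as soon as 1 byte is available
  fetchMaxBytes: 64 * 1024,  // cap a single fetch at 64 KiB
  fetchMaxWaitMs: 100        // how long the broker may wait to satisfy fetchMinBytes
});
```

Note that this bounds the bytes per fetch, not messages per second, so it only indirectly limits the rate.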
In Kafka, poll and process should happen in a coordinated/synchronized way. I.e., after each poll you should first process all received data before you do the next poll. This pattern automatically throttles the number of messages to the maximum throughput your client can handle.
Something like this (pseudo-code):
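A minimal version of that loop, where `poll()` and `process()` are placeholders for the client's fetch call and your handling logic:

```javascript
// Pseudo-code for the coordinated poll/process loop.
// poll() returns the next batch of messages (an empty array when done);
// process() is the application's per-message handler.
function pollProcessLoop(poll, process) {
  var batch;
  // Process the whole batch before polling again -- this caps the
  // intake rate at whatever the processing can keep up with.
  while ((batch = poll()).length > 0) {
    batch.forEach(function (message) {
      process(message);
    });
  }
}
```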
(That is the reason why there is no parameter like `fetch.max.messages` -- you just do not need it.)
From the FAQ in the README:

Create an `async.queue` with a message processor and a concurrency of one (the message processor itself is wrapped with `setImmediate` so it will not freeze up the event loop). Set `queue.drain` to `resume()` the consumer. In the consumer's `message` handler, `pause()` the consumer and push the message to the queue.

I had a similar situation where I was consuming messages from Kafka and had to throttle the consumption because my consumer service depended on a third-party API which had its own constraints.
I used `async/queue` along with a wrapper of `async/cargo` called `asyncTimedCargo` for batching purposes. The cargo gets all the messages from the kafka-consumer and sends them to the queue upon reaching a size limit (`batch_config.batch_size`) or a timeout (`batch_config.batch_timeout`). `async/queue` provides `saturated` and `unsaturated` callbacks which you can use to stop the consumption if your queue's task workers are busy. This stops the cargo from filling up, so your app does not run out of memory; consumption resumes upon unsaturation.
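The pause/queue/resume pattern these answers describe can be sketched roughly like this, assuming kafka-node's `Consumer` and the `async` library (the topic name, broker address, and `handleMessage` are placeholders; the callback-property style is async v2 -- in async v3, `drain`/`saturated`/`unsaturated` are methods, e.g. `q.drain(fn)`):

```javascript
// Sketch: throttle a kafka-node consumer with async.queue and pause()/resume().
var kafka = require('kafka-node');
var async = require('async');

var client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });
var consumer = new kafka.Consumer(client, [{ topic: 'my-topic' }], { autoCommit: true });

// Concurrency of one: messages are processed strictly one at a time.
var q = async.queue(function (message, done) {
  // setImmediate keeps a long run of queued tasks from starving the event loop.
  setImmediate(function () {
    handleMessage(message); // placeholder for your processing logic
    done();
  });
}, 1);

// Resume fetching once all buffered messages have been processed.
q.drain = function () {
  consumer.resume();
};

consumer.on('message', function (message) {
  // Stop fetching while work is pending, and buffer the message.
  consumer.pause();
  q.push(message);
});
```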