I am streaming loads of data through Kafka, and Spark Streaming is consuming these messages. Some way down the line, Spark Streaming throws this error:
kafka.common.OffsetOutOfRangeException
Now I am aware of what this error means, so I changed the retention policy to 5 days. However, I still encountered the same issue. Then I listed all the messages for the topic using --from-beginning in Kafka. Sure enough, a ton of messages from the beginning of the Kafka stream were no longer present, and since Spark Streaming runs a little behind the Kafka producers, it tries to consume messages that Kafka has already deleted. However, I thought changing the retention policy would take care of this:
--add-config retention.ms=....
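(For context, the full kafka-configs invocation is roughly of this shape; the topic name, connection address, and the 5-day value are placeholders, and newer Kafka versions take --bootstrap-server instead of --zookeeper:)

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config retention.ms=432000000   # 5 days, in milliseconds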
What I suspect is happening is that Kafka is deleting messages from the topic to free up space for new messages (because we are streaming tons of data). Is there a property I can configure that specifies how many bytes of data Kafka can store before it deletes older messages?
Another way to solve this problem is to specify the relevant Spark parameter in the configuration:
You can set the maximum size of a topic when you create it, using the topic configuration property
retention.bytes
via the console (a sketch follows at the end of this answer), or you can use the global broker configuration property
log.retention.bytes
to set the maximum size for all topics. What is important to know is that
log.retention.bytes
doesn't enforce a hard limit on a topic's size; it just signals to Kafka when to start deleting the oldest messages.
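A sketch of what the console commands can look like (the topic name, sizes, and connection address are placeholders; newer Kafka versions take --bootstrap-server instead of --zookeeper):

# set retention.bytes when creating the topic (10 GiB per partition here)
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic \
  --partitions 3 --replication-factor 2 \
  --config retention.bytes=10737418240

# or add it to an existing topic
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --entity-type topics --entity-name my-topic \
  --add-config retention.bytes=10737418240

# broker-wide default, set in server.properties
log.retention.bytes=10737418240

Also note that retention.bytes and log.retention.bytes apply per partition, so the total data kept for a topic can grow to roughly the number of partitions times that value.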