I'm running a job using the Beam KafkaIO source in Google Dataflow and cannot find an easy way to persist offsets across job restarts (the job update option is not enough; I need to restart the job).
Comparing Beam's KafkaIO against PubSubIO (or, to be precise, comparing PubsubCheckpoint with KafkaCheckpointMark), I can see that checkpoint persistence is not implemented in KafkaIO (the KafkaCheckpointMark.finalizeCheckpoint method is empty), whereas it is implemented in PubsubCheckpoint.finalizeCheckpoint, which acknowledges messages to PubSub.
Does this mean I have no means of reliably managing Kafka offsets on job restarts with minimum effort?
Options I have considered so far:
1. Implement my own logic for persisting offsets. This sounds complicated; I'm using Beam through Scio in Scala.
2. Do nothing, but that would result in many duplicates on job restarts (the topic has a 30-day retention period).
3. Enable auto-commit, but that would result in lost messages, which is even worse.
There are two options: enable commitOffsetsInFinalize() in KafkaIO, or alternatively enable auto-commit in the Kafka consumer configuration. Note that while commitOffsetsInFinalize() is more in sync with what has been processed in Beam than Kafka's auto-commit, it does not provide a strong exactly-once processing guarantee. Imagine a two-stage pipeline: Dataflow finalizes the Kafka reader after the first stage, without waiting for the second stage to complete. If you restart the pipeline from scratch at that point, the records that completed the first stage but had not yet been processed by the second are never processed. The issue is no different for PubsubIO.
Regarding option (2): you can configure KafkaIO to start reading from a specific timestamp (assuming the Kafka server supports it, version 0.10+). But that does not look any better than enabling auto-commit.
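The two-stage scenario above can be illustrated with a toy model. This is only a sketch of the reasoning, not Beam or Dataflow code: the class and method names are hypothetical, and offsets are modelled as plain numbers where the committed offset is the next offset to read.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (hypothetical names): offsets are committed once stage 1 has
// finished with them, regardless of how far stage 2 has progressed.
public class FinalizeCommitSketch {

    // Offset the restarted job resumes from: everything stage 1 completed
    // has already been committed on finalize.
    static long resumeOffset(long stage1Completed) {
        return stage1Completed;
    }

    // Offsets in [stage2Completed, stage1Completed) were committed but never
    // finished stage 2, so a job restarted from scratch skips them.
    static List<Long> skippedAfterRestart(long stage1Completed, long stage2Completed) {
        List<Long> skipped = new ArrayList<>();
        for (long offset = stage2Completed; offset < resumeOffset(stage1Completed); offset++) {
            skipped.add(offset);
        }
        return skipped;
    }

    public static void main(String[] args) {
        // Stage 1 finished offsets 0..5, stage 2 only finished offsets 0..3.
        System.out.println(skippedAfterRestart(6, 4)); // prints [4, 5]
    }
}
```

If both stages have progressed equally far when the job stops, the skipped list is empty, which is why the gap only shows up when a restart lands between the two stages.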
That said, KafkaIO should support committing offsets on finalize. It might be simpler to use than enabling auto-commit (where you need to think about commit frequency, etc.). We haven't had many users asking for it. Please mention it on user@beam.apache.org if you can.
[Update: I am adding support for committing offsets to KafkaCheckpointMark in PR 4481]
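For reference, a configuration sketch of a KafkaIO read with commitOffsetsInFinalize() enabled might look like the following in the Java SDK. The broker address, topic, and group id are placeholders, and the class name is hypothetical. It assumes the Beam Kafka IO module and the Kafka client are on the classpath, and a Beam release that has withConsumerConfigUpdates (older releases used updateConsumerProperties instead). A consumer group id is needed so that the committed offsets have somewhere to be stored.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical class name; all connection values below are placeholders.
public class KafkaCommitOnFinalizeSketch {
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<KV<String, String>> records = pipeline.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker-1:9092")            // placeholder
                .withTopic("my-topic")                            // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(Map.<String, Object>of(
                    // Offsets are committed under this consumer group.
                    ConsumerConfig.GROUP_ID_CONFIG, "my-beam-job",  // placeholder
                    // Leave committing to Beam rather than the Kafka client.
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false))
                // Commit offsets to Kafka when Beam finalizes a checkpoint.
                .commitOffsetsInFinalize()
                .withoutMetadata());

        // Downstream transforms on `records` would go here.
        pipeline.run();
    }
}
```

A restarted job that uses the same group id should then resume from the last committed offsets, subject to the caveat about multi-stage pipelines described above.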