I am working on an application based on Apache Flink, which makes use of Apache Kafka for input and output. This application may eventually be ported to Apache Spark, so I have added that as a tag as well; the question remains the same.
I have the requirement that all incoming messages received via Kafka must be processed in order and safely stored in a persistence layer (database), and that no message may get lost.
The streaming part of this application is rather trivial/small, as the main logic boils down to something like:
environment.addSource(consumer) // 1) DataStream[Option[Elem]]
.filter(_.isDefined) // 2) discard unparsable messages
.map(_.get) // 3) unwrap Option
.map(InputEvent.fromXml(_)) // 4) convert from XML to internal representation
.keyBy(_.id) // 5) assure in-order processing on logical-key level
.map(new DBFunction) // 6) database lookup, store or update, and additional enrichment
.map(InputEvent.toXml(_)) // 7) convert back to XML
.addSink(producer) // 8) attach kafka producer sink
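For context, a sketch of what surrounds this pipeline, assuming Elem here is scala.xml.Elem and the consumer uses a custom DeserializationSchema that yields None for unparsable messages (which is where the Option[Elem] at 1) comes from). Topic names and properties are placeholders, and the connector class names vary by Flink version:

import java.nio.charset.StandardCharsets
import java.util.Properties
import scala.util.Try
import scala.xml.{Elem, XML}
import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer}

// Turns raw Kafka bytes into Option[Elem]; None marks an unparsable message
class XmlSchema extends DeserializationSchema[Option[Elem]] {
  override def deserialize(message: Array[Byte]): Option[Elem] =
    Try(XML.loadString(new String(message, StandardCharsets.UTF_8))).toOption
  override def isEndOfStream(nextElement: Option[Elem]): Boolean = false
  override def getProducedType: TypeInformation[Option[Elem]] =
    createTypeInformation[Option[Elem]]
}

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092") // placeholder
properties.setProperty("group.id", "my-consumer-group")       // placeholder

val environment = StreamExecutionEnvironment.getExecutionEnvironment
val consumer = new FlinkKafkaConsumer[Option[Elem]]("input-topic", new XmlSchema, properties)
val producer = new FlinkKafkaProducer[String]("output-topic", new SimpleStringSchema, properties)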
Now, during this pipeline, several error situations could occur:
- the database becomes unavailable (shutdown, tablespace full, ...)
- changes cannot be stored because of logical errors (e.g. wrong column format)
- the Kafka producer cannot send a message because of broker unavailability
and probably other situations.
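For what it's worth, these failures seem to fall into two classes: transient ones, where retrying makes sense, and permanent ones, where it does not. A rough classification sketch (the concrete exception types are my assumption, not verified):

def isTransient(t: Throwable): Boolean = t match {
  case _: java.sql.SQLTransientConnectionException => true // database unavailable
  case _: org.apache.kafka.common.errors.TimeoutException => true // broker unreachable
  case _: java.sql.SQLDataException => false // logical error (e.g. column format); retrying cannot help
  case _ => false // default: treat as permanent
}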
Now my question is: how can I ensure consistency as described above in those situations, given that I would in fact have to do something like the following:
- Stream operator 6) detects a problem (DB unavailable)
- The DB connection of the DBFunction object must be recovered, which might only succeed after some minutes
- This means that overall processing must be suspended, ideally for the whole pipeline, so that incoming messages are not loaded into memory
- Resume processing after the database has been recovered. Processing must resume exactly with the message which encountered the problem at 1)
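The closest I can come up with myself is to block inside the operator. Below is a minimal sketch of 6), assuming DBFunction is a RichMapFunction: retrying with backoff stalls this subtask, and Flink's backpressure then stops the source from pulling further messages from Kafka. connect and lookupAndStore are hypothetical helpers standing in for my actual DB logic:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class DBFunction extends RichMapFunction[InputEvent, InputEvent] {
  @transient private var connection: java.sql.Connection = _

  override def open(parameters: Configuration): Unit = {
    connection = connect()
  }

  override def map(event: InputEvent): InputEvent = {
    var result: Option[InputEvent] = None
    var backoffMillis = 1000L
    while (result.isEmpty) {
      try {
        result = Some(lookupAndStore(connection, event)) // lookup, store or update, enrich
      } catch {
        case _: java.sql.SQLException =>
          // DB unavailable: block this subtask; backpressure then suspends
          // the pipeline, so no further messages pile up in memory
          Thread.sleep(backoffMillis)
          backoffMillis = math.min(backoffMillis * 2, 60000L)
          scala.util.Try(connect()).foreach(c => connection = c) // try to reconnect
      }
    }
    result.get
  }

  // Hypothetical helpers:
  private def connect(): java.sql.Connection =
    java.sql.DriverManager.getConnection("jdbc:...") // placeholder URL

  private def lookupAndStore(conn: java.sql.Connection, event: InputEvent): InputEvent =
    ??? // the actual database lookup/update/enrichment of 6)
}

But this blocks checkpoint barriers as well, and it does not answer what to do when the error is permanent rather than transient.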
Now I know that there are at least two tools regarding failure handling:
- Kafka consumer offsets
- Apache Flink checkpoints
However, searching the docs, I fail to see how either of those could be used in the middle of stream processing from within a single operator.
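What I do understand is the coarse-grained combination of the two: with checkpointing enabled, Flink's Kafka consumer stores its offsets as part of each checkpoint, so when a failing operator brings the job down, the restarted job replays from the last consistent point, including the message that hit the problem. Roughly (interval and restart settings are arbitrary here):

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Offsets are committed to the checkpoint, not just to Kafka, so recovery
// resumes exactly from the last completed checkpoint
env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
// After a failure, restart automatically instead of terminating the job
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 10000L))

But that is whole-job recovery with a full restart; it is not the fine-grained, in-operator handling I am after.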
So, what would be the recommended strategies for fine-grained error handling and recovery in a streaming application?