Controlled/manual error/recovery handling in strea

Posted 2019-07-12 17:47

I am working on an application based on Apache Flink, which uses Apache Kafka for input and output. This application may be ported to Apache Spark, so I have added that as a tag as well; the question remains the same.

I have the requirement that all incoming messages received via Kafka must be processed in order and safely stored in a persistence layer (database), and no message may get lost.

The streaming part of this application is rather trivial/small, as the main logic boils down to something like:

environment.addSource(consumer)    // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)             // 2) discard unparsable messages
  .map(_.get)                      // 3) unwrap Option
  .map(InputEvent.fromXml(_))      // 4) convert from XML to internal representation
  .keyBy(_.id)                     // 5) assure in-order processing on logical-key level
  .map(new DBFunction)             // 6) database lookup, store of update and additional enrichment
  .map(InputEvent.toXml(_))        // 7) convert back to XML
  .addSink(producer)               // 8) attach kafka producer sink

Now, during this pipeline, several error situations could occur:

  • the database becomes unavailable (shutdown, tablespace full, ...)
  • changes cannot be stored because of logical errors (for example, a wrong column format)
  • the Kafka producer cannot send a message because the broker is unavailable

and probably other situations.

Now my question is: how can I ensure consistency as described above in those situations, when I would in fact have to do something like:

  1. Stream-Operator 6) detects a problem (DB unavailable)
  2. The DB-connection of the DBFunction object must be recovered, which might only succeed after some minutes
  3. This means that overall processing must be suspended, ideally for the whole pipeline, so that incoming messages are not loaded into memory
  4. Resume processing after database has been recovered. Processing must resume exactly with the message which encountered the problem at 1)

Now I know that there are at least two tools for failure handling:

  1. Kafka consumer offsets
  2. Apache Flink checkpoints

However, searching the docs, I fail to see how either of those could be used in the middle of stream processing from within a single operator.

So, what would be the recommended strategies for fine-grained error handling and recovery in a streaming application?

1 answer
ら.Afraid
#2 · 2019-07-12 18:11

A few points:

The keyBy is not going to help ensure in-order processing. If anything, it may interleave events from different Kafka partitions (each of which may have been in order on its own), thereby creating out-of-orderness where none existed before. It's hard to comment more specifically on how you might guarantee in-order processing without knowing how many FlinkKafkaConsumer instances you intend to use, how many partitions each one will consume from, how the keys are distributed across the Kafka partitions, and why you think a keyBy is necessary. If you set things up correctly, though, preserving order may be achievable. reinterpretAsKeyedStream can be helpful here, but this feature is difficult to understand and tricky to use correctly.
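
To illustrate why the distribution of keys across partitions matters, here is a small plain-Scala simulation (no Flink involved; `KeyOrderDemo`, `Event`, `interleave`, and `inOrderPerKey` are hypothetical names). It merges two individually ordered partitions round-robin, which is just one of many possible arrival orders, and checks whether each key's events still arrive in sequence.

```scala
object KeyOrderDemo {
  final case class Event(key: String, seq: Int)

  /** Round-robin merge of two partitions: one possible downstream arrival order. */
  def interleave(a: List[Event], b: List[Event]): List[Event] = (a, b) match {
    case (x :: xs, y :: ys) => x :: y :: interleave(xs, ys)
    case (Nil, rest)        => rest
    case (rest, Nil)        => rest
  }

  /** True if, for every key, the sequence numbers arrive in ascending order. */
  def inOrderPerKey(events: List[Event]): Boolean =
    events.groupBy(_.key).values.forall { es =>
      val seqs = es.map(_.seq)
      seqs == seqs.sorted
    }
}
```

In this sketch, when every key lives in exactly one partition, per-key order survives the merge; when one key's events are spread over both partitions, the merged stream can be out of order for that key.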

You could use Flink's AsyncFunction to manage the connection to the external DB in a fault-tolerant, exactly-once manner.
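
A minimal sketch of what such an AsyncFunction might look like, assuming the Flink 1.x Scala API. `InputEvent`, `FakeDb`, `AsyncDBFunction`, and `AsyncDbJob` are hypothetical stand-ins for the question's types, and the timeout and capacity values are arbitrary:

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical stand-ins for the question's types; not part of Flink.
final case class InputEvent(id: String, payload: String)

object FakeDb {
  // Placeholder for the real lookup/store/enrich call; the real one may throw.
  def lookupAndStore(event: InputEvent): InputEvent =
    event.copy(payload = event.payload + " [enriched]")
}

class AsyncDBFunction extends AsyncFunction[InputEvent, InputEvent] {
  // Created lazily on the worker, because execution contexts are not serializable.
  @transient implicit lazy val ec: ExecutionContext = ExecutionContext.global

  override def asyncInvoke(event: InputEvent,
                           resultFuture: ResultFuture[InputEvent]): Unit =
    Future(FakeDb.lookupAndStore(event)).onComplete {
      case Success(enriched) => resultFuture.complete(Iterable(enriched))
      // Completing exceptionally fails the job, which then restarts from a checkpoint.
      case Failure(error)    => resultFuture.completeExceptionally(error)
    }
}

object AsyncDbJob {
  // orderedWait emits results in the same order in which the inputs arrived.
  def enrich(events: DataStream[InputEvent]): DataStream[InputEvent] =
    AsyncDataStream.orderedWait(events, new AsyncDBFunction, 30, TimeUnit.SECONDS, 100)
}
```

`orderedWait` is chosen here over `unorderedWait` because the question requires in-order processing; it holds back finished results until all earlier requests have completed.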

Flink doesn't support fine-grained recovery in a systematic way: its checkpoints are global snapshots of the state of the entire distributed cluster, designed to be used during recovery as a monolithic, self-consistent snapshot. If your job fails, normally the only recourse is to restart from a checkpoint. That involves rewinding the input queues to the offsets stored in the checkpoint, replaying the events since those offsets, re-issuing the DB lookups (which the async function will do automatically), and using Kafka transactions to achieve end-to-end exactly-once semantics. However, in the case of embarrassingly parallel jobs, it is sometimes possible to take advantage of fine-grained recovery.
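
For reference, a configuration sketch of the checkpoint-and-restart setup described above, assuming the Flink 1.x Scala API. `RecoveryConfig` is a hypothetical name, and the checkpoint interval, retry count, and delay are arbitrary values chosen for illustration:

```scala
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object RecoveryConfig {
  def configure(env: StreamExecutionEnvironment): Unit = {
    // Take a global, exactly-once checkpoint every 10 seconds (arbitrary interval).
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)

    // On failure, wait 30 seconds and restart from the latest checkpoint,
    // retrying effectively forever (both values are assumptions).
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(Int.MaxValue, Time.of(30, TimeUnit.SECONDS)))
  }
}
```

With a setup along these lines, a database outage turns into repeated job restarts that each replay from the last checkpointed Kafka offsets, rather than a pause inside a single operator.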
