How to handle errors correctly in a custom MapFunction

Posted 2020-08-25 07:41

Question:

I have implemented a MapFunction for my Apache Flink flow. It parses incoming elements and converts them to another format, but sometimes an error can occur (i.e. the incoming data is not valid).

I see two possible ways to handle it:

  • Ignore invalid elements. However, it seems I can't simply ignore errors, because a MapFunction must produce an outgoing element for every incoming element.
  • Split the incoming elements into valid and invalid ones. However, it seems I would need a different function for this.

So, I have two questions:

  1. How do I handle errors correctly in my MapFunction?
  2. How do I implement such transformation functions correctly?

Answer 1:

You could use a FlatMapFunction instead of a MapFunction. This would allow you to only emit an element if it is valid. The following shows an example implementation:

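import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// `input` is assumed to be an existing stream of String elements
input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String raw, Collector<Long> collector) throws Exception {
        try {
            // Emit exactly one element for each valid input
            collector.collect(Long.parseLong(raw));
        } catch (NumberFormatException e) {
            // Invalid data: emit nothing, so the element is dropped
        }
    }
});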


Answer 2:

This is to build on @Till Rohrmann's idea above. Adding this as an answer instead of a comment for better formatting.

I think one way to implement "split + select" could be to use a ProcessFunction with a SideOutput. My graph would look something like this:

Source --> ValidateProcessFunction ---good data--> UDF--->SinkToOutput
                                    \
                                     \---bad data----->SinkToErrorChannel
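
As a rough illustration of the routing step in the graph above, here is a minimal sketch in plain Java, written so that it runs without a Flink dependency. The names SplitSketch and route are hypothetical, and the two Consumer arguments only stand in for Flink's outputs: inside a real ProcessFunction, the good branch would be the Collector passed to processElement, and the bad branch would be a call to ctx.output(outputTag, raw), later read with getSideOutput(outputTag) on the resulting stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Framework-free model of the validate-and-route step (hypothetical names throughout).
final class SplitSketch {

    // Parses `raw`; valid values go to `good`, unparseable input goes to `bad`.
    // In a Flink ProcessFunction, `good` would play the role of the Collector
    // (main output) and `bad` the role of ctx.output(outputTag, raw) (side output).
    static void route(String raw, Consumer<Long> good, Consumer<String> bad) {
        try {
            good.accept(Long.parseLong(raw));
        } catch (NumberFormatException e) {
            bad.accept(raw); // keep the original input for the error channel
        }
    }

    public static void main(String[] args) {
        List<Long> goodData = new ArrayList<>();
        List<String> badData = new ArrayList<>();
        for (String raw : List.of("1", "oops", "42")) {
            route(raw, goodData::add, badData::add);
        }
        System.out.println("good data: " + goodData);
        System.out.println("bad data:  " + badData);
    }
}
```

Forwarding the raw input rather than the exception means the error sink receives enough information to replay or inspect the record; whether that suits a given pipeline is a design choice, not something the graph above prescribes.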

Would this work? Is there a better way?