I have implemented a MapFunction for my Apache Flink job. It parses incoming elements and converts them to another format, but sometimes an error occurs (e.g. when the incoming data is not valid).
I see two possible ways to handle it:
- Ignore invalid elements. But it seems I can't simply ignore errors, because a MapFunction must return an outgoing element for every incoming element.
- Split incoming elements into valid and invalid ones. But it seems I would need a different function for that.
So, I have two questions:
- How do I handle errors correctly in my MapFunction?
- How do I implement such transformation functions correctly?
You could use a FlatMapFunction instead of a MapFunction. A FlatMapFunction may emit zero, one, or more elements per input, so you can emit an element only when it is valid. The following shows an example implementation:
input.flatMap(new FlatMapFunction<String, Long>() {
    @Override
    public void flatMap(String input, Collector<Long> collector) throws Exception {
        try {
            Long value = Long.parseLong(input);
            collector.collect(value);
        } catch (NumberFormatException e) {
            // ignore invalid data
        }
    }
});
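A minimal sketch of the same idea as a named, unit-testable class, assuming the Flink 1.x DataStream API is on the classpath. The class name `ParseLongFlatMap` and the `parseAll` helper are hypothetical: `parseAll` calls `flatMap` directly with a small list-backed `Collector` instead of running a Flink job, only to show which elements survive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical name: parses each String as a Long and silently drops unparseable input.
public class ParseLongFlatMap implements FlatMapFunction<String, Long> {

    @Override
    public void flatMap(String input, Collector<Long> out) {
        try {
            out.collect(Long.parseLong(input));
        } catch (NumberFormatException e) {
            // Invalid record: collect nothing, so it is dropped from the stream.
        }
    }

    // Hypothetical test helper: runs flatMap outside Flink with a list-backed Collector.
    static List<Long> parseAll(List<String> inputs) {
        List<Long> results = new ArrayList<>();
        Collector<Long> collector = new Collector<Long>() {
            @Override
            public void collect(Long record) {
                results.add(record);
            }

            @Override
            public void close() {
                // Nothing to release for an in-memory list.
            }
        };
        ParseLongFlatMap fn = new ParseLongFlatMap();
        for (String s : inputs) {
            fn.flatMap(s, collector);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(parseAll(Arrays.asList("1", "abc", "3"))); // prints [1, 3]
    }
}
```

Because the `catch` block swallows the exception, invalid records disappear without a trace; logging or counting them there keeps the failures visible.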
This builds on @Till Rohrmann's idea above. I'm adding it as an answer instead of a comment for better formatting.
I think one way to implement "split + select" is to use a ProcessFunction with a side output. My graph would look something like this:
Source --> ValidateProcessFunction ---good data--> UDF ---> SinkToOutput
                      \
                       \---bad data---> SinkToErrorChannel
Would this work? Is there a better way?
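The side-output graph above can be sketched as follows, assuming the Flink 1.x DataStream API. The class name `ValidateWithSideOutput`, the tag name `invalid-data`, the `validate` helper, and `print()` as a stand-in for `SinkToOutput` and `SinkToErrorChannel` are all assumptions for illustration; parsing a `Long` stands in for whatever validation the real job performs.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ValidateWithSideOutput {

    // Side-output tag for rejected records. The trailing {} creates an anonymous
    // subclass so Flink can recover the String element type despite erasure.
    static final OutputTag<String> INVALID = new OutputTag<String>("invalid-data") {};

    // Valid input goes to the main output as a Long; invalid input goes to INVALID unchanged.
    public static class ValidateProcessFunction extends ProcessFunction<String, Long> {
        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) {
            try {
                out.collect(Long.parseLong(value));
            } catch (NumberFormatException e) {
                ctx.output(INVALID, value);
            }
        }
    }

    // Hypothetical helper: returns the "good data" stream;
    // call getSideOutput(INVALID) on the result to get the "bad data" stream.
    static SingleOutputStreamOperator<Long> validate(DataStream<String> input) {
        return input.process(new ValidateProcessFunction());
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("1", "abc", "3");

        SingleOutputStreamOperator<Long> good = validate(source);
        DataStream<String> bad = good.getSideOutput(INVALID);

        // The UDF from the diagram would be applied to `good` before its sink.
        good.print("good"); // stand-in for SinkToOutput
        bad.print("bad");   // stand-in for SinkToErrorChannel

        env.execute("validate-with-side-output");
    }
}
```

Unlike the FlatMapFunction approach, invalid records are not lost: they flow into their own stream, where they can be logged, stored, or replayed, and a single pass over the input produces both branches. Side outputs are also what Flink recommends in place of the older `split`/`select` operators.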