Adding patterns dynamically in Apache Flink withou

2019-07-16 05:44发布

问题:

My use case is that I want to apply different CEP patterns to the same datastream. the CEP patterns come dynamically & i want them to be added to flink without having to restart the job. While all conditions can be handled via custom classes that implement IterativeCondition, my main problem is that the temporal condition accepts only TimeWindow; which cannot be handled. Is there some way that the value passed to .within() be set based on the input elements?

Something similar was asked here: Flink and Dynamic templates recognition

Best Answer: "What one could add is a co-flat map operator which receives on one input channel the events and on the other input channel patterns. For each newly received pattern one either updates the existing NFA (this functionality is missing) or compiles a new one. In the latter case, one would apply incoming events to all stored NFAs."

I am trying to implement this but I am facing some difficulty. Specifically, on the point of "In the latter case, one would apply incoming events to all stored NFAs"

Reason being that I apply stream to pattern using: PatternStream matchStream = CEP.pattern(tmatchStream, pattern);

But the stream "tmatchStream" would not be defined inside the co-flatMap. Am I missing something here??? Any help would be greatly appreciated.

回答1:

Unfortunately the answer to the linked question is still valid. Flink CEP does not support dynamic patterns at that moment. There is already a JIRA ticket for that though: FLINK-7129

The earliest reasonable target version for that feature will be 1.6.0