FlinkCEP:我可以引用较早事件定义随后的比赛?(FlinkCEP: Can I referen

2019-10-29 04:30发布

下面是一个简单的例子:

val pattern = 
   Pattern.begin[Event]("start").where(_.getId == 42).
   next("middle").subtype(classOf[SubEvent]).where(x => x.getVolume == **first event matched**.getVolume) ...

本质上,第二个事件(“中间”)需要访问的第一个事件的状态(“启动”)。 是否有可能内FlinkCEP做到这一点,而不需要外部的状态?

Answer 1:

当然。 您可以通过与上下文的帮助特定的模式获取事件。

new IterativeCondition<Event>() {
            private static final long serialVersionUID = 8061969839441121955L;

            @Override
            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                double sum = 0.0;
                for (Event e : ctx.getEventsForPattern("middle")) {
                    sum += e.getPrice();
                }
                return sum > 5.0;
            }
        }


文章来源: FlinkCEP: Can I reference an earlier event to define a subsequent match?