在KafkaStream,实施ValueTransformer或ValueTransformerWithKey,上变换时()调用,我安排一个新的标点符号。 当执行加标点器的方法圈点()我希望它的下游使用该上下文实例转发的事件。 然而,上下文实例似乎没有定义DSL拓扑的一部分时。
如何与一个变压器做任何线索?
在处理器中使用相同的逻辑,实施低级别处理器拓扑它的工作原理。
在ValueTransformerWithKey:
@Override
public Event transform(final String key, final Event event) {
this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
return null;
}
在MyPunctuator:
private class MytPunctuator implements Punctuator {
private String key;
private ProcessorContext context;
private Event event;
MyPunctuator(ProcessorContext context, String key, Event event)
{
this.context = context;
this.key = key;
this.event = event;
}
@Override
public void punctuate(final long timestamp) {
context.forward(key, AlertEvent.builder().withSource(event).build());
context.commit();
}
}
当执行
myStream
.groupByKey(Serialized.with(Serdes.String(), Event.serde()))
.reduce((k, v) -> v)
.transformValues(() -> valueTransformerWithKey)
.toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));
我希望通过加标点产生的报警事件被forwared一旦过期报警话题。
相反,我得到了以下异常:ProcessorContext.forward()不支持。