如何从一个ValueTransformer一个标点符号实例下游转发事件?(How to forwar

2019-10-31 09:42发布

在KafkaStream,实施ValueTransformerValueTransformerWithKey,变换()调用,我安排一个新的标点符号。 当执行加标点器的方法圈点()我希望它的下游使用该上下文实例转发的事件。 然而,上下文实例似乎没有定义DSL拓扑的一部分时。

如何与一个变压器做任何线索?

在处理器中使用相同的逻辑,实施低级别处理器拓扑它的工作原理。

在ValueTransformerWithKey:

@Override 
    public Event transform(final String key, final Event event) { 
        this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
        return null;
}

在MyPunctuator:

private class MytPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

当执行

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

我希望通过加标点产生的报警事件被forwared一旦过期报警话题。

相反,我得到了以下异常:ProcessorContext.forward()不支持。

Answer 1:

像往常一样,我发现在约ValueTransformerWithKey接口的Javadoc答案: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

请注意,使用ProcessorContext.forward(对象,对象)或ProcessorContext.forward(对象,对象,对应)是没有在变换允许的,将导致异常。

然而,实现变压器接口而不是允许context.forward的使用()。 由于@Matthias J.萨克斯

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

如果多于一个的输出记录应该被转发下游ProcessorContext.forward(对象,对象)和ProcessorContext.forward(对象,对象,要)可以被使用。 如果记录不应该向下游转发,变换可以返回null。



文章来源: How to forward event downstream from a Punctuator instance in a ValueTransformer?