How to forward an event downstream from a Punctuator

Posted 2019-08-18 00:52

Question:

In Kafka Streams, I implement a ValueTransformer or ValueTransformerWithKey and schedule a new Punctuator on each transform() call. When the Punctuator's punctuate() method runs, I want it to forward an event downstream using the context instance. However, the context instance does not seem to be usable for this when the transformer is part of a DSL topology.

Any clue on how to do this with a Transformer?

The same logic works in a Processor when I build the topology with the low-level Processor API.

In ValueTransformerWithKey:

@Override
public Event transform(final String key, final Event event) {
    this.context.schedule(timeout.toMillis(), PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(context, key, event));
    return null;
}

In MyPunctuator:

private class MyPunctuator implements Punctuator {
    private String key;
    private ProcessorContext context;
    private Event event;

    MyPunctuator(ProcessorContext context, String key, Event event)
    {
        this.context = context;
        this.key = key;
        this.event = event;
    }

    @Override
    public void punctuate(final long timestamp) {
        context.forward(key, AlertEvent.builder().withSource(event).build());
        context.commit();
    }
}

When executing

myStream
    .groupByKey(Serialized.with(Serdes.String(), Event.serde()))
    .reduce((k, v) -> v)
    .transformValues(() -> valueTransformerWithKey)
    .toStream().to(ALARM_TOPIC, Produced.with(Serdes.String(), AlarmEvent.serde()));

I expect the Alarm event produced by the punctuator to be forwarded to the ALARM topic once the timeout expires.

Instead I got the following exception: ProcessorContext.forward() not supported.

Answer 1:

As usual, I found the answer in the Javadoc for the ValueTransformerWithKey interface: https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/ValueTransformerWithKey.html

Note, that using ProcessorContext.forward(Object, Object) or ProcessorContext.forward(Object, Object, To) is not allowed within transform and will result in an exception.

However, implementing the Transformer interface instead allows the use of context.forward(). Thanks @Matthias J. Sax

https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html

If more than one output record should be forwarded downstream ProcessorContext.forward(Object, Object) and ProcessorContext.forward(Object, Object, To) can be used. If record should not be forwarded downstream, transform can return null.
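For illustration, here is a minimal sketch of what the Transformer variant might look like. It is not the code from the question: the class name TimeoutAlertTransformer, the plain String values (standing in for Event and the alert type), and the "ALERT:" prefix are all made up for the example. It also assumes a Kafka Streams release (2.1 or later) in which Transformer is still available and schedule() accepts a Duration; the question's code uses the older millisecond overload. Since a scheduled punctuator fires repeatedly, the sketch cancels it after the first run so that each record yields a single alert.

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

// Hypothetical example class: String values stand in for the question's Event types.
public class TimeoutAlertTransformer
        implements Transformer<String, String, KeyValue<String, String>> {

    private final Duration timeout;
    private ProcessorContext context;

    public TimeoutAlertTransformer(final Duration timeout) {
        this.timeout = timeout;
    }

    @Override
    public void init(final ProcessorContext context) {
        // Unlike the context handed to a ValueTransformer, this one permits forward().
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        // One-element holder so the punctuator can cancel its own schedule.
        final Cancellable[] handle = new Cancellable[1];
        handle[0] = context.schedule(timeout, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // Emit the alert downstream when the punctuator fires.
            context.forward(key, "ALERT:" + value);
            // Punctuators repeat at the given interval; cancel to make this one-shot.
            handle[0].cancel();
        });
        // Returning null means nothing is forwarded directly from transform().
        return null;
    }

    @Override
    public void close() {
        // No resources to release in this sketch.
    }
}
```

On a KStream it would be plugged in with something like .transform(() -> new TimeoutAlertTransformer(timeout)), for example after .toStream() in the topology from the question, instead of calling transformValues() on the KTable.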