Suppose I have a stream "stream-1" consisting of one data point per second, and I'd like to calculate a derived stream "stream-5" containing the sum over a hopping window of 5 seconds, plus another stream "stream-10", based on "stream-5", containing the sum over a hopping window of 10 seconds. The aggregation needs to be done for each key separately, and I'd like to be able to run each step in a different process. It is not a problem in itself if stream-5 and stream-10 contain updates for the same key/timestamp (so I don't necessarily need "How to send final kafka-streams aggregation result of a time windowed KTable?"), as long as the last values are correct.
Is there an (easy) way to solve this using the high-level Kafka Streams DSL? So far I fail to see an elegant way to deal with the intermediate updates produced on stream-5 due to the aggregation.
I know the intermediate updates can be controlled to some extent with the cache.max.bytes.buffering and commit.interval.ms settings, but I don't think any setting can guarantee in all cases that no intermediate values will be produced.
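For reference, this is roughly how I'd tune those settings (the application id and values here are just placeholders); a larger cache and a longer commit interval reduce, but don't eliminate, the intermediate updates:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-aggregation"); // placeholder id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// A bigger cache and a longer commit interval mean fewer (but not zero)
// intermediate updates being forwarded downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);
// props would then be passed to new KafkaStreams(builder, props).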
Also, I could try converting "stream-5" to a KTable on read, with the timestamp made part of the key, but then it seems KTable does not support windowing operations the way KStream does.
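To make that idea concrete, it would look something like the sketch below (using the same builder as in the code further down, and a hypothetical topic "stream-5-keyed" whose keys encode the window start); this is where it breaks down:

// Hypothetical topic whose keys already encode the window start,
// e.g. "KEY@0", "KEY@5000", so later updates overwrite earlier ones.
KTable<String, DataPoint> t5 = builder.table("stream-5-keyed");
// Dead end: KGroupedTable only offers reduce(adder, subtractor) variants;
// there is no overload taking TimeWindows, so the 10-second hopping
// window cannot be expressed on a table.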
This is what I have so far; it fails because of the intermediate aggregate values on stream-5:
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Sums two data points, keeping the timestamp of the first one.
Reducer<DataPoint> sum = new Reducer<DataPoint>() {
    @Override
    public DataPoint apply(DataPoint x, DataPoint y) {
        return new DataPoint(x.timestamp, x.value + y.value);
    }
};

// Strips the window from the key so the output can be re-read as a plain KStream.
KeyValueMapper<Windowed<String>, DataPoint, String> strip =
        new KeyValueMapper<Windowed<String>, DataPoint, String>() {
            @Override
            public String apply(Windowed<String> wKey, DataPoint arg1) {
                return wKey.key();
            }
        };

KStreamBuilder builder = new KStreamBuilder();

// stream-1 -> stream-5: per-key sum over a 5-second hopping window
// (default String/DataPoint serdes are configured in the streams properties).
KStream<String, DataPoint> s1 = builder.stream("stream-1");
s1.groupByKey()
    .reduce(sum, TimeWindows.of(5000).advanceBy(5000))
    .toStream()
    .selectKey(strip)
    .to("stream-5");

// stream-5 -> stream-10: per-key sum over a 10-second hopping window.
KStream<String, DataPoint> s5 = builder.stream("stream-5");
s5.groupByKey()
    .reduce(sum, TimeWindows.of(10000).advanceBy(10000))
    .toStream()
    .selectKey(strip)
    .to("stream-10");
Now if stream-1 contains the inputs (the key is always KEY):
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}
stream-5 contains the correct (final) values:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}
but stream-10 is wrong (the final value should be 10.0, i.e. the two final 5-second sums 5.0 + 5.0) because it also takes the intermediate values on stream-5 into account:
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}