I have a stream of JSON objects that I'm keying on a hash of a few values. I'm hoping to count by key in n-second (10? 60?) intervals and use these counts to do some pattern analysis.
My topology: K->aggregateByKey(n seconds)->process()
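To make the goal concrete, here is a plain-Java sketch (no Kafka involved) of the per-key, per-interval counting I'm after. The class and method names are hypothetical, and it assumes tumbling windows aligned to multiples of the window size and non-negative timestamps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Kafka Streams code.
class WindowedCounts {
    private final long windowMs;
    // window start (ms) -> key -> number of records seen in that window
    private final Map<Long, Map<String, Long>> counts = new HashMap<>();

    WindowedCounts(long windowMs) {
        this.windowMs = windowMs;
    }

    // Start of the tumbling window that contains the given timestamp.
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowMs);
    }

    // Count one record for the key in the window its timestamp falls into.
    void add(String key, long timestampMs) {
        counts.computeIfAbsent(windowStart(timestampMs), w -> new HashMap<>())
              .merge(key, 1L, Long::sum);
    }

    // Count for the key in the window containing the timestamp (0 if none).
    long count(String key, long timestampMs) {
        return counts.getOrDefault(windowStart(timestampMs), new HashMap<>())
                     .getOrDefault(key, 0L);
    }
}
```

With a 60000 ms window, records at 1000 ms and 59999 ms land in the same bucket, while a record at 60000 ms starts the next one.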
In the processor's init() step I've called ProcessorContext.schedule(60 * 1000L) in hopes of having .punctuate() get called. From there I would loop through the values in an internal hash and act accordingly.
I'm seeing values come through the aggregation step and hit the process() function, but .punctuate() is never getting called.
Code:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);
KStream<String, String> mapped = opxLines.map(new ReMapper());

KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
        new AggregateInit(),
        new OpxAggregate(),
        TimeWindows.of("opx_aggregate", 60000));

ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new AggProcessor();
    }
});

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
AggregateInit() returns null.
I guess I can do the .punctuate() equivalent with a simple timer, but I'd like to know why this code isn't working the way I'd hope.
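For reference, the timer fallback I have in mind would look roughly like the sketch below. It uses only java.util.concurrent, and TimedCounter with its methods is a hypothetical helper of mine, not part of Kafka Streams. The assumption is that process() would call record() for each key and the timer thread would do the periodic work that punctuate() was supposed to do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: counts keys and hands the counts to a callback on a timer.
class TimedCounter {
    // Concurrent map because record() and flush() run on different threads.
    private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Would be called from process() for every record.
    void record(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Removes and returns everything counted since the previous flush.
    Map<String, Long> flush() {
        Map<String, Long> snapshot = new HashMap<>();
        for (String key : counts.keySet()) {
            Long n = counts.remove(key);
            if (n != null) {
                snapshot.put(key, n);
            }
        }
        return snapshot;
    }

    // Calls onFlush with a fresh snapshot every intervalMs milliseconds.
    // Note: if onFlush throws, the executor cancels all later runs.
    void start(long intervalMs, Consumer<Map<String, Long>> onFlush) {
        timer.scheduleAtFixedRate(
                () -> onFlush.accept(flush()),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Stops the timer thread; should be called when the processor closes.
    void stop() {
        timer.shutdownNow();
    }
}
```

This runs on wall-clock time in its own thread, so it sidesteps whatever is keeping .punctuate() from firing, but I'd still rather understand the built-in mechanism than work around it.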