I'm hoping to group windowed batches of output from a KStream and write them to a secondary store.
I was expecting to see .punctuate() get called roughly every 30 seconds. What I got instead is saved here.
(The original file was several thousand lines long.)
Summary: .punctuate() is getting called seemingly at random and then repeatedly. It doesn't appear to adhere to the interval set via ProcessorContext.schedule().
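One hypothesis worth checking: in the Processor API of this era, the interval passed to ProcessorContext.schedule() is commonly described as being measured in stream time, which advances with record timestamps, rather than in wall-clock time. If that holds, punctuate() would fire only as records arrive, and a jump in record timestamps would release several punctuations back to back. The toy model below simulates that assumption in plain Java. StreamTimePunctuationModel and onRecord are hypothetical names, and this is not Kafka's actual scheduling code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Kafka's implementation. Assumption: punctuation is driven by
// stream time (the largest record timestamp seen so far), not the wall clock,
// and any intervals skipped by a jump in stream time are caught up one by one.
class StreamTimePunctuationModel {
    private final long interval;
    private final List<Long> punctuations = new ArrayList<>();
    private boolean started = false;
    private long streamTime = Long.MIN_VALUE;
    private long nextPunctuation;

    StreamTimePunctuationModel(long interval) {
        this.interval = interval;
    }

    // Feed one record's timestamp; returns how many punctuations it triggered.
    int onRecord(long recordTimestamp) {
        streamTime = Math.max(streamTime, recordTimestamp); // never moves backwards
        if (!started) {
            started = true;
            nextPunctuation = streamTime + interval; // first record sets the schedule
            return 0;
        }
        int fired = 0;
        while (nextPunctuation <= streamTime) {
            punctuations.add(streamTime); // each call sees the current stream time
            nextPunctuation += interval;
            fired++;
        }
        return fired;
    }

    List<Long> punctuations() {
        return punctuations;
    }

    public static void main(String[] args) {
        StreamTimePunctuationModel model = new StreamTimePunctuationModel(30_000);
        System.out.println(model.onRecord(0));       // 0: first record sets the schedule
        System.out.println(model.onRecord(10_000));  // 0: stream time has not reached 30 s
        System.out.println(model.onRecord(35_000));  // 1: crosses the 30 s mark once
        System.out.println(model.onRecord(200_000)); // 5: jump catches up 60 s..180 s in a burst
        System.out.println(model.onRecord(5_000));   // 0: an old timestamp does not advance stream time
    }
}
```

Under this model, no records means no punctuations no matter how much real time passes, and a single record with a much later timestamp produces a burst of punctuate() calls with no process() calls in between.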
Edit:
Another run of the same code produced calls to .punctuate() roughly every four minutes. I didn't see the crazy repeated values this time. Nothing changed in the source; I just got a different result.
Using the following code:
Main
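```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

StreamsConfig streamsConfig = new StreamsConfig(config); // config and TOPIC are defined elsewhere
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2()); // attach the batching processor to every record
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
```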
Processor
```java
import java.util.ArrayList;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;               // punctuation interval in ms
    private final ArrayList<String> values; // record values buffered since init()

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;
        values = new ArrayList<>();
    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{} s2:{}", s, s2);
        values.add(s2); // buffer the record value for the next batch
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");
        super.init(context);
        values.clear();
        this.context = context;
        context.schedule(delay); // request periodic punctuate() calls every `delay` ms
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);
        LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
        context().commit();
    }
}
```
ProcessorSupplier
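```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000); // 30-second punctuation interval
        } catch (Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException(exception); // keep the original cause
        }
    }
}
```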
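As written, BP2 never clears `values` after punctuate(), so the logged count only grows, and the batching itself depends entirely on when punctuate() fires. A framework-independent alternative is to decide when to flush from the wall clock inside process(). The sketch below is an illustration under that assumption, not Kafka Streams API: WallClockBatcher, flushIfDue, and the injected clock and sink are hypothetical names, with the sink standing in for the write to the secondary store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of Kafka Streams: buffers values and hands
// them to a sink once at least `intervalMs` of wall-clock time has passed.
class WallClockBatcher<V> {
    private final long intervalMs;
    private final LongSupplier clock;     // injected so the sketch is testable
    private final Consumer<List<V>> sink; // e.g. a write to the secondary store
    private final List<V> buffer = new ArrayList<>();
    private long lastFlush;

    WallClockBatcher(long intervalMs, LongSupplier clock, Consumer<List<V>> sink) {
        this.intervalMs = intervalMs;
        this.clock = clock;
        this.sink = sink;
        this.lastFlush = clock.getAsLong();
    }

    // Call from process(): buffer the value, then flush if the interval has elapsed.
    void add(V value) {
        buffer.add(value);
        flushIfDue();
    }

    // Returns true if the interval had elapsed; a non-empty buffer is then
    // handed to the sink as a copy and cleared for the next batch.
    boolean flushIfDue() {
        long now = clock.getAsLong();
        if (now - lastFlush < intervalMs) {
            return false;
        }
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
        lastFlush = now;
        return true;
    }

    public static void main(String[] args) {
        long[] now = {0L}; // fake clock for the demo
        WallClockBatcher<String> batcher = new WallClockBatcher<>(
                30_000, () -> now[0], batch -> System.out.println("flush " + batch));
        batcher.add("a");
        now[0] = 31_000;
        batcher.add("b"); // prints "flush [a, b]"
    }
}
```

One consequence of driving this only from process(): a batch is written only when a new record arrives, so on a quiet topic the last batch stays buffered until something else calls flushIfDue(). The class as sketched is not thread-safe, so calling it from another thread would need its own synchronization.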
Edit:
To make sure my debugger wasn't slowing things down, I built the code and ran it on the same box as my Kafka process. This time it didn't lag for four minutes or more: within seconds it was producing spurious calls to .punctuate(), many (most) of them with no intervening calls to .process().