Kafka KStream - using AbstractProcessor with a Win

Posted 2019-02-20 20:30

Question:

I'm hoping to group together windowed batches of output from a KStream and write them to a secondary store.

I was expecting to see .punctuate() get called roughly every 30 seconds. What I got instead is saved here.

(The original file was several thousand lines long)

Summary - .punctuate() is getting called seemingly at random and then repeatedly. It doesn't appear to adhere to the interval set via ProcessorContext.schedule().


Edit:

Another run of the same code produced calls to .punctuate() roughly every four minutes. I didn't see the crazy repeated values this time. No change to the source - just a different result.

Using the following code:

Main

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

Processor

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;
        values = new ArrayList<>();
    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

ProcessorSupplier

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException(exception);
        }
    }
}

Edit:

To make sure my debugger wasn't slowing this down, I built it and ran it on the same box as my kafka process. This time it didn't lag for four minutes or more - within seconds it was producing spurious calls to .punctuate(), many (most) of them with no intervening calls to .process().

Answer 1:

Update: this part of the answer is for Kafka version 0.11 or earlier (for Kafka 1.0 and later see below)

In Kafka Streams, punctuations are based on stream-time and not system time (aka processing-time).

By default, stream-time is event-time, i.e., the timestamp embedded in the Kafka records themselves. As you do not set a non-default TimestampExtractor (see timestamp.extractor in http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters), the calls to punctuate depend only on the progress of event time over the records you process. Thus, if you need multiple minutes to process "30 seconds" (event time) worth of records, punctuate will be called less frequently than every 30 seconds of wall-clock time...

This can also explain your irregular calling patterns (i.e., bursts and long delays). If the event time of your data "jumps", and the data to be processed is already fully available in your topic, Kafka Streams' internally maintained stream-time "jumps" along with it.

I would assume that you can resolve your issue by using WallclockTimestampExtractor (see http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor)
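For reference, a minimal sketch of how that extractor could be configured - assuming the pre-1.0 config key (`StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG`; later versions rename it to `default.timestamp.extractor`):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

Properties config = new Properties();
// ... application.id, bootstrap.servers, serdes, etc. ...

// Use the record's arrival (wall-clock) time instead of the embedded timestamp.
// Pre-1.0 key; 1.0+ uses StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG.
config.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        WallclockTimestampExtractor.class.getName());
```

Note this changes the timestamp for all time-based operations in the topology (windows, joins), not just punctuation.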

One more thing to mention: stream-time is only advanced if data is processed -- if your application reaches the end of the input topics and waits for data, punctuate will not be called. This applies even if you use WallclockTimestampExtractor.

Btw: there is currently a discussion about the punctuation behavior of Streams: https://github.com/apache/kafka/pull/1689

Answer for Kafka 1.0 and later

Since Kafka 1.0 it is possible to register punctuations based on wall-clock time or event-time: https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2
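A minimal sketch of what that registration could look like with the 1.0 Processor API (the 30-second interval mirrors the question's code; the class and comments are illustrative, not the asker's actual processor):

```java
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockBatchProcessor extends AbstractProcessor<String, String> {
    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        // WALL_CLOCK_TIME fires every 30s of system time, independent of
        // record timestamps; PunctuationType.STREAM_TIME instead reproduces
        // the pre-1.0 event-time-driven behaviour.
        context.schedule(30_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // flush the batched values here, then commit
            context().commit();
        });
    }

    @Override
    public void process(String key, String value) {
        // batch the value, as in the question's BP2
    }
}
```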



Answer 2:

Just finished reading the answer to this question, which I think answers yours too. The gist of it is that:

  1. The streams consumer executes a poll for records
  2. All returned records are processed completely.
  3. Any punctuate callbacks that are due (per the configured delay) are then executed.

The point being that punctuate is not a fixed-interval event: variations in how long step 2 takes produce equivalent variations in the period between punctuate calls.

....but read that link - he says it better than I do.
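Under that model, the drift is easy to reproduce without Kafka at all. The sketch below (plain Java, all names hypothetical - a simulation of the loop described above, not Kafka Streams code) processes each polled batch in full and only then fires any punctuations that have come due. Slow batches produce both late and back-to-back punctuate calls, much like the bursts in the question:

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuateDrift {
    public static void main(String[] args) {
        final long interval = 30;          // scheduled punctuate interval (seconds)
        long nextPunctuate = interval;     // first punctuation due at t=30
        long clock = 0;                    // simulated clock
        List<Long> punctuations = new ArrayList<>();

        // How long each polled batch takes to process (step 2 above)
        long[] batchCosts = {10, 50, 5, 90};
        for (long cost : batchCosts) {
            clock += cost;                     // the whole batch is processed first
            while (clock >= nextPunctuate) {   // punctuations only fire between batches
                punctuations.add(clock);
                nextPunctuate += interval;
            }
        }
        // Irregular and bursty, not one call every 30s:
        System.out.println(punctuations);      // [60, 60, 155, 155, 155]
    }
}
```

Note how a 90-second batch yields three punctuate calls in immediate succession - "catch-up" calls with no processing in between, which matches the repeated-call pattern in the question's log.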



Answer 3:

Ok - I think this is a bug in Kafka.

Here's why:

In my original testing I was using a single machine to run both the Producer and Consumer. I would run the Producer for a few minutes to generate some test data and then run my tests. This would give the strange output I posted originally.

Then I decided to push the Producer to the background and leave it running. Now I see 100% perfect 30 second intervals between calls to .punctuate(). No more problems with this.

In other words - if the kafka server isn't receiving a steady flow of inbound data, the KStreams process doesn't seem to run its punctuations consistently.