Kafka KStream - topology to take n-second counts

Posted 2019-06-08 01:36

Question:

I have a stream of JSON objects that I'm keying on a hash of a few values. I'm hoping to count by key in n-second intervals (10? 60?) and use those counts to do some pattern analysis.

My topology: K->aggregateByKey(n seconds)->process()
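
To make the "count by key in n-second intervals" idea concrete, here is a minimal plain-Java sketch of tumbling-window counting, roughly the bookkeeping a windowed aggregation does. TumblingCounter and its methods are hypothetical names for illustration, not part of the Kafka Streams API, and the sketch assumes non-negative millisecond timestamps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: counts records per key
// in fixed, non-overlapping (tumbling) windows of windowSizeMs milliseconds.
class TumblingCounter {
    private final long windowSizeMs;
    // Maps "key@windowStart" to the number of records seen in that window.
    private final Map<String, Long> counts = new HashMap<>();

    TumblingCounter(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Start of the window containing the timestamp (assumes timestampMs >= 0).
    long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Records one event and returns the updated count for its key and window.
    long add(String key, long timestampMs) {
        String slot = key + "@" + windowStart(timestampMs);
        return counts.merge(slot, 1L, Long::sum);
    }

    // Current count for the key in the window containing the timestamp.
    long count(String key, long timestampMs) {
        return counts.getOrDefault(key + "@" + windowStart(timestampMs), 0L);
    }
}
```

With a 60000 ms window, records at 1000 ms and 59999 ms land in the window starting at 0, while a record at 60000 ms opens the next window.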

In the processor's init() step I've called ProcessorContext.schedule(60 * 1000L) in the hope of having .punctuate() called. From there I would loop through the values in an internal hash and act accordingly.

I'm seeing values come through the aggregation step and hit the process() function, but .punctuate() is never called.


Code:

// Build the topology: source topic -> re-key -> windowed aggregate -> processor
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> opxLines = kStreamBuilder.stream(TOPIC);

// ReMapper re-keys each record on a hash of a few values
KStream<String, String> mapped = opxLines.map(new ReMapper());

// Aggregate per key in 60-second windows
KTable<Windowed<String>, String> ktRtDetail = mapped.aggregateByKey(
        new AggregateInit(),
        new OpxAggregate(),
        TimeWindows.of("opx_aggregate", 60000));

// Forward every aggregate update to AggProcessor, which calls schedule() in init()
ktRtDetail.toStream().process(new ProcessorSupplier<Windowed<String>, String>() {
    @Override
    public Processor<Windowed<String>, String> get() {
        return new AggProcessor();
    }
});

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

AggregateInit() returns null.

I guess I can do the .punctuate() equivalent with a simple timer but I'd like to know why this code isn't working the way I'd hope.
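
For reference, the "simple timer" fallback could look something like the sketch below. It uses only java.util.concurrent, so it runs on wall-clock time independently of Kafka Streams. TimedCountFlusher and its methods are hypothetical names, and printing the snapshot stands in for the real pattern analysis.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the punctuate() logic: collects counts and
// hands a snapshot to a wall-clock timer every intervalMs milliseconds.
class TimedCountFlusher {
    private final Map<String, Long> counts = new HashMap<>();
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Intended to be called per record; synchronized because the timer
    // task runs on a different thread.
    synchronized void record(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Returns the counts gathered since the last flush and resets the state.
    synchronized Map<String, Long> flush() {
        Map<String, Long> snapshot = new HashMap<>(counts);
        counts.clear();
        return snapshot;
    }

    // Starts the periodic flush; here the "analysis" only prints the snapshot.
    void start(long intervalMs) {
        timer.scheduleAtFixedRate(
                () -> System.out.println("counts: " + flush()),
                intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Stops the timer thread so the JVM can exit.
    void stop() {
        timer.shutdownNow();
    }
}
```

The idea would be to call record() from process(), start(60 * 1000L) from init(), and stop() from close(). That wiring is an assumption about how the processor is structured, not something shown in the code above.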

Answer 1:

I think this was caused by an improper setup of the Kafka cluster. After raising the file descriptor limit well above the default (1024 -> 65535), this seems to be working as specified.
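
If it helps anyone check the same thing, here is a small sketch that reads the file descriptor limit from inside a JVM. It only reports the limit for the process it runs in (for example the Streams application), so the brokers would still need to be checked on their own hosts. FdLimitCheck is a hypothetical name, and the com.sun.management bean it relies on is only available on Unix-like JVMs, hence the -1 fallback.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper: reports the file descriptor limit of the current JVM process.
class FdLimitCheck {
    // Returns the maximum number of file descriptors, or -1 when the
    // platform does not expose it (for example on non-Unix JVMs).
    static long maxFileDescriptors() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getMaxFileDescriptorCount();
        }
        return -1L;
    }
}
```

Logging FdLimitCheck.maxFileDescriptors() at startup would show whether the process is still running with a low default such as 1024.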