Kafka KStreams - processing timeouts

2019-01-25 13:59发布

问题:

I am attempting to use <KStream>.process() with a TimeWindows.of("name", 30000) to batch up some KTable values and send them on. It seems that 30 seconds exceeds the consumer timeout interval after which Kafka considers said consumer to be defunct and releases the partition.

I've tried upping the frequency of poll and commit interval to avoid this:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");
config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

Unfortunately these errors are still occurring:

(lots of these)

ERROR  o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog 
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

Followed by these:

INFO   o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1
WARN   o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]: 
  org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

Clearly I need to be sending heartbeats back to the server more often. How?

My topology is:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String>  kt = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

The KTable is grouping values by key every 30 seconds. In Processor.init() I call context.schedule(30000).

DBProcessorSupplier provides an instance of DBProcessor. This is an implementation of AbstractProcessor where all the overrides have been provided. All they do is LOG so I know when each is being hit.

It's a pretty simple topology but it's clear I'm missing a step somewhere.


Edit:

I get that I can adjust this on the server side but Im hoping there is a client-side solution. I like the notion of partitions being made available pretty quickly when a client exits / dies.


Edit:

In an attempt to simplify the problem I removed the aggregation step from the graph. It's now just consumer->processor(). (If I send the consumer directly to .print() it works v quickly so I know it's ok). (Similarly If I output the aggregation (KTable) via .print() it seems ok too).

What I found was that the .process() - which should be calling .punctuate() every 30 seconds is actually blocking for variable lengths of time and outputting somewhat randomly (if at all).

  • Main program
  • Debug output
  • Processor Supplier
  • Processor

Further:

I set the debug level to 'debug' and reran. Im seeing lots of messages:

DEBUG  o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord <info>

but a breakpoint in the .punctuate() function isn't getting hit. So it's doing lots of work but not giving me a chance to use it.

回答1:

A few clarifications:

  • StreamsConfig.COMMIT_INTERVAL_MS_CONFIG is a lower bound on the commit interval, ie, after a commit, the next commit happens not before this time passed. Basically, Kafka Stream tries to commit ASAP after this time passed, but there is no guarantee whatsoever how long it will actually take to do the next commit.
  • StreamsConfig.POLL_MS_CONFIG is used for the internal KafkaConsumer#poll() call, to specify the maximum blocking time of the poll() call.

Thus, both values are not helpful to heartbeat more often.

Kafka Streams follows a "depth-first" strategy when processing record. This means, that after a poll() for each record all operators of the topology are executed. Let's assume you have three consecutive maps, than all three maps will be called for the first record, before the next/second record will get processed.

Thus, the next poll() call will be made, after all record of the first poll() got fully processed. If you want to heartbeat more often, you need to make sure, that a single poll() call fetches less records, such that processing all records takes less time and the next poll() will be triggered earlier.

You can use configuration parameters for KafkaConsumer that you can specify via StreamsConfig to get this done (see https://kafka.apache.org/documentation.html#consumerconfigs):

streamConfig.put(ConsumerConfig.XXX, VALUE);

  • max.poll.records: if you decrease this value, less record will be polled
  • session.timeout.ms: if you increase this value, there is more time for processing data (adding this for completeness because it is actually a client setting and not a server/broker side configuration -- even if you are aware of this solution and do not like it :))

EDIT

As of Kafka 0.10.1 it is possible (and recommended) to prefix consumer and procuder configs within streams config. This avoids parameter conflicts as some parameter names are used for consumer and producer and cannot be distinguiesh otherwise (and would be applied to consumer and producer at the same time). To prefix a parameter you can use StreamsConfig#consumerPrefix() or StreamsConfig#producerPrefix(), respectively. For example: streamsConfig.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARAMETER), VALUE);

One more thing to add: The scenario described in this question is a known issue and there is already KIP-62 that introduces a background thread for KafkaConsumer that send heartbeats, thus decoupling heartbeats from poll() calls. Kafka Streams will leverage this new feature in upcoming releases.