Kafka Streams with state stores - Reprocessing of

Posted 2019-04-02 14:19

Question:

We have the following topology with two transformers, each of which uses a persistent state store:

kStreamBuilder.stream(inboundTopicName)
    .transform(() -> new FirstTransformer(FIRST_STATE_STORE), FIRST_STATE_STORE)
    .map((key, value) -> ...)
    .transform(() -> new SecondTransformer(SECOND_STATE_STORE), SECOND_STATE_STORE)
    .to(outboundTopicName);

and the Kafka settings include auto.offset.reset: latest. After the app was launched, I saw that two internal compacted topics had been created (as expected): appId_inbound_firstStateStore-changelog and appId_inbound_secondStateStore-changelog
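For reference, the setting mentioned above could be supplied roughly as follows. This is only a sketch using plain string keys: the application id and broker address are placeholders, not values from the real deployment, and the class and method names are illustrative. Note that auto.offset.reset only takes effect when the consumer group has no valid committed offset for a partition.

```java
import java.util.Properties;

public class StreamsSettingsSketch {
    // Builds the settings described in the question.
    // "appId" and "localhost:9092" are placeholders (assumptions).
    public static Properties buildSettings() {
        Properties props = new Properties();
        props.put("application.id", "appId");
        props.put("bootstrap.servers", "localhost:9092");
        // Consulted only when no committed offset exists (or it is out of range).
        props.put("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildSettings();
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```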

Our app was down for two days, and after we started it again, messages were reprocessed from the beginning for one specific partition (we have multiple partitions). I know that brokers prior to Kafka 2.0 retain committed offsets for only about one day, so our offsets should have been cleaned up by retention. But why were messages reprocessed from the beginning if we use auto.offset.reset: latest? Maybe it is somehow related to the stateful operations or the internal changelog topics.
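The retention reasoning above can be checked with simple arithmetic. The sketch below assumes the broker uses the pre-2.0 default of offsets.retention.minutes = 1440 (one day) and that the downtime was exactly two days; both values are assumptions, and the class and method names are illustrative only.

```java
import java.time.Duration;

public class OffsetRetentionCheck {
    // Assumed broker default for offsets.retention.minutes before Kafka 2.0.
    public static final Duration ASSUMED_RETENTION = Duration.ofMinutes(1440);

    // True when the group was inactive longer than the retention window,
    // in which case its committed offsets may have been removed.
    public static boolean offsetsLikelyExpired(Duration downtime, Duration retention) {
        return downtime.compareTo(retention) > 0;
    }

    public static void main(String[] args) {
        Duration downtime = Duration.ofDays(2); // 2880 minutes
        System.out.println(offsetsLikelyExpired(downtime, ASSUMED_RETENTION)); // prints "true"
    }
}
```

If the committed offsets did expire, the consumer has no stored position for those partitions, and auto.offset.reset decides where reading of the input topic resumes.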

I see the following logs (most of them are duplicated multiple times):

StoreChangelogReader Restoring task 0_55's state store firstStateStore from beginning of the changelog
Fetcher [Consumer clientId=xxx-restore-consumer, groupId=] Resetting offset for partition xxx-55 to offset 0
ConsumerCoordinator Setting newly assigned partitions
ConsumerCoordinator Revoking previously assigned partitions
StreamsPartitionAssignor Assigned tasks to clients
AbstractCoordinator Successfully joined group with generation
StreamThread partition revocation took xxx ms
Unsubscribed all topics or patterns and assigned partitions
AbstractCoordinator (Re-)joining group
Attempt to heartbeat failed since group is rebalancing
AbstractCoordinator Group coordinator xxx:9092 (id: xxx rack: null) is unavailable or invalid, will attempt rediscovery
FetchSessionHandler - [Consumer clientId=xxx-restore-consumer, groupId=] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 2: org.apache.kafka.common.errors.DisconnectException

Kafka broker version 0.11.0.2; Kafka Streams version 2.1.0