Kafka Streams - Explain the reason why KTable and

Posted 2019-07-22 19:33

Question:

I have this simple KTable definition that generates a Store:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();

I publish messages to the ORDERS_TOPIC, but the store is only updated every 30 seconds. This is the log, where there is a message about committing because the 30000ms interval has elapsed:

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567  INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask   : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager        : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache      : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl  : task [0_0] Flushing producer

I found that the property controlling this is commit.interval.ms:

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);

Why is it set to 30000ms by default (that sounds like a long time), and what are the implications of changing it to 10ms?
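For reference, here is a minimal sketch of how the commit interval is tuned. It uses the plain string key that the `StreamsConfig.COMMIT_INTERVAL_MS_CONFIG` constant resolves to, so it runs without the Kafka Streams jar on the classpath; the value of 100 ms is just an illustrative choice, not a recommendation:

```java
import java.util.Properties;

public class CommitIntervalSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // "commit.interval.ms" is the string key behind
        // StreamsConfig.COMMIT_INTERVAL_MS_CONFIG; the default is 30000 ms.
        // Lowering it makes the application commit offsets and flush the
        // KTable caches more often, at the cost of more load on the brokers.
        props.put("commit.interval.ms", "100");
        System.out.println("commit interval: " + props.getProperty("commit.interval.ms") + " ms");
    }
}
```

In a real application this `Properties` object would be the same one that carries `application.id` and `bootstrap.servers` and would be passed to the `KafkaStreams` constructor.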

If instead of a KTable I work with a KStream...

KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();

...I can see the messages right away, without having to wait those 30000ms. Why the difference?

Answer 1:

It's related to memory management, in particular the KTable caches: http://docs.confluent.io/current/streams/developer-guide.html#memory-management

The KTable is actually updated all the time, and if you use "Interactive Queries" to access the underlying state store, you can see each update immediately. However, the KTable cache buffers the updates to reduce downstream load, and each time a commit is triggered, this cache must be flushed downstream to avoid data loss in case of failure. If your cache size is small, you might also see downstream records earlier, whenever a key gets evicted from the cache.

About the commit interval: in general, it is set to a relatively large value to reduce the commit load on the brokers.
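An alternative to shrinking the commit interval is to disable the record cache entirely, so that every KTable update is forwarded downstream as it arrives. A minimal sketch, again using the plain string key (it corresponds to `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` in the Kafka Streams versions of that era):

```java
import java.util.Properties;

public class DisableCacheSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // A cache size of 0 bytes disables KTable caching, so table.print()
        // sees every update immediately instead of only on commit/flush.
        // The trade-off: no deduplication of updates per key, so every
        // single change is sent downstream, increasing downstream traffic.
        props.put("cache.max.bytes.buffering", "0");
        System.out.println("cache bytes: " + props.getProperty("cache.max.bytes.buffering"));
    }
}
```

This keeps the default 30000ms commit interval (and thus the broker-side commit load) unchanged while still making updates visible right away, which is usually preferable to a very small commit interval like 10ms.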