I have this simple KTable definition that generates a Store:
KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();
I publish messages to ORDERS_TOPIC, but the store is only updated once every 30 seconds. The log below shows a commit being triggered because the 30000 ms commit interval has elapsed:
2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl : task [0_0] Flushing producer
I found that the property that controls this is commit.interval.ms:
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
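For reference, here is a minimal, self-contained sketch of that override in the context of a full configuration. It uses plain string keys so it runs without the Kafka dependency. The application id and broker address are assumptions inferred from the thread name and broker shown in the log above, and the class and method names are hypothetical.

```java
import java.util.Properties;

public class CommitIntervalConfig {

    // Builds a Streams-style configuration with an explicit commit interval.
    public static Properties buildProps(int commitIntervalMs) {
        Properties props = new Properties();
        // Assumed values, inferred from the thread name and broker in the log above.
        props.put("application.id", "order-service-streams");
        props.put("bootstrap.servers", "EXPRF026.SUMINISTRADOR:9092");
        // "commit.interval.ms" is the key behind StreamsConfig.COMMIT_INTERVAL_MS_CONFIG.
        props.put("commit.interval.ms", commitIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps(10);
        System.out.println("commit.interval.ms = " + props.get("commit.interval.ms"));
    }
}
```

In the real application, the resulting Properties would be passed to the KafkaStreams constructor together with the builder.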
Why does it default to 30000 ms (which sounds like a long time), and what are the implications of lowering it to 10 ms?
If instead of a KTable I work with a KStream...
KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();
...I can see the messages right away, without having to wait those 30000 ms. Why the difference?