I have a topology that looks like this:
KTable<ByteString, User> users = topology.table(USERS);

topology.stream(JOIN_REQUESTS)
    .mapValues(entityTopologyProcessor::userNew)
    .to(USERS);

topology.stream(SETTINGS_CONFIRM_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsConfirm)
    .to(USERS);

topology.stream(SETTINGS_UPDATE_REQUESTS)
    .join(users, entityTopologyProcessor::userSettingsUpdate)
    .to(USERS);
At runtime this topology works fine. Users are created with join requests. They confirm their settings with settings confirm requests. They update their settings with settings update requests.
However, reprocessing this topology does not produce the original results. Specifically, the settings update joiner does not see the user that resulted from the settings confirm joiner, even though, in terms of timestamps, many seconds elapse from the time the user is created, to the time the user is confirmed, to the time the user updates their settings.
I'm at a loss. I've tried turning off caching and logging on the user table, but I have no idea what to do to make this reprocess properly.
A KStream-KTable join is not 100% deterministic (and might never become 100% deterministic). We are aware of the problem and are discussing solutions to at least mitigate the issue.
One problem is that when a consumer fetches from the brokers, we cannot easily control for which topics and/or partitions the broker returns data. Depending on the order in which we receive data from the broker, the result might differ slightly.
One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
This blog post might help, too: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
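As a side note, later Kafka Streams releases (2.1+, via KIP-353) added a max.task.idle.ms setting that makes a task wait for data on all of its input partitions before choosing the next record to process; it reduces, but does not eliminate, this non-determinism. A minimal sketch, assuming a plain StreamsBuilder rather than the topology wrapper from the question, with placeholder application id and broker address:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

StreamsBuilder builder = new StreamsBuilder();
// ... build the stream-table join topology on builder ...

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "user-entity-topology"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder
// Wait up to 1s for records on all input partitions so the stream side
// is less likely to race ahead of the table side during reprocessing.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();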
I was able to partially solve my problem by replacing the code in the question with:
KTable<ByteString, User> users = topology.table(JOIN_REQUESTS)
    .mapValues(entityTopologyProcessor::user)
    .leftJoin(topology.stream(CONFIRM_SETTINGS_REQUESTS)
                      .groupByKey()
                      .reduce((a, b) -> b),
              entityTopologyProcessor::confirmSettings)
    .leftJoin(topology.stream(SETTINGS_UPDATE_REQUESTS)
                      .groupByKey()
                      .reduce(entityTopologyProcessor::settingsUpdateReduce),
              entityTopologyProcessor::settingsUpdate);
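For context, the entityTopologyProcessor methods referenced above might be shaped roughly as follows; the value type names, field accessors, and bodies are illustrative assumptions, not the actual implementation. The important detail is that both left joiners must tolerate a null right-hand value, since the confirm/update tables may not yet hold a record for a given user:

// Build the initial User from a join request (hypothetical constructor and fields).
public User user(JoinRequest request) {
    return new User(request.getUserId());
}

// KTable-KTable left join: 'confirm' is null until a confirm request exists for the key.
public User confirmSettings(User user, ConfirmSettingsRequest confirm) {
    return confirm == null ? user : user.withSettingsConfirmed(true);
}

// Reducer: merge successive settings-update requests so partial updates accumulate (hypothetical merge).
public SettingsUpdateRequest settingsUpdateReduce(SettingsUpdateRequest a, SettingsUpdateRequest b) {
    return a.mergedWith(b);
}

// KTable-KTable left join: 'update' is null until an update request exists for the key.
public User settingsUpdate(User user, SettingsUpdateRequest update) {
    return update == null ? user : user.withSettings(update.getSettings());
}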
This solution leverages the fact that all table-table joins are deterministic. During reprocessing, the resulting state may temporarily be incorrect, but once the topology has caught up, the final value is correct (although the final timestamp for a given result will still not be deterministic). Generally speaking, this approach groups all the events (in this example: join requests, confirm settings requests, settings update requests) for a given entity (in this example: user) into a single task and joins their accumulations into a single product. The example could be extended to handle delete events by joining another stream at the end that nulls out the result.
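A minimal sketch of that delete extension, assuming a DELETE_REQUESTS topic (the topic name, value type, and liveUsers variable are placeholders):

KTable<ByteString, User> liveUsers = users
    .leftJoin(topology.stream(DELETE_REQUESTS)
                      .groupByKey()
                      .reduce((a, b) -> b),                        // keep only the latest delete request per user
              (user, delete) -> delete == null ? user : null);     // a non-null delete nulls out the user

Returning null from the joiner produces a tombstone for that key in the resulting table.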
Beyond this particular approach, writing a reprocessable topology generally requires thinking about the topology in two dimensions: real-time and reprocess-time. As of Kafka Streams 1.0.0, this is something of an art for the developer.