I am aware that it is not possible to order across multiple partitions in Kafka, and that partition ordering is only guaranteed for a single consumer within a group (for a single partition). However, with Kafka Streams 0.10, is it now possible to achieve this? If we use the timestamp feature so that each message in each partition maintains its order, then at the consumer side, let's say with Kafka Streams 0.10, is this now possible? Assuming we receive all messages, could we not sort all the partitions based on the consumed timestamp and perhaps forward them on to a separate topic for consumption?
At the moment I need to maintain ordering, but this means having a single partition with a single consumer thread. I wanted to change this to multiple partitions to increase parallelism but somehow 'get them in order'.
Any thoughts? Thank you.
There are two problems you are facing in such a situation:
- A Kafka topic with multiple partitions, for which Kafka does not guarantee global (topic-wide) ordering.
- The possibility of late-arriving / out-of-order messages for the topic and its partitions, which is related to time and timestamps.
I am aware that it is not possible to order across multiple partitions in Kafka, and that partition ordering is only guaranteed for a single consumer within a group (for a single partition). However, with Kafka Streams 0.10, is it now possible to achieve this?
The short answer is: No, it is still not possible to achieve global order when you are reading from Kafka topics that have multiple partitions.
Also, "partition ordering" means "partition ordering based on the offsets of the messages in a partition". The ordering guarantee is not related to the timestamps of the messages.
Lastly, ordering is only guaranteed if max.in.flight.requests.per.connection == 1. From the producer configuration settings in the Apache Kafka documentation:

max.in.flight.requests.per.connection (default: 5): The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
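To make that concrete, here is a minimal producer configuration sketch for strict per-partition ordering (the bootstrap address is a placeholder, and the retry count is just an example value):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("retries", "3"); // example value; retries are what make reordering possible
// With retries enabled, allow only one in-flight request per connection so a
// retried send cannot overtake a later send and reorder the partition.
props.put("max.in.flight.requests.per.connection", "1");

Producer<String, String> producer = new KafkaProducer<>(props);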
Note that at this point we are talking about a combination of consumer behavior (which is what your original question started out with) and producer behavior in Kafka.
If we use the timestamp feature so that each message in each partition maintains its order, then at the consumer side, let's say with Kafka Streams 0.10, is this now possible?
Even with the timestamp feature we still don't achieve "each message in each partition maintains the order". Why? Because of the possibility of late-arriving / out-of-order messages.
A partition is ordered by offsets, but it is not guaranteed to be ordered by timestamps. The following contents of a partition are perfectly possible in practice (timestamps are typically milliseconds since the epoch):
Partition offsets:   0   1   2   3   4   5   6   7   8
Timestamps:         15  16  16  17  15  18  18  19  17
                                    ^^
                                    oops, late-arriving data!
What are late-arriving / out-of-order messages? Imagine you have sensors scattered all over the world, all of which measure their local temperature and send the latest measurement to a Kafka topic. Some sensors may have unreliable Internet connectivity, so their measurements may arrive with a delay of minutes, hours, or even days. Eventually their delayed measurements will make it to Kafka, but they will arrive "late". The same goes for mobile phones in a city: some may run out of battery and need to be recharged before they can send their data, some may lose Internet connectivity because they're underground, and so on.
Assuming we receive all messages, could we not sort all the partitions based on the consumed timestamp and perhaps forward them on to a separate topic for consumption?
In theory yes, but in practice that's quite difficult. The assumption "we receive all messages" is actually challenging for a streaming system (even for a batch processing system, though presumably the problem of late-arriving data is often simply ignored here). You never know whether you actually have received "all messages" -- because of the possibility of late-arriving data. If you receive a late-arriving message, what do you want to happen? Re-process/re-sort "all" the messages again (now including the late-arriving message), or ignore the late-arriving message (thus computing incorrect results)? In a sense, any such global ordering achieved by "let's sort all of them" is either very costly or best effort.
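That said, if a best-effort version is acceptable, a sketch could look like the following (all names here are illustrative, and graceMs, i.e. how long to wait for stragglers, is an assumed tuning knob): buffer records from all partitions in a priority queue ordered by timestamp, and only forward a record to the output topic once it is older than the newest timestamp seen so far minus the grace period. Anything arriving later than its grace period still ends up forwarded out of order, which is exactly the trade-off described above.

import java.util.Comparator;
import java.util.PriorityQueue;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Best-effort global ordering: buffer, sort by timestamp, forward.
void bufferSortAndForward(Consumer<String, String> consumer,
                          Producer<String, String> producer,
                          String outputTopic, long graceMs) {
    PriorityQueue<ConsumerRecord<String, String>> buffer = new PriorityQueue<>(
            Comparator.comparingLong((ConsumerRecord<String, String> r) -> r.timestamp()));
    long maxSeenTs = Long.MIN_VALUE;

    while (true) {
        for (ConsumerRecord<String, String> rec : consumer.poll(500)) {
            buffer.add(rec);
            maxSeenTs = Math.max(maxSeenTs, rec.timestamp());
        }
        // Only forward records considered "safe": older than the newest
        // timestamp seen minus the grace period. A record arriving after its
        // slot has already been flushed is still forwarded, just out of order.
        while (!buffer.isEmpty() && buffer.peek().timestamp() <= maxSeenTs - graceMs) {
            ConsumerRecord<String, String> rec = buffer.poll();
            producer.send(new ProducerRecord<>(outputTopic, rec.key(), rec.value()));
        }
    }
}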
I'm not using Kafka Streams, but it is possible to do this with the normal Consumer.
First, sort the partitions. This assumes you've already seeked to the desired offset in each partition, or used a consumer group to do it.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {
    Set<TopicPartition> pollPartitions = events.partitions();
    List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
    for (TopicPartition tp : pollPartitions) {
        orderEvents.add(events.records(tp));
    }
    // Order the lists by the timestamp of their first event; each list is
    // already ordered internally (by offset, within its partition)
    orderEvents.sort(new PartitionEventListComparator());
    return orderEvents;
}
/**
 * Used to sort the topic-partition event lists so we get them in order
 */
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {
    @Override
    public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) {
        // Compare partitions by the timestamp of their first record
        return Long.compare(list1.get(0).timestamp(), list2.get(0).timestamp());
    }
}
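As a side note, on Java 8 and later the comparator class can be replaced with a one-liner in orderPartitions (same behavior, just more compact):

orderEvents.sort(Comparator.comparingLong(
        (List<ConsumerRecord<String, String>> list) -> list.get(0).timestamp()));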
Then just round-robin the partitions to get the events in order; in practice I've found this to work.
// This runs inside an outer poll loop (not shown); "sent" and "max" come from
// that surrounding code and cap how many events are forwarded in total.
ConsumerRecords<String, String> events = consumer.poll(500);
int totalEvents = events.count();
log.debug("Polling topic - received " + totalEvents + " events");
if (totalEvents == 0) {
    break; // no more events
}

List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);

int cnt = 0;
// Each list is removed when it is no longer needed
while (!orderEvents.isEmpty() && sent < max) {
    for (int j = 0; j < orderEvents.size(); j++) {
        List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
        // The list contains no more events, or none in our time range; remove it
        if (subList.size() < cnt + 1) {
            orderEvents.remove(j);
            log.debug("exhausted partition - removed");
            j--;
            continue;
        }
        ConsumerRecord<String, String> event = subList.get(cnt);
        process(event); // placeholder for the handling of the event, elided here
        sent++;
    }
    cnt++; // move on to the next event in every remaining partition's list
}