Spring Kafka - Consume last N messages for partition(s)

Posted 2020-05-29 09:41

Question:

I'm trying to read a requested number of Kafka messages. For non-transactional messages we would seek to endOffset - N for M partitions, start polling, and collect messages while the current offset is less than the end offset for each partition. For idempotent/transactional messages we have to account for transaction markers and duplicate messages, meaning the offsets will not be continuous; in that case endOffset - N will not return N messages, and we would need to seek back further until we have N messages for each partition or the beginning offset is reached.
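A minimal sketch of that initial positioning, assuming a manually assigned KafkaConsumer<String, String> named consumer, a Collection<TopicPartition> partitions, and the requested count n (all names are illustrative):

Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
Map<TopicPartition, Long> startOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
    // seek to endOffset - n, clamped so we never seek before the partition's beginning
    long start = Math.max(beginningOffsets.get(tp), endOffsets.get(tp) - n);
    startOffsets.put(tp, start);
    consumer.seek(tp, start);
}
// for transactional/idempotent producers this may still yield fewer than n
// records per partition, because some offsets are transaction markers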

As there are multiple partitions, I need to keep track of all the offsets read so I can stop when everything is done. There are two steps. The first step is to calculate the start offset (end offset minus the requested number of messages) and the end offset (the offsets are not continuous; there are gaps), then seek the partition to start consuming from the start offset. The second step is to poll and count the messages in each partition; if we don't get the requested number of messages, repeat both steps until we have the requested number for each partition.
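A rough sketch of that two-step loop, building on the names from the sketch above (consumer, partitions, n, endOffsets, beginningOffsets, startOffsets); this is illustrative only, not a definitive implementation:

Map<TopicPartition, List<ConsumerRecord<String, String>>> collected = new HashMap<>();
partitions.forEach(tp -> collected.put(tp, new ArrayList<>()));
Set<TopicPartition> pending = new HashSet<>(partitions);

while (!pending.isEmpty()) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
    for (TopicPartition tp : new ArrayList<>(pending)) {
        collected.get(tp).addAll(batch.records(tp));
        if (collected.get(tp).size() >= n) {
            pending.remove(tp);
            consumer.pause(Collections.singleton(tp)); // done: stop fetching this partition
        } else if (consumer.position(tp) >= endOffsets.get(tp)) {
            long start = startOffsets.get(tp);
            if (start <= beginningOffsets.get(tp)) {
                pending.remove(tp); // whole partition read; it holds fewer than n records
            } else {
                // gaps (transaction markers etc.) ate some offsets: seek further back
                long newStart = Math.max(beginningOffsets.get(tp),
                        start - (n - collected.get(tp).size()));
                startOffsets.put(tp, newStart);
                collected.get(tp).clear(); // simplest: re-read from the new start
                consumer.seek(tp, newStart);
            }
        }
    }
}

Because a batch may overshoot, a partition's list can end up with a few more than n records; trim the oldest from the front if an exact count is needed, and reverse each list for LIFO order.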

Conditions

- The initial poll may not return any records, so continue polling.
- Stop polling when you have reached the end offset for each partition, or when poll returns no results.
- Check whether each partition has read as many messages as requested. If yes, mark it complete; if not, mark it to continue and repeat the steps.
- Account for gaps in messages (offsets are not continuous).
- It should work for both transactional and non-transactional producers.
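One way to honor the "stop when poll returns no results" condition is to treat a few consecutive empty polls as the end of the data; a small sketch reusing "pending" from the loop above (the threshold of 3 is an arbitrary assumption):

int emptyPolls = 0;
while (!pending.isEmpty() && emptyPolls < 3) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
    emptyPolls = batch.isEmpty() ? emptyPolls + 1 : 0;
    // ... per-partition accounting as sketched above ...
}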

Question:

How would I go about keeping track of all the messages that have been read for each partition and breaking out of the loop? Messages in each partition come in order, if that is helpful.

Does Spring Kafka support such a use case? More details can be found here.

Update: I'm asking about reading the last N messages in each partition. The partitions and the number of messages are user input. I would like to keep all the offset management in memory. In essence, we are trying to read the messages in LIFO order. This makes it tricky, as Kafka allows you to read forward, not backward.

Answer 1:

I don't understand why there is such a need; Kafka itself manages things when there is nothing in the queue. If messages jump from state to state, one can have separate queues/topics. However, here's how one can do it.

When we consume messages from a partition using something like this:

import kafka.consumer.ConsumerIterator;
import kafka.message.MessageAndMetadata;

ConsumerIterator<byte[], byte[]> it = something; // initialize consumer
while (it.hasNext()) {
  MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
  String kafkaMessage = new String(messageAndMetadata.message());
  int partition = messageAndMetadata.partition();
  long offset = messageAndMetadata.offset();
  boolean processed = false;
  do {
    long maxOffset = something; // fetch from db
    // if offset < maxOffset, process the message, commit manually,
    // and set processed = true
    // else busy-wait or do something more useful
  } while (!processed); // loop until this message has actually been processed
}

We get information about the offsets, the partition number, and the message itself. You can choose to do anything with this info.

For your use case, you might also decide to persist the consumed offsets to a database so that they can be adjusted the next time. I would also recommend a shutdown hook for cleanup and a final save of the processed offsets to the DB.
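A minimal sketch of that shutdown hook, assuming a hypothetical offsetStore with a save(...) method backed by the DB (both names are illustrative, not a real API):

Map<TopicPartition, Long> processedOffsets = new ConcurrentHashMap<>();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    offsetStore.save(processedOffsets); // final save of the processed offsets (hypothetical API)
    consumer.wakeup();                  // interrupt a blocking poll so the consumer can be closed
}));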



Answer 2:

So if I understand you correctly, this should be doable with a standard Kafka Consumer.

import static java.util.stream.Collectors.toList;

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

Consumer<?, Message> consumer = ...

public Map<Integer, List<Message>> readLatestFromPartitions(String topic, Collection<Integer> partitions, int count) {

    // create the TopicPartitions we want to read
    List<TopicPartition> tps = partitions.stream().map(p -> new TopicPartition(topic, p)).collect(toList());
    consumer.assign(tps);
    // read each partition from the beginning so the whole topic is seen
    consumer.seekToBeginning(tps);

    // create and initialize the result map
    Map<Integer, List<Message>> result = new HashMap<>();
    for (Integer i : partitions) { result.put(i, new ArrayList<>()); }

    // read until the expected count has been read for all partitions
    while (result.values().stream().anyMatch(l -> l.size() < count)) {
        // read until the end of the topic
        ConsumerRecords<?, Message> records = consumer.poll(Duration.ofSeconds(5));
        while (records.count() > 0) {
            for (ConsumerRecord<?, Message> record : records) {
                List<Message> addTo = result.get(record.partition());
                // only keep the newest "count" entries per partition
                if (addTo.size() >= count) {
                    addTo.remove(0);
                }
                addTo.add(record.value());
            }
            records = consumer.poll(Duration.ofSeconds(5));
        }
        // now we have read the whole topic for the given partitions.
        // if all lists contain the expected count, the loop will finish;
        // otherwise it will wait for more data to arrive.
    }

    // the map now contains the messages in the order they were sent,
    // we want them reversed (LIFO)
    Map<Integer, List<Message>> returnValue = new HashMap<>();
    result.forEach((k, v) -> {
        Collections.reverse(v); // reverses in place; Collections.reverse returns void
        returnValue.put(k, v);
    });
    return returnValue;
}
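
For example, to fetch the last 10 messages from partitions 0 and 1 of a hypothetical topic "orders":

Map<Integer, List<Message>> latest = readLatestFromPartitions("orders", Arrays.asList(0, 1), 10);
latest.forEach((partition, messages) ->
        System.out.println("partition " + partition + ": " + messages.size() + " messages, newest first"));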