How to requeue messages in RabbitMQ

2019-01-31 08:54发布

问题:

After the consumer gets a message, consumer/worker does some validations and then call web service. In this phase, if any error occurs or validation fails, we want the message put back to the queue it was originally consumed from.

I have read RabbitMQ documentation. But I am confused about differences between reject, nack and cancel methods.

回答1:

Short answer:

To requeue specific message you can pick both basic.reject or basic.nack with multiple flag set to false.

basic.consume calling may also results to messages redelivering if you are using message acknowledge and there are un-acknowledged message on consumer at specific time and consumer exit without ack-ing them.

basic.recover will redeliver all un-acked messages on specific channel.

Long answer:

basic.reject and basic.nack both serves to same purpose - drop or requeue message that can't be handled by specific consumer (at the given moment, under certain conditions or at all). The main difference between them is that basic.nack supports bulk messages processing, whilst basic.reject doesn't.

This difference described in Negative Acknowledgements article on official RabbitMQ web site:

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.

To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.

To reject messages in bulk, clients set the multiple flag of the basic.nack method to true. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic.nack method. In this respect, basic.nack complements the bulk acknowledgement semantics of basic.ack.

Note, that basic.nack method is RabbitMQ-specific extension while basic.reject method is part of AMQP 0.9.1 specification.

As to basic.cancel method, it used to notify server that client stops message consuming. Note, that client may receive arbitrary messages number between basic.cancel method sending an receiving the cancel-ok reply. If message acknowledge is used by client and it has any un-acknowledged messages they will be moved back to the queue they originally was consumed from.

basic.recover has some limitations in RabbitMQ: it - basic.recover with requeue=false - basic.recover synchronicity

In addition to errata, according to RabbitMQ specs basic.recover has partial support (Recovery with requeue=false is not supported.)

Note about basic.consume:

When basic.consume started without auto-ack (no­ack=false) and there are some pending messages non-acked messages, then when consumer get canceled (dies, fatal error, exception, whatever) that pending messages will be redelivered. Technically, that pending messages will not be processed (even dead-lettered) until consumer release them (ack/nack/reject/recover). Only after that they will be processed (e.g. deadlettered).

For example, let say we post originally 5 message in a row:

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

And then consume 3 of them, but not ack them, and then cancel consumer. We will have this situation:

Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

where star (*) notes that redelivered flag set to true.

Assume that we have situation with dead-lettered exchange set and queue for dead-lettered messages

Exchange(e-main)                                   Exchange(e-dead) 
  Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 

And assume we post 5 message with expire property set to 5000 (5 sec):

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

and then we consume 3 message from main queue and hold them for 10 second:

Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

where exclamation mark (!) stands for unacked message. Such messages can't be delivered to any consumer and they normally can't be viewed in management panel. But let's cancel consumer, remember, that it still hold 3 un-acked message:

Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

So now that 3 messages which was in the head put back to original queue, but as they has per-message TTL set, they are dead-lettered to the tail of dead-letter queue (sure, via dead-letter exchange).

P.S.:

Consuming message aka listening for new one is somehow different from direct queue access (getting one or more message without taking care of others). See basic.get method description for more.