How to retract a message in RabbitMQ?

2019-03-15 06:37发布

问题:

I've got something like a job queue over RabbitMQ and, upon a request to cancel a job, I'd like to retract the tasks that have not yet started processing (their messages have not been ack'd), which corresponds to retracting these messages from the queues that they've been routed to.

I haven't found this functionality in AMQP or in the RabbitMQ API; perhaps I haven't searched well enough? Or will I have to use a workaround (it's not hard, but still)?

回答1:

I would solve this scenario by having the worker check some sort of authoritative data source to determine if the the job should proceed or not. For example, the worker would check the job's status in a database to see if the job was canceled already.

For scenarios where the speed of processing jobs may be faster than the speed with which the authoritative store can be updated and read, a less guaranteed data store that trades speed for other characteristics may be useful.

An example of this would be to use Redis as the store for canceling processing of a message instead of a relational DB like MySQL. Redis is very fast, but makes fewer guarantees regarding the data it holds, whereas MySQL is much slower, but offers more guarantees about the data it holds.

In the end, the concept of checking with another source for whether or not to process a message is the same, but the way you implement that depends on your particular scenario.



回答2:

RabbitMQ doesn't let you modify or delete messages after they've been enqueued. For that, you want some kind of database to hold the state of each job, and to use RabbitMQ to notify interested parties of changes in that state.

For lowish volumes, you can kludge it together with a queue per job. Create the queue, post the job description to the queue, announce the name of the queue to the workers. If the job needs to be cancelled before it is processed, deleted the job's queue; when the workers come to fetch the job description, they'll notice the queue has vanished.

Lighterweight and generally better would be to use redis or another key/value store to hold the job state (with a deleted or absent record meaning a cancelled or nonexistent job) and to use rabbitmq to notify about new/removed/changed records in the key/value store.



回答3:

At least two ways to achieve your target:

  • basic.reject will requeue message if requeue=true is set (otherwise it will reject message).
    (supported since RabbitMQ 2.0.0; see http://www.rabbitmq.com/blog/2010/08/03/well-ill-let-you-go-basicreject-in-rabbitmq/).

  • basic.recover will ask broker to redeliver unacked messages on channel.



回答4:

You need to subscribe to all the queues to which messages have been routed, and consume them with ack.

For instance if you publish to a topic exchange with "test" as the routing key, and there are 3 persistent queues which subscribe to "test" you would need to consume those three queues. It might be better to add another queue which your consumer processes would also listen too, and tell them to ignore those messages.

An alternative, since you are using RabbitMQ, is to write a custom exchange plugin that will accept some out of band instruction to clear all queues. For instance you might have that exchange read a special message header that tells it to clear all queues to which this message is destined. This does require writing Erlang code, but there are 4 different exchange types implemented so you would only need to copy the most similar one and write the code for the new bahaviours. If you only use custom headers for this, then the body of the message can be a normal message for the consumers.

To sum up:

1) the publisher needs to consume the messages itself 2) the publisher can send a special message in a special queue to tell consumers to ignore the message 3) the publisher can send a special message to a custom exchange that will clear any existing messages from the queues before sending this special message to consumers.