How to selectively delete messages from an AMQP (R

Posted 2019-03-27 17:17

Question:

I'd like to selectively delete messages from an AMQP queue without even reading them.

The scenario is as follows:

The sending side wants to expire messages of type X when new information of type X arrives. Because it's very probable that the subscriber hasn't consumed the latest message of type X yet, the publisher should just delete the previous X-type messages and put the newest one into the queue. The whole operation should be transparent to the subscriber; in fact, the subscriber should be able to use something as simple as STOMP to get the messages.

How can I do this using AMQP? Or would another messaging protocol be more convenient?

I'd like to avoid a complicated infrastructure. The messaging needed is as simple as described above: one queue, one subscriber, one publisher, but the publisher must be able to delete messages ad hoc according to a given criterion.

The publisher client will use Ruby, but I can work with any language once I know how to do it at the protocol level.

Answer 1:

You cannot currently do this in RabbitMQ (or more generally, in AMQP) automatically. But, here's an easy workaround.

Let's say you want to send three types of messages: Xs, Ys and Zs. If I understand your question correctly, when an X message arrives, you want the broker to forget all other X messages that haven't been delivered.

This is fairly easy to do in RabbitMQ:

  • the producer declares three queues: X, Y, and Z (they're automatically bound to the default exchange with their names as routing keys, which is exactly what we want),
  • when publishing a message, the producer first purges the relevant queue (so, if it's publishing an X message, it first purges the X queue); this effectively removes the outdated messages,
  • the consumer simply consumes from the queue it wants (X for X messages, Y for Y messages, etc.); from its point of view, it just has to do a basic.get to get the next relevant message.

This implies a race condition when two producers send the same type of message at about the same time. As a result, it's possible for a queue to hold two (or more) messages at once. But since the number of messages is upper-bounded by the number of producers, and since the superfluous messages are purged on the next publish, this shouldn't be much of a problem.

To summarize, this solution needs just one extra step compared with the optimal solution: purge queue X before publishing a message of type X.
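The purge-then-publish step can be sketched in Python roughly as follows. This is only an illustration: `publish_latest` and `InMemoryChannel` are hypothetical names, and the three channel methods are assumed to mirror those of a pika `BlockingChannel` (`queue_declare`, `queue_purge`, `basic_publish`), so that a real channel could be passed in instead of the stand-in.

```python
from collections import defaultdict


def publish_latest(channel, msg_type, body):
    """Replace any undelivered messages of msg_type with a single new one."""
    # Declaring is idempotent; the queue is implicitly bound to the default
    # exchange with its own name as the routing key.
    channel.queue_declare(queue=msg_type)
    # Drop messages that are still waiting in the queue.
    channel.queue_purge(queue=msg_type)
    # Publish through the default exchange ("") straight to the queue.
    channel.basic_publish(exchange="", routing_key=msg_type, body=body)


class InMemoryChannel:
    """Hypothetical stand-in for a broker channel, for trying the logic offline."""

    def __init__(self):
        self.queues = defaultdict(list)

    def queue_declare(self, queue):
        self.queues[queue]  # creates the queue if it does not exist yet

    def queue_purge(self, queue):
        self.queues[queue].clear()

    def basic_publish(self, exchange, routing_key, body):
        # Only the default-exchange behaviour is modelled: route by queue name.
        self.queues[routing_key].append(body)
```

Note that the purge and the publish are two separate operations, so this sketch has the same race between concurrent producers that is described above.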

If you need any help setting up this configuration, the perfect place to ask for advice is the rabbitmq-discuss mailing list.



Answer 2:

You do not want a message queue, you want a key-value database. For instance you could use Redis or Tokyo Tyrant to get a simple network-accessible key-value database. Or just use a memcache.

Each message type is a key. When you write a new message with the same key, it overwrites the previous value so the reader of this database will never be able to get out of date information.

At this point, you only need a message queue to establish the order in which keys should be read, if that is important. Otherwise, just continually scan the database. If you do continually scan the database, it is best to put the database near the readers to reduce network traffic.

I would probably store something like this:

key: typecode
value: lastUpdated, important data

Then I would send messages that contain typecode and lastUpdated. That way the reader can compare the lastUpdated in the message with the one it last read from the database for that key, and skip the read if it is already up to date.
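A minimal sketch of that scheme, assuming an in-memory dictionary in place of a real key-value database such as Redis. `LatestValueStore`, `Reader`, and `on_notification` are hypothetical names introduced only for illustration.

```python
import time


class LatestValueStore:
    """In-memory stand-in for a key-value database (hypothetical)."""

    def __init__(self):
        self._data = {}

    def write(self, typecode, payload, last_updated=None):
        stamp = time.time() if last_updated is None else last_updated
        # Writing the same key again overwrites the previous value.
        self._data[typecode] = (stamp, payload)
        return typecode, stamp  # the notification to send to the reader

    def read(self, typecode):
        return self._data[typecode]


class Reader:
    def __init__(self, store):
        self._store = store
        self._seen = {}  # typecode -> lastUpdated of the value last read

    def on_notification(self, typecode, last_updated):
        if self._seen.get(typecode, float("-inf")) >= last_updated:
            return None  # already up to date, skip the database read
        stamp, payload = self._store.read(typecode)
        self._seen[typecode] = stamp
        return payload
```

If the writer updates a key twice before the reader handles the first notification, the reader gets the newest value on the first notification and skips the second one.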

If you really need to do this with AMQP, then use RabbitMQ and a custom exchange type, specifically a Last Value Cache Exchange. Example code is here: https://github.com/squaremo/rabbitmq-lvc-plugin



Answer 3:

This question gets a lot of visibility because of its title, although the description deals with a more specific scenario. So, for those who actually want to delete the next message from the queue (remember, FIFO), you can use rabbitmqadmin and issue the command below:

rabbitmqadmin get queue=queuename requeue=false count=1

This command essentially consumes the message and discards it. A complete command, with a flag to save a backup of the message(s), might look like the one below. Add any other parameters you need.

sudo python rabbitmqadmin -V virtualhostname -u user -p pass get queue=queuename requeue=false count=1 payload_file=~/origmsg
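rabbitmqadmin is a client for the RabbitMQ management plugin's HTTP API, so the same "get without requeue" can be issued as a plain HTTP request. The sketch below only builds the request. `build_get_request` is a hypothetical helper, and the `ackmode` field is an assumption that holds for recent management plugin versions (older ones took a boolean `requeue` field instead).

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request


def build_get_request(base_url, vhost, queue, user, password, count=1):
    """Build a POST that fetches `count` messages without requeueing them."""
    # Virtual host and queue names must be percent-encoded ("/" becomes "%2F").
    url = "{}/api/queues/{}/{}/get".format(
        base_url.rstrip("/"), quote(vhost, safe=""), quote(queue, safe="")
    )
    payload = {
        "count": count,
        # Assumption: recent management plugin; older versions expected
        # "requeue": false here instead of "ackmode".
        "ackmode": "ack_requeue_false",
        "encoding": "auto",
    }
    token = base64.b64encode("{}:{}".format(user, password).encode()).decode()
    return Request(
        url,
        data=json.dumps(payload).encode(),
        headers={
            "Authorization": "Basic " + token,
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

Passing the returned object to `urllib.request.urlopen` would send it; the management API usually listens on port 15672, for example `http://localhost:15672`.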



Answer 4:

This also seems to work from the RabbitMQ web UI, if you just want to remove the first n messages from the queue:

  • select the queue from tab "Queues", scroll down to section "Get messages"
  • set the parameter "Requeue=No" and the number of messages you want to remove from the queue
  • press "Get messages" button