RabbitMQ plugin to remove duplicate messages

Published 2020-06-04 08:09

Question:

I have RabbitMQ queues for document generation. Each document has a type and a state (new, processing, ready), so I use a topic exchange with routing keys like type.state. Every time a document changes, I send a message with the latest document description to the exchange, and this works well enough.

However, sometimes a document can be processed twice:

  1. The user sends a new document, so a new message, report.new, is sent to the exchange.
  2. Before a worker starts processing the document (the message has not yet reached the front of the queue), the user updates the document. Another report.new message for the same document is sent.
  3. The worker now gets the first message and starts its work, but the document has since changed, so this work is wasted.

For now I have added a small piece of code to the workers that compares the document's last_modified key from the message with the one in the database and simply acks the message, without processing it, if they are not the same. However, I don't think this is the best solution.
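A minimal sketch of that worker-side check, for illustration only. The `DocumentMessage` type, the `db_last_modified` mapping (standing in for a database lookup), and the `process` and `ack` callbacks are all hypothetical names, not part of any RabbitMQ client library.

```python
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class DocumentMessage:
    """Hypothetical message payload: document ID plus its last_modified value."""
    doc_id: str
    last_modified: float


def is_stale(message: DocumentMessage, db_last_modified: Dict[str, float]) -> bool:
    """A message is stale if the database holds a different last_modified value."""
    return db_last_modified.get(message.doc_id) != message.last_modified


def handle(
    message: DocumentMessage,
    db_last_modified: Dict[str, float],
    process: Callable[[str], None],
    ack: Callable[[DocumentMessage], None],
) -> bool:
    """Process the message only if it is current; ack it in either case.

    Returns True if the document was processed, False if it was skipped.
    """
    stale = is_stale(message, db_last_modified)
    if not stale:
        process(message.doc_id)
    # Assumption: stale messages are acked too, so they leave the queue.
    ack(message)
    return not stale
```

With this sketch, a message carrying an outdated last_modified value is acknowledged and dropped, and only the message matching the database state triggers processing.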

My idea is to add an ID to the message headers and have some RabbitMQ plugin that removes older messages with the same ID from the queue.

Thanks.

P.S. Maybe another MQ engine can be useful here? E.g. maybe ActiveMQ has such a feature?

Answer 1:

OK, I've read about RabbitMQ's internal architecture and found out that this is impossible. So here is a workaround for anybody looking for one.

  1. Send only the document ID in the message body.
  2. Create a key-value store for the workers (I use memcached for this). The key is the document ID; the value is the timestamp of the last worker run for that ID.
  3. When a worker receives a message, it checks whether the message timestamp is greater than the one in the key-value store. If it is, the worker updates the timestamp in the store and runs the task; otherwise it just skips the message.
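The three steps above can be sketched roughly as follows. This is an illustration under assumptions, not the author's code: a plain dict stands in for memcached, and `handle_message`, `sent_at`, `run_task`, and `clock` are hypothetical names.

```python
import time
from typing import Callable, Dict


def handle_message(
    doc_id: str,
    sent_at: float,
    store: Dict[str, float],
    run_task: Callable[[str], None],
    clock: Callable[[], float] = time.time,
) -> bool:
    """Run the task only if the message is newer than the last run for doc_id.

    `store` maps document ID -> timestamp of the last worker run.
    Returns True if the task ran, False if the message was skipped.
    """
    last_run = store.get(doc_id)
    if last_run is not None and sent_at <= last_run:
        # The last run started after this message was sent, so it already
        # read a document state at least as new as this message describes.
        return False
    # Record the run time before doing the work, so that an update arriving
    # while the task is running still triggers another run later.
    store[doc_id] = clock()
    run_task(doc_id)
    return True
```

Because the message carries only the ID, the task always loads the current document from the database, which is why a message sent before the last run can be skipped safely. With a real shared store the read and the update are not atomic, so two workers could still occasionally both run the task for the same ID.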


Answer 2:

You can check out this plugin I wrote, which allows you to de-duplicate messages published to the broker.

You can de-duplicate at the exchange or at the queue, according to your needs. The only thing your publisher needs to do is set the x-deduplicate-message message header to the ID of your message.



Answer 3:

As you wrote, ActiveMQ has "duplicate message detection", but it works differently from what you describe. It does not remove the old message from the queue; instead, it does not add the new message to it. So it works the same way as the plugin for RabbitMQ.
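To make the difference concrete, here is a toy in-memory model of the two behaviours. It does not use either broker's API; the list-based queue and the function names `enqueue_drop_new` and `enqueue_replace_old` are made up for illustration.

```python
from typing import Dict, List

Message = Dict[str, str]


def enqueue_drop_new(queue: List[Message], message: Message) -> None:
    """Duplicate detection as described above: if a message with the same
    ID is already queued, the new message is not added."""
    if any(queued["id"] == message["id"] for queued in queue):
        return
    queue.append(message)


def enqueue_replace_old(queue: List[Message], message: Message) -> None:
    """What the question asks for: older messages with the same ID are
    removed, and the new message is appended at the end of the queue."""
    queue[:] = [queued for queued in queue if queued["id"] != message["id"]]
    queue.append(message)
```

In the first model the earliest message for an ID keeps its place in the queue and later ones are discarded; in the second the latest message wins but moves to the back of the queue.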