AMQP/RabbitMQ - Process messages sequentially

2019-05-16 09:18发布

I have one direct exchange. There is also one queue, bound to this exchange.

I have two consumers for that queue. The consumers are manually ack'ing the messages once they've done the corresponding processing.

The messages are logically ordered/sorted, and should be processed in that order. Is it possible to enforce that all messages are received and processed sequentially accross consumer A and consumer B? In other words, prevent A and B from processing messages at the same time.

Note: the consumers are not sharing the same connection and/or channel. This means I cannot use <channel>.basicQoS(1);.

Rationale of this question: both consumers are identicall. If one goes down, the other queue starts processing messages and everything keeps working without any required intervention.

2条回答
▲ chillily
2楼-- · 2019-05-16 10:01

Usually the point of a MQ system is to distribute workload. Of course, there are some situations where processing of message N depends on result of processing the message N-1, or even the N-1 message itself.

If A and B can't process messages at the same time, then why not just have A or just B? As I see it, you are not saving anything with having 2 consumers in a way that one can work only when the other one is not...

In your case, it would be best to have one consumer but to actually do the parallelisation (not a word really) on the processing part.

Just to add that RMQ is distributing messages evenly to all consumers (in round-robin fashion) regardless on any criteria. Of course this is when prefetch is set to 1, which by default it is. More info on that here, look for "fair dispatch".

查看更多
Bombasti
3楼-- · 2019-05-16 10:05

One approach to handling failover in a case where you want redundant consumers but need to process messages in a specific order is to use the exclusive consumer option when setting up the bind to the queue, and to have two consumers who keep trying to bind even when they can't get the exclusive lock.

The process is something like this:

  1. Consumer A starts first and binds to the queue as an exclusive consumer. Consumer A begins processing messages from the queue.
  2. Consumer B starts next and attempts to bind to the queue as an exclusive consumer, but is rejected because the queue already has an exclusive consumer.
  3. On a recurring basis, consumer B attempts to get an exclusive bind on the queue but is rejected.
  4. Process hosting consumer A crashes.
  5. Consumer B attempts to bind to the queue as an exclusive consumer, and succeeds this time. Consumer B starts processing messages from the queue.
  6. Consumer A is brought back online, and attempts an exclusive bind, but is rejected now.
  7. Consumer B continues to process messages in FIFO order.

While this approach doesn't provide load sharing, it does provide redundancy.

查看更多
登录 后发表回答