How do you process messages in parallel while ensuring FIFO ordering per entity?

Posted 2019-01-20 00:11

Question:

Let's say you have an entity, say, "Person" in your system and you want to process events that modify various Person entities. It is important that:

  • Events for the same Person are processed in FIFO order
  • Multiple Person event streams be processed in parallel by different threads/processes

We have an implementation that solves this using a shared database and locks. Threads compete to acquire the lock for a Person and then process events in order after acquiring the lock. We'd like to move to a message queue to avoid polling and locking, which we feel would reduce load on the DB and simplify the implementation of the consumer code.

I've done some research into ActiveMQ, RabbitMQ, and HornetQ but I don't see an obvious way to implement this.

ActiveMQ supports consumer subscription wildcards, but I don't see a way to limit the concurrency on each queue to 1. If I could do that, the solution would be straightforward (see the sketch after this list):

  • Somehow tell broker to allow a concurrency of 1 for all queues starting with: /queue/person.
  • Publisher writes event to queue using Person ID in the queue name. e.g.: /queue/person.20
  • Consumers subscribe to the queue using wildcards: /queue/person.>
  • Each consumer would receive messages for different person queues. If all person queues were in use, some consumers may sit idle, which is ok
  • After processing a message, the consumer sends an ACK, which tells the broker it's done with the message, and allows another message for that Person queue to be sent to another consumer (possibly the same one)
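
For concreteness, here is a minimal sketch of what the producer and consumer halves of that list might look like with the JMS API against ActiveMQ. The broker URL, queue prefix, and payload are placeholders of mine, and the sketch deliberately leaves out the one piece I can't find: limiting each person.* queue to a single concurrent consumer.

```java
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class PersonQueueSketch {

    // Producer side: route each event to a queue named after the Person ID.
    static void publish(Session session, long personId, String payload) throws JMSException {
        Destination queue = session.createQueue("person." + personId); // e.g. person.20
        MessageProducer producer = session.createProducer(queue);
        producer.send(session.createTextMessage(payload));
        producer.close();
    }

    // Consumer side: one wildcard subscription sees every person.* queue.
    static void consume(Session session) throws JMSException {
        Destination wildcard = session.createQueue("person.>");
        MessageConsumer consumer = session.createConsumer(wildcard);
        consumer.setMessageListener(message -> {
            try {
                // ... process the event ...
                message.acknowledge(); // CLIENT_ACKNOWLEDGE: tell the broker we're done
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        publish(session, 20, "{\"event\":\"address-changed\"}");
        consume(session);
    }
}
```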

ActiveMQ came close: You can do wildcard subscriptions and enable "exclusive consumer", but that combination results in a single consumer receiving all messages sent to all matching queues, reducing your concurrency to 1 across all Persons. I feel like I'm missing something obvious.

Questions:

  • Is there a way to implement the above approach with any major message queue implementation? We are fairly open to options. The only requirement is that it run on Linux.
  • Is there a different way to solve the general problem that I'm not considering?

Thanks!

Answer 1:

It looks like JMSXGroupID is what I'm looking for. From the ActiveMQ docs:

http://activemq.apache.org/message-groups.html

Their example use case with stock prices is exactly what I'm after. My only concern is what happens if the single consumer dies. Hopefully the broker will detect that and pick another consumer to associate with that group id.
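
For reference, setting the group comes down to one message property on the producer side. A minimal sketch, assuming a single shared queue named person.events and a local broker (both are my placeholders, not from the docs):

```java
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class GroupedEventProducer {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue("person.events"));

            TextMessage message = session.createTextMessage("{\"event\":\"name-changed\"}");
            // All messages with the same JMSXGroupID go to the same consumer, in order.
            message.setStringProperty("JMSXGroupID", "person-20");
            producer.send(message);
        } finally {
            connection.close();
        }
    }
}
```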



Answer 2:

One general way to solve this problem (if I understood it correctly) is to introduce a unique property for each Person (say, the database-level id of the Person) and use a hash of that property as the index of the FIFO queue that Person's events go into.
Since that hash can be unwieldy large (you can't afford 2^32 queues/threads), use only the N least significant bits of it. Each FIFO queue should have a dedicated worker that processes it -- voila, your requirements are satisfied!
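
A minimal in-process sketch of that routing (the bit count, queue type, and event type are arbitrary choices for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class HashedQueueRouter {
    private static final int QUEUE_BITS = 4;              // N: keep 4 low bits -> 16 queues
    private static final int NUM_QUEUES = 1 << QUEUE_BITS;

    private final List<BlockingQueue<String>> queues = new ArrayList<>();

    HashedQueueRouter() {
        for (int i = 0; i < NUM_QUEUES; i++) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            // One dedicated worker per queue keeps events for any one Person in FIFO order.
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        process(queue.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            worker.start();
        }
    }

    // Events for the same personId always hash to the same queue, hence the same worker.
    void route(long personId, String event) throws InterruptedException {
        int index = Long.hashCode(personId) & (NUM_QUEUES - 1); // N least significant bits
        queues.get(index).put(event);
    }

    private static void process(String event) { /* handle the event */ }
}
```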

This approach has one drawback -- your Persons must have well-distributed ids so that all queues receive a more-or-less equal load. If you can't guarantee that, consider using a round-robin set of queues and tracking which Persons are currently being processed, to ensure sequential processing for the same Person.



Answer 3:

If you already have a system that allows shared locks, why not have a lock for every queue, which consumers must acquire before they read from the queue?
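
As an in-process illustration only (the question's existing locks live in a shared database, which this sketch does not attempt to reproduce), the shape of that idea might look like:

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class PerQueueLocks {
    private final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    /** Drain one Person queue only if no other consumer currently holds its lock. */
    void drainIfFree(String queueName, Queue<String> queue) {
        ReentrantLock lock = locks.computeIfAbsent(queueName, name -> new ReentrantLock());
        if (!lock.tryLock()) {
            return; // another consumer owns this Person's queue; move on to a different one
        }
        try {
            String event;
            while ((event = queue.poll()) != null) {
                process(event); // events for this Person stay in FIFO order
            }
        } finally {
            lock.unlock();
        }
    }

    private void process(String event) { /* handle the event */ }
}
```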