Let's say you have an entity, say, "Person" in your system and you want to process events that modify various Person entities. It is important that:
- Events for the same Person are processed in FIFO order
- Multiple Person event streams be processed in parallel by different threads/processes
We have an implementation that solves this using a shared database and locks. Threads compete to acquire the lock for a Person and then process events in order after acquiring the lock. We'd like to move to a message queue to avoid polling and locking, which we feel would reduce load on the DB and simplify the implementation of the consumer code.
I've done some research into ActiveMQ, RabbitMQ, and HornetQ but I don't see an obvious way to implement this.
ActiveMQ supports consumer subscription wildcards, but I don't see a way to limit the concurrency on each queue to 1. If I could do that, then the solution would be straightforward:
- Somehow tell broker to allow a concurrency of 1 for all queues starting with: /queue/person.
- Publisher writes event to queue using Person ID in the queue name. e.g.: /queue/person.20
- Consumers subscribe to the queue using wildcards: /queue/person.>
- Each consumer would receive messages for different person queues. If all person queues were in use, some consumers may sit idle, which is ok
- After processing a message, the consumer sends an ACK, which tells the broker it's done with the message, and allows another message for that Person queue to be sent to another consumer (possibly the same one)
ActiveMQ came close: You can do wildcard subscriptions and enable "exclusive consumer", but that combination results in a single consumer receiving all messages sent to all matching queues, reducing your concurrency to 1 across all Persons. I feel like I'm missing something obvious.
Questions:
- Is there way to implement the above approach with any major message queue implementation? We are fairly open to options. The only requirement is that it run on Linux.
- Is there a different way to solve the general problem that I'm not considering?
Thanks!