Published and waiting for response on a topic with

Published 2019-08-27 11:16

Question:

When a publisher expects an answer to a message, how can it ensure that it receives only the relevant answers (those to its own messages) once it is scaled out?

We have a client process that publishes a message for a server process to answer. We also have a "listener" process that only needs to consume both the questions and the answers, without publishing anything. In addition, the server process might be split into several processes in the future, creating a message cascade. We can't use request/response, because we need the listener and, later on, the cascade. Besides, we will have several question/answer categories, and request/response in EasyNetQ doesn't support topics.

Our solution with EasyNetQ was simple topic-based publish/subscribe: the client publishes to the "question" topic and subscribes to "answer", the server subscribes to "question" and publishes to "answer", and the listener just subscribes to both.

The problem appears when you scale out the client. Two instances now publish questions, but since both are subscribed to the single "answer" topic through the same queue, one instance might receive the answer to a question published by the other and never receive its own.

The solution we found is to have each client use a uniquely named queue when subscribing to "answer". This way each client receives all of the answers and just ignores the ones that are not its own. However, this solution has some performance drawbacks, and it also leaves uniquely named queues accumulating in RabbitMQ each time a client crashes (or is simply restarted during development, etc.).

Client, sending an object msg:

// Subscribe once at startup, before publishing, using a queue name unique to this client instance
easyNetQBus.Subscribe<MyMessage>("mqClient" + uniqueSuffix, HandleMsg, x => x.WithTopic("answer"));

// For each question:
string corrId = Guid.NewGuid().ToString();

// Register the corrId in a dictionary
//...

var myMessage = new MyMessage { correlationId = corrId, realMessage = msg };
easyNetQBus.Publish(myMessage, "question");

// In HandleMsg, we look up the correlation id that came with the answer in the dictionary; if we did not issue that question, we ignore the answer
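
To make the dictionary bookkeeping concrete, here is a minimal sketch of how the client side could track its own correlation ids. It does not touch EasyNetQ at all. The class name PendingQuestions and its methods Register and TryAccept are made up for illustration, and the MyMessage shape is only assumed from the fields used in the snippet above.

```csharp
using System;
using System.Collections.Concurrent;

// Assumed message shape, mirroring the fields used in the client snippet.
public class MyMessage
{
    public string correlationId { get; set; }
    public object realMessage { get; set; }
}

// Hypothetical helper: remembers which questions this client instance issued.
public class PendingQuestions
{
    // Key: correlation id; value: time the question was registered (UTC).
    private readonly ConcurrentDictionary<string, DateTime> pending =
        new ConcurrentDictionary<string, DateTime>();

    // Call before publishing: creates the id, records it, and wraps the payload.
    public MyMessage Register(object msg)
    {
        string corrId = Guid.NewGuid().ToString();
        pending[corrId] = DateTime.UtcNow;
        return new MyMessage { correlationId = corrId, realMessage = msg };
    }

    // Call from HandleMsg: true only for an answer to one of our own questions.
    // The id is removed on the first match, so a duplicate answer is rejected.
    public bool TryAccept(MyMessage answer)
    {
        return answer != null
            && answer.correlationId != null
            && pending.TryRemove(answer.correlationId, out _);
    }
}
```

With something like this, HandleMsg would start with a check such as `if (!pendingQuestions.TryAccept(answer)) return;`. The stored timestamp is there only so that questions which never get an answer could be expired later; that cleanup is not shown.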

Server:

easyNetQBus.Subscribe<MyMessage>("mqServer", HandleMsg, x => x.WithTopic("question"));

// In HandleMsg, we publish the answer back to "answer" with the correlation id from the question

Is there another pattern we should be using? We could put a unique topic/queue inside each message to send the answer to, but this complicates life for the listener and reduces the flexibility of the future participants in the cascade I mentioned...
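
For reference, the "reply topic inside the message" alternative could look roughly like the sketch below. It is only an illustration of the idea, not something we have running: the type RoutedMessage, the replyTopic field, and the "answer.<instance id>" naming scheme are all assumptions, and no EasyNetQ calls are made here.

```csharp
using System;

// Hypothetical message shape: the question carries the topic to answer on.
public class RoutedMessage
{
    public string correlationId { get; set; }
    public string replyTopic { get; set; }   // assumed extra field
    public object realMessage { get; set; }
}

public static class ReplyRouting
{
    // Assumed naming scheme: one answer topic per client instance.
    public static string AnswerTopicFor(string clientInstanceId)
    {
        return "answer." + clientInstanceId;
    }

    // Client side: build a question that tells the server where to reply.
    public static RoutedMessage BuildQuestion(string clientInstanceId, object msg)
    {
        return new RoutedMessage
        {
            correlationId = Guid.NewGuid().ToString(),
            replyTopic = AnswerTopicFor(clientInstanceId),
            realMessage = msg
        };
    }

    // Server side: copy the correlation id and reply topic from the question.
    public static RoutedMessage BuildAnswer(RoutedMessage question, object answerPayload)
    {
        return new RoutedMessage
        {
            correlationId = question.correlationId,
            replyTopic = question.replyTopic,
            realMessage = answerPayload
        };
    }
}
```

In this variant the server would publish each answer to `answer.replyTopic` instead of the fixed "answer" topic, and each client would subscribe only to its own topic. Since RabbitMQ topic exchanges match `*` against exactly one word, the listener could presumably bind with a pattern such as "answer.*", but every stage of a future cascade would still have to carry the reply topic forward, which is the complication described above.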