Is there way to make method
myMessageService.CreateMessageQueueClient().Publish(myMessage);
broadcast messages to all receivers?
Is there way to make method
myMessageService.CreateMessageQueueClient().Publish(myMessage);
broadcast messages to all receivers?
The problem is, that RegisterHandler<T>
internally uses the type of T
to build the queue name it listens to. So the only chance you have is to go off the track with the following solution by utilizing a custom fanout exchange to multiple queues:
var fanoutExchangeName = string.Concat(QueueNames.Exchange,
".",
ExchangeType.Fanout);
At some point of your system you have to ensure the exchange with the following code:
var rabbitMqServer = new RabbitMqServer();
var messageProducer = (RabbitMqProducer) rabbitMqServer.CreateMessageProducer();
var channel = messageProducer.Channel; // you can use any logic to acquire a channel here - this is just a demo
channel.ExchangeDeclare(fanoutExchangeName,
ExchangeType.Fanout,
durable: true,
autoDelete: false,
arguments: null);
Now we can publish messages to this fanout:
var message = new Message<T>(yourInstance);
messageProducer.Publish(QueueNames<T>.In, // routing key
message, // message
fanoutExchangeName); // exchange
So now the message gets published to our exchange, but we need to bind queues to the exchange in the consuming components, which we do with:
var rabbitMqServer = new RabbitMqServer();
var messageQueueClient = (RabbitMqQueueClient) rabbitMqServer.CreateMessageQueueClient();
var channel = messageQueueClient.Channel; // you just need to get the channel
var queueName = messageQueueClient.GetTempQueueName();
channel.QueueBind(queueName, // queue
fanoutExchangeName, // exchange
QueueName<T>.In); // routing key
The queue is automatically deleted after the last (and only) consumer disconnects and will not survive a restart of RabbitMq.
The hacky part is now the listening though ...
var consumer = new RabbitMqBasicConsumer(channel);
channel.BasicConsume(queueName,
false,
consumer);
Task.Run(() =>
{
while (true)
{
BasicGetResult basicGetResult;
try
{
basicGetResult = consumer.Queue.Dequeue();
}
catch (EndOfStreamException endOfStreamException)
{
// this is ok
return;
}
catch (OperationInterruptedException operationInterruptedException)
{
// this is ok
return;
}
catch (Exception ex)
{
throw;
}
var message = basicGetResult.ToMessage<T>();
// TODO processing
}
});
This solution does not provide any auto-reconnect, filters, or other stuff though.
A basic walkthrough is available here.
Edit:
One thing that just got to my mind: You can use a ServiceStack.Messaging.MessageHandler<T>
instance to provide replies and retries with ease.