Broadcast RabbitMQ messages with ServiceStack

Posted 2019-05-11 23:36

Question:

Is there a way to make the method

myMessageService.CreateMessageQueueClient().Publish(myMessage);

broadcast messages to all receivers?

Answer 1:

The problem is that RegisterHandler<T> internally uses the type T to build the name of the queue it listens to, so every handler for the same type ends up on the same queue. Your only option is to go off the beaten track and use a custom fanout exchange that is bound to multiple queues:

var fanoutExchangeName = string.Concat(QueueNames.Exchange,
                                       ".",
                                       ExchangeType.Fanout);

Somewhere in your system you have to make sure the exchange exists, using the following code:

var rabbitMqServer = new RabbitMqServer();
var messageProducer = (RabbitMqProducer) rabbitMqServer.CreateMessageProducer();
var channel = messageProducer.Channel; // you can use any logic to acquire a channel here - this is just a demo
channel.ExchangeDeclare(fanoutExchangeName,
                        ExchangeType.Fanout,
                        durable: true,
                        autoDelete: false,
                        arguments: null);

Now we can publish messages to this fanout:

var message = new Message<T>(yourInstance);
messageProducer.Publish(QueueNames<T>.In,    // routing key
                        message,             // message
                        fanoutExchangeName); // exchange

So now the message gets published to our exchange, but we need to bind queues to the exchange in the consuming components, which we do with:

var rabbitMqServer = new RabbitMqServer();
var messageQueueClient = (RabbitMqQueueClient) rabbitMqServer.CreateMessageQueueClient();
var channel = messageQueueClient.Channel; // you just need to get the channel

var queueName = messageQueueClient.GetTempQueueName();
channel.QueueBind(queueName,          // queue
                  fanoutExchangeName, // exchange
                  QueueNames<T>.In);  // routing key

The temporary queue is deleted automatically once its last (and only) consumer disconnects, and it will not survive a restart of RabbitMQ.
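To make the routing idea concrete, here is a minimal, dependency-free sketch of what a fanout exchange does: every bound queue gets its own copy of each message, whereas consumers sharing a single queue would compete for it. InMemoryFanout and BindNewQueue are hypothetical names invented for this illustration; they are not part of ServiceStack or RabbitMQ.Client, and the routing key string is only an example value.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a fanout exchange (illustration only,
// not a ServiceStack or RabbitMQ.Client type).
public sealed class InMemoryFanout
{
    private readonly List<Queue<string>> boundQueues = new List<Queue<string>>();

    // Roughly comparable to creating a temp queue and binding it to the exchange.
    public Queue<string> BindNewQueue()
    {
        var queue = new Queue<string>();
        boundQueues.Add(queue);
        return queue;
    }

    // A fanout exchange ignores the routing key and copies the message
    // into every queue that is bound to it.
    public void Publish(string routingKey, string body)
    {
        foreach (var queue in boundQueues)
        {
            queue.Enqueue(body);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var exchange = new InMemoryFanout();
        var receiverA = exchange.BindNewQueue(); // one queue per receiver
        var receiverB = exchange.BindNewQueue();

        exchange.Publish("mq:MyMessage.inq", "hello"); // example routing key

        Console.WriteLine(receiverA.Dequeue()); // prints "hello"
        Console.WriteLine(receiverB.Dequeue()); // prints "hello"
    }
}
```

This is why each consuming component binds its own temporary queue: with one shared queue, only one of the receivers would see any given message.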

The hacky part, though, is the listening:

var consumer = new RabbitMqBasicConsumer(channel);
channel.BasicConsume(queueName,
                     false,     // noAck: deliveries have to be acknowledged manually
                     consumer);

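Task.Run(() =>
{
    while (true)
    {
        BasicGetResult basicGetResult;
        try
        {
            // blocks until a message arrives or the consumer's queue is closed
            basicGetResult = consumer.Queue.Dequeue();
        }
        catch (EndOfStreamException)
        {
            // this is ok: the consumer's queue was closed
            return;
        }
        catch (OperationInterruptedException)
        {
            // this is ok: the channel or connection was shut down
            return;
        }
        // any other exception simply propagates and faults the task

        var message = basicGetResult.ToMessage<T>();
        // TODO processing

        // BasicConsume was called with noAck = false, so acknowledge the delivery when done
        channel.BasicAck(basicGetResult.DeliveryTag, false);
    }
});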

Note that this solution provides no auto-reconnect, no filters, and none of the other conveniences you get from a regular handler.

A basic walkthrough is available here.

Edit: One thing that just came to mind: you can use a ServiceStack.Messaging.MessageHandler<T> instance to provide replies and retries with ease.
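A rough sketch of how that could look, in case it helps. BroadcastHandler<T> is a hypothetical wrapper made up for this example, and it assumes the MessageHandler<T> constructor taking (IMessageService, Func<IMessage<T>, object>) and the ProcessMessage(IMessageQueueClient, object) overload as found in recent ServiceStack versions; check both against the version you are using.

```csharp
using System;
using RabbitMQ.Client;
using ServiceStack.Messaging;

// Hypothetical wrapper (not part of ServiceStack): hands raw RabbitMQ
// deliveries to a ServiceStack MessageHandler<T>.
public class BroadcastHandler<T>
{
    private readonly MessageHandler<T> handler;
    private readonly IMessageQueueClient mqClient;

    public BroadcastHandler(IMessageService mqServer,
                            IMessageQueueClient mqClient,
                            Func<IMessage<T>, object> processFn)
    {
        // Use the client whose channel the consumer is listening on,
        // so acknowledgements go to the right channel.
        this.mqClient = mqClient;
        // Assumption: this constructor overload exists in your ServiceStack version.
        this.handler = new MessageHandler<T>(mqServer, processFn);
    }

    public void Handle(BasicGetResult delivery)
    {
        // Assumption: ProcessMessage accepts the raw broker response
        // and converts it to an IMessage<T> itself.
        handler.ProcessMessage(mqClient, delivery);
    }
}
```

You would create it once, e.g. new BroadcastHandler<T>(rabbitMqServer, messageQueueClient, m => { /* work with m.GetBody() */ return null; }), and call Handle(basicGetResult) inside the listening loop in place of the manual processing. As far as I can tell the handler acknowledges successfully processed messages through the client itself, so no separate BasicAck call should be needed in that case.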