RabbitMQ 3.5 now supports message priority; however, I am unable to build a working example. I've placed my code below, along with the output I expect and the output I actually get. I'd be interested in more documentation and/or a working example.
So my question, in short: how do I get message priority to work in RabbitMQ 3.5.0.0?
Publisher:
using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;

class Publisher
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary<String, Object> args = new Dictionary<String, Object>();
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);
                for (int i = 1; i <= 10; i++)
                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.Priority = Convert.ToByte(i);
                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}
Consumer:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Consumer
{
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    IDictionary<String, Object> args = new Dictionary<String, Object>();
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
                    channel.BasicConsume("task_queue1", false, "", args, consumer);
                    Console.WriteLine(" [*] Waiting for messages. " +
                                      "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }
}
Actual output:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10
Expected output:
[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1
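For reference, the ordering expected above can be reproduced without a broker. The following is only an in-memory model of how a priority queue is documented to order a backlog (higher priority first, publish order kept within one priority, priorities above the queue maximum treated as the maximum). The names PriorityModel and DeliveryOrder are made up for this illustration and are not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PriorityModel
{
    // In-memory model only; no broker is involved.
    // bodies[i] is assumed to have been published with priorities[i].
    public static List<string> DeliveryOrder(
        IList<string> bodies, IList<byte> priorities, byte maxPriority)
    {
        return Enumerable.Range(0, bodies.Count)
            // Priorities above the queue maximum count as the maximum.
            .OrderByDescending(i => Math.Min(priorities[i], maxPriority))
            // LINQ ordering is stable, so equal priorities keep publish order.
            .Select(i => bodies[i])
            .ToList();
    }
}

class PriorityModelDemo
{
    static void Main()
    {
        // Same ten messages as the publisher: "Message i" with priority i.
        var bodies = Enumerable.Range(1, 10).Select(i => "Message " + i).ToList();
        var priorities = Enumerable.Range(1, 10).Select(i => (byte)i).ToList();

        foreach (var body in PriorityModel.DeliveryOrder(bodies, priorities, 10))
            Console.WriteLine(" [x] Received {0}", body);
    }
}
```

Run as is, this prints Message 10 down to Message 1. It says nothing about what a real broker does when a consumer is already waiting and no backlog builds up.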
UPDATE #1. I found an example in Java here. However, it targets the RabbitMQ 3.4.x plugin that was later incorporated into 3.5. The only difference I can see is that it expresses the priority as an int, whereas mine is a byte. I suspect that's a red herring, though. I'm at a bit of a loss here.
Another possibility (for future searchers)
The "Push" method of message delivery doesn't seem to respect Priority.
http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html
The below is a quote from the URL above. I've bolded the important part.
Retrieving Messages By Subscription ("push API")
Another way to receive messages is to set up a subscription using the IBasicConsumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively. One way to implement a consumer is to use the convenience class EventingBasicConsumer, which dispatches deliveries and other consumer lifecycle events as C# events:
By changing to the "pull" method, Priority seems to be respected. However, in the quote below (from the same URL as above), it looks like there is a trade-off (which I've bolded).
Fetching Individual Messages ("pull API")
To retrieve individual messages, use IModel.BasicGet. The returned value is an instance of BasicGetResult, from which the header information (properties) and message body can be extracted:
Since noAck = false above, you must also call IModel.BasicAck to acknowledge that you have successfully received and processed the message:
Note that fetching messages using this API is relatively inefficient. If you'd prefer RabbitMQ to push messages to the client, see the next section.
(The "next" section in this case takes you to the "push" method at the top of this post)
A similar RabbitMq Priority Queue Implementation in Node JS
Install amqplib
To test this, amqplib must be installed.
Publisher (send.js)
Subscriber (receive.js)
Run:
send.js creates a queue named 'hello' and floods it with 1000 sample messages using the default AMQP exchange.
receive.js acts as a consumer and subscribes to the messages waiting in the queue.
Well, I solved it. It was a dumb mistake. I wrote:
args.Add(" x-max-priority ", 10);
It should have been
args.Add("x-max-priority", 10);
I'll leave this up so other people can have a working example of RabbitMQ 3.5's priority queues in C#.
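Stray spaces in an argument name are easy to miss, because the declare call still succeeds and the queue simply behaves like a normal one. Here is a minimal sketch of a pre-flight check, assuming the broker only recognises the exact name "x-max-priority" and silently ignores names it does not know. QueueArgs.PaddedKeys is a hypothetical helper written for this post, not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class QueueArgs
{
    // Hypothetical helper: returns every argument name that carries
    // leading or trailing whitespace.
    public static IList<string> PaddedKeys(IDictionary<string, object> args)
    {
        return args.Keys.Where(k => k != k.Trim()).ToList();
    }
}

class QueueArgsDemo
{
    static void Main()
    {
        var broken = new Dictionary<string, object> { { " x-max-priority ", 10 } };
        var working = new Dictionary<string, object> { { "x-max-priority", 10 } };

        // The padded key is a different string, so an exact lookup misses it.
        Console.WriteLine(broken.ContainsKey("x-max-priority"));   // False
        Console.WriteLine(working.ContainsKey("x-max-priority"));  // True

        // Flag suspicious names before passing the dictionary to QueueDeclare.
        foreach (var key in QueueArgs.PaddedKeys(broken))
            Console.WriteLine("Suspicious argument name: '{0}'", key);
    }
}
```

Calling something like this right before channel.QueueDeclare would have caught the mistake above without needing a broker round trip.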