RabbitMQ 3.5 and Message Priority

2020-03-03 05:13发布

RabbitMQ 3.5 now supports message priority; However, I am unable to build a working example. I've placed my code below. It includes the output that I expect and the output I actually. I'd be interested in more documentation, and/or a working example.

So my question in short: How do I get message priority to work in Rabbit 3.5.0.0?

Publisher:

using System;
using RabbitMQ.Client;
using System.Text;
using System.Collections.Generic;

class Publisher
{

    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                IDictionary <String , Object> args = new Dictionary<String,Object>() ;
                args.Add(" x-max-priority ", 10);
                channel.QueueDeclare("task_queue1", true, false, true, args);

                for (int i = 1 ; i<=10; i++ )
                {
                    var message = "Message";
                    var body = Encoding.UTF8.GetBytes(message + " " + i);
                    var properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);
                    properties.Priority = Convert.ToByte(i);
                    channel.BasicPublish("", "task_queue1", properties, body);
                }
            }
        }
    }
}

Consumer:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading;
using System.Collections.Generic;

namespace Consumer
{ 
    class Worker
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    IDictionary<String, Object> args = new Dictionary<String, Object>();                      
                    channel.BasicQos(0, 1, false);
                    var consumer = new QueueingBasicConsumer(channel);
                    IDictionary<string, object> consumerArgs = new Dictionary<string, object>();
                    channel.BasicConsume( "task_queue1", false, "", args, consumer);
                    Console.WriteLine(" [*] Waiting for messages. " +
                                      "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] Received {0}", message);
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }
}

Actual output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 1
[x] Received Message 2
[x] Received Message 3
[x] Received Message 4
[x] Received Message 5
[x] Received Message 6
[x] Received Message 7
[x] Received Message 8
[x] Received Message 9
[x] Received Message 10

Expected output:

[*] Waiting for messages. To exit press CTRL+C
[x] Received Message 10
[x] Received Message 9
[x] Received Message 8
[x] Received Message 7
[x] Received Message 6
[x] Received Message 5
[x] Received Message 4
[x] Received Message 3
[x] Received Message 2
[x] Received Message 1

UPDATE #1. I found an example in Java here. However it's the Rabbit 3.4.x.x. addin that was incorporated into 3.5. The only difference I can see is that they express the priority as an int and mine is a byte. But I feel like that's a red herring. I'm at a bit of a loss here.

标签: c# rabbitmq
3条回答
Ridiculous、
2楼-- · 2020-03-03 06:09

Another possibility (for future searchers)

The "Push" method of message delivery doesn't seem to respect Priority.

http://rabbitmq.docs.pivotal.io/35/rabbit-web-docs/dotnet-api-guide.html.html

The below is a quote from the URL above. I've bolded the important part.

Retrieving Messages By Subscription ("push API")

Another way to receive messages is to set up a subscription using the IBasicConsumer interface. The messages will then be delivered automatically as they arrive, rather than having to be requested proactively. One way to implement a consumer is to use the convenience class EventingBasicConsumer, which dispatches deliveries and other consumer lifecycle events as C# events:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // ... process the message
                    ch.BasicAck(ea.DeliveryTag, false);
                };  
String consumerTag = channel.BasicConsume(queueName, false, consumer);

By changing to the "pull" method, Priority seems to be respected. However, in the quote below (from the same url above), it looks like there is a trade-off (that I've bolded)

Fetching Individual Messages ("pull API") To retrieve individual messages, use IModel.BasicGet. The returned value is an instance of BasicGetResult, from which the header information (properties) and message body can be extracted:

bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
    // No message available at this time.
} else {
    IBasicProperties props = result.BasicProperties;
    byte[] body = result.Body;
    ...

Since noAck = false above, you must also call IModel.BasicAck to acknowledge that you have successfully received and processed the message:

    ...
    // acknowledge receipt of the message
    channel.BasicAck(result.DeliveryTag, false);
}

Note that fetching messages using this API is relatively inefficient. If you'd prefer RabbitMQ to push messages to the client, see the next section.

(The "next" section in this case takes you to the "push" method at the top of this post)

查看更多
家丑人穷心不美
3楼-- · 2020-03-03 06:11

A similar RabbitMq Priority Queue Implementation in Node JS

Install amqplib

In order to test, we are required to have amqplib installed

npm install amqplib

Publisher (send.js)

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);

  // name of queue
  var q = 'hello';
  var msg = 'Hello World!';
  var priorityValue = 0;

  function on_channel_open(err, ch) {
    if (err !== null) return bail(err, conn);
    // maxPriority : max priority value supported by queue
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);

      for(var index=1; index<=100; index++) {
             priorityValue = Math.floor((Math.random() * 10));
             msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
             ch.publish('', q, new Buffer(msg), {priority: priorityValue});
             console.log(" [x] Sent '%s'", msg);
      }

      ch.close(function() { conn.close(); });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);

Subscriber (receive.js)

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);
  process.once('SIGINT', function() { conn.close(); });

  var q = 'hello';

  function on_channel_open(err, ch) {
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);
      ch.consume(q, function(msg) { // message callback
        console.log(" [x] Received '%s'", msg.content.toString());
      }, {noAck: true}, function(_consumeOk) { // consume callback
        console.log(' [*] Waiting for messages. To exit press CTRL+C');
      });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);

Run:

node send.js

It will create a queue named 'hello' and will flood it with '1000' sample messages using default AMQP exchange.

node receive.js

It will act as a consumer to subscribe to messages waiting in the queue.

查看更多
成全新的幸福
4楼-- · 2020-03-03 06:12

Well I solved it. It was a dumb mistake. I wrote:

args.Add(" x-max-priority ", 10);

It should have been

args.Add("x-max-priority", 10);

I'll leave this up so other people can have a working example of Rabbitmq 3.5's Priority Queues in C#.

查看更多
登录 后发表回答