RabbitMQ的/ AMQP:单队列,多个消费者对于同样的信息?RabbitMQ的/ AMQP:单

2019-05-14 06:04发布

我刚开始使用的RabbitMQ和AMQP一般。

  • 我有消息的队列
  • 我有多个消费者,我想做不同的事情有相同的消息

大多数RabbitMQ的文档似乎把重点放在循环,即在一个单一的消息由单个消费者消费,与正在蔓延每个消费者之间的负载。 这的确是我的行为作证。

一个例子:生产者有一个单一的队列中,并且将消息发送每2秒:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

而这里的消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我开始消费了两次, 我可以看到,每一个消费者在消费的循环交替行为的消息。 例如,我会看到在一个终端,2,4,6中的其他消息1,3,5。

我的问题是:

  • 我能得到每个消费者收到相同的消息? 即,两个消费者得到消息1,2,3,4,5,6? 这是什么叫AMQP / RabbitMQ的说话吗? 它是如何正常配置?

  • 这是常见的做? 如果我只是有交换路由信息分为两个独立的队列,单次消费,而不是?

Answer 1:

我能得到每个消费者收到相同的消息? 即,两个消费者得到消息1,2,3,4,5,6? 这是什么叫AMQP / RabbitMQ的说话吗? 它是如何正常配置?

不,如果消费者是在同一个队列。 从的RabbitMQ的AMQP概念指南:

必须了解的是,在AMQP 0-9-1,消息负荷消费者之间的平衡是非常重要的。

这似乎意味着, 循环行为中的队列是给定的 ,而不是配置。 即,单独的队列,需要以具有相同的消息ID由多个消费者处理。

这是常见的做? 如果我只是有交换路由信息分为两个独立的队列,单次消费,而不是?

不,这不是,单个队列/多消费者之间每一个消费者处理相同的消息ID是不可能的。 具有交换路由信息到成两个独立的队列确实比较好。

正如我不需要太复杂的路由, 扇出交流会很好地处理了这一点。 我没有过多地关注在交易所早前为节点AMQP有一个“默认交换”让您直接发布消息到连接的概念,但大多数AMQP的消息发布到特定的交易所。

这里是我的扇出交换,发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})


Answer 2:

刚才看了RabbitMQ的教程 。 您发布消息交换,不排队; 然后将其路由到适当的队列。 在你的情况,你应该绑定单独的队列为每个消费者。 这样,他们可以完全独立使用消息。



Answer 3:

近一答案几乎是正确的 - 我有一个生成需要与不同的消费者落得所以这个过程是非常简单的信息吨的应用程序。

如果你想多个用户对同一消息,请执行下列步骤。

创建多个队列,每个应用程序,将接收该消息,在每个队列属性,“绑定”与amq.direct交换的路由选择标记。 更改您发布的应用程序发送给amq.direct并使用路由标签(不是队列)。 然后将AMQP消息复制到每个队列具有相同结合。 工程liek一个魅力:)

例如:可以说我有一个JSON字符串我产生,我将它发布到使用路由标签“新销售订单”的“amq.direct”交流,我有我的order_printer应用程序的队列打印顺序,我有一个排队我的计费系统,该系统将发送命令的副本和发票的客户端和我有一个Web归档系统在哪里存档的历史/合规性原因的订单,我有一个客户端web界面,订单跟踪其他信息来约订单。

所以我的队列是:order_printer,order_billing,order_archive和order_tracking所有具有约束力的标签“新销售订单”的约束对他们来说,所有4将得到JSON数据。

这是在没有发布应用程序知道或关心的接收应用程序发送数据的理想方式。



Answer 4:

发送模式是一比一的关系。 如果你想“送”到一个以上的接收器,你应该使用发布/订阅模式。 见http://www.rabbitmq.com/tutorials/tutorial-three-python.html了解更多详情。



Answer 5:

是每位消费者都能得到相同的消息。 看看http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html HTTP://www.rabbitmq。 COM /教程/教程五python.html

对于不同的方式来路由消息。 我知道他们是为Python和Java,但其良好的理解的原则,决定你在做什么,然后找到如何做到这一点的JS。 它听起来就像你想要做一个简单的扇出( 教程3 ),这将消息发送给连接到该交换机的所有队列。

与你在做什么,你想要做什么区别基本上是你要建立和交换或类型扇出。 扇出excahnges发送的所有邮件到所有连接队列。 每个队列都会有一个消费者,将分别有机会获得所有信息。

是的,这通常是完成的,它是AMPQ的特点之一。



Answer 6:

RabbitMQ的/ AMQP:单队列,多个消费者对于同样的信息和页面刷新。

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });


Answer 7:

为了得到你想要的行为,只是有每个消费者从自己的队列消费。 你必须以得到消息到所有的队列在一次使用非直接交换型(题目,头,扇出)。



Answer 8:

正如我估计你的情况是:

  • 我有消息队列(源接收消息,让我们将其命名为Q111)

  • 我有多个消费者,我想做不同的事情有相同的消息。

在这里你的问题,而3条消息由该队列收到消息1被消费者接受方的一个,其他消费者B和C的消耗消息2和3。凡为您需要在那里的RabbitMQ传递的同一副本的设置所有这些三个消息(1,2,3)到全部三个连接消费者(A,B,C)同时进行。

虽然可以由许多配置,以实现这一目标,一个简单的方法是使用下面的两个步骤的概念:

  • 从期望的队列(Q111)使用动态的RabbitMQ铲到拾取消息和发布到扇出交换机(专用交换创建专用于该目的)。
  • 现在重新配置你的客户A,B&C(谁在听队列(Q111))从此扇出交换直接使用排他性匿名队列为每个消费者听。

注意:在使用这个概念不从源队列(Q111)直接消费,因为已经习惯消费的消息被铲到你的扇出交换。

如果你认为这不符合您的具体要求...随意张贴您的建议:-)



Answer 9:

有一个在这种情况下我haven`t的答案在这里发现一个有趣的选择。

您可以在一个消费“重新排队”功能NACK消息到另一个处理它们。 一般来说它是不是一个正确的做法,但也许这将是对别人不够好。

https://www.rabbitmq.com/nack.html

和循环的提防(当所有concumers NACK +重新排队消息)!



Answer 10:

如果你碰巧使用amqplib库,因为我,他们有一个简便的事例中的一个实现的发布/订阅RabbitMQ的教程中 ,你可能会发现得心应手。



Answer 11:

我想你应该检查使用扇出器发送邮件。 您威尔·接收相同的消息型动物的消费者,该表的RabbitMQ下的一方法是在创建这个新的消费者/用户的每一个型动物队列。

这是链接,看到在javascript教程示例https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html



文章来源: RabbitMQ / AMQP: single queue, multiple consumers for same message?