
How to fetch messages from an Azure Service Bus Qu

Posted 2020-07-11 08:44

Question:

We're trying to consume Azure Service Bus in a Node application. Our requirement is to fetch multiple messages from a queue.

Since the Azure SDK for Node doesn't support batch retrieval, we decided to use AMQP. We're able to fetch messages using the Peek Message operation as described here (https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#message-operations).

However, we're noticing that as soon as messages are fetched, they are removed from the queue. Does anyone have insight into how we can fetch messages in "PeekLock" mode using AMQP and Node? For AMQP, we're using the amqp10 Node package (https://www.npmjs.com/package/amqp10).

Here's our code for peeking at the messages:

const AMQPClient = require('amqp10/lib').Client,
      Policy = require('amqp10/lib').Policy;

const protocol = 'amqps';
const keyName = 'RootManageSharedAccessKey';
const sasKey = 'My Shared Access Key';
const serviceBusHost = 'account-name.servicebus.windows.net';
const uri = protocol + '://' + encodeURIComponent(keyName) + ':' + encodeURIComponent(sasKey) + '@' + serviceBusHost;
const queueName = 'test1';
var client = new AMQPClient(Policy.ServiceBusQueue);
client.connect(uri)
.then(function () {
    return Promise.all([
        client.createReceiver(queueName),
        client.createSender(queueName)
    ]);
})
.spread(function(receiver, sender) {
    console.log(receiver);
    console.log(sender);
    console.log('--------------------------------------------------------------------------');
    receiver.on('errorReceived', function(err) {
        // check for errors
        console.log(err);
    });
    receiver.on('message', function(message) {
        console.log('Received message');
        console.log(message);
        console.log('------------------------------------');
    });

    return sender.send([], {
        operation: 'com.microsoft:peek-message',
        'message-count': 5
    });
})
.error(function (e) {
    console.warn('connection error: ', e);
});

Answer 1:

By default, the receiver works in auto-settle mode; you have to change it to settle on disposition:

const { Constants } = require('amqp10')

// 
// ...create client, connect, etc...
//

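// The second parameter of createReceiver lets you override policy parameters.
// createReceiver returns a promise, so await it (inside an async function)
// or use .then() before attaching handlers.
const receiver = await client.createReceiver(queueName, {
  attach: {
    rcvSettleMode: Constants.receiverSettleMode.settleOnDisposition
  }
})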

Do not forget to accept/reject/release a message after processing it:

receiver.on('message', msg => {
  //
  // ...do something smart with a message...
  //

  receiver.accept(msg) // <- manually settle a message
})
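
The accept/release choice above can be wrapped in a small helper so that a message is settled only once processing has finished. This is a minimal sketch, not part of amqp10: the names `settleAfterProcessing` and `processMessage` are hypothetical, and it assumes `receiver` is a receiver link created with `settleOnDisposition` as shown earlier. It also assumes that releasing a message makes it available for redelivery, which is how Service Bus is generally understood to treat a released disposition.

```javascript
// Hypothetical helper (not an amqp10 API): settle each message only after
// the caller-supplied async `processMessage` function has finished.
// Assumes `receiver` exposes on(), accept() and release(), as an amqp10
// receiver link does.
function settleAfterProcessing(receiver, processMessage) {
  const onMessage = async (msg) => {
    try {
      await processMessage(msg);
      receiver.accept(msg);  // processing succeeded: settle as accepted
    } catch (err) {
      console.warn('processing failed, releasing message:', err.message);
      receiver.release(msg); // processing failed: give the message back
    }
  };
  receiver.on('message', onMessage);
  return onMessage; // returned so callers (and tests) can invoke it directly
}
```

With the receiver from the answer above, this would be used as `settleAfterProcessing(receiver, async msg => { /* handle msg.body */ })`. Whether a failed message should be released, rejected, or retried is a design decision that depends on the queue's redelivery and dead-letter settings.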