Properly iterating over queues from ActiveMQ Desti

Posted 2020-04-17 06:55

Question:

For some reason, in the following code destinationSource.getQueues() returns a CopyOnWriteArraySet rather than a plain Set. This is a problem because the for loop starts before the set has been fully populated, and since a CopyOnWriteArraySet iterator works on a snapshot, the loop only processes the items that were in the set when iteration began. I know I can throw a Thread.sleep() in there, but that doesn't fix the underlying problem. Is there a reason it is returned as a CopyOnWriteArraySet instead of a plain Set? Also, is there a way to iterate over a CopyOnWriteArraySet that covers all items, including ones added during the iteration?

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();
activeMQConnection.start();
DestinationSource destinationSource = activeMQConnection.getDestinationSource();

Set<ActiveMQQueue> queues = destinationSource.getQueues();

for(ActiveMQQueue queue : queues) {
  queueNames.add(queue.getPhysicalName());
}

activeMQConnection.close();

Edit: Here is the solution I came up with. While it's not perfect, it keeps collecting queues until more than 1 second passes without a new queue being added.

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);

    ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection();

    activeMQConnection.start();

    DestinationSource destinationSource = activeMQConnection.getDestinationSource();

    Set<ActiveMQQueue> queues = destinationSource.getQueues();

    do {
        for(ActiveMQQueue queue : queues) {
            String physicalName = queue.getPhysicalName();
            if(!queueNames.contains(physicalName)) {
                queueNames.add(physicalName);
            }
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log(e.toString());
        }
    } while(queueNames.size() < queues.size());

    activeMQConnection.close();
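
The same polling idea can be pulled out into a small helper that does not depend on ActiveMQ at all. The following is only a sketch under stated assumptions: the class name QuietPeriodSnapshot and the method collectUntilQuiet are made up for illustration, a CopyOnWriteArraySet<String> filled by a background thread stands in for the set returned by getQueues(), and the loop compares contents rather than sizes so that a queue removed in the meantime does not confuse the exit condition.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical helper, not part of ActiveMQ.
class QuietPeriodSnapshot {

    // Copies the live set, sleeps for one quiet period, and repeats until the
    // live set holds nothing that has not been copied yet. Elements removed
    // from the live set in the meantime stay in the result.
    static <T> Set<T> collectUntilQuiet(Set<T> live, long quietMillis) {
        Set<T> seen = new LinkedHashSet<>();
        do {
            seen.addAll(live);
            try {
                Thread.sleep(quietMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and return what was collected so far.
                Thread.currentThread().interrupt();
                return seen;
            }
        } while (!seen.containsAll(live));
        return seen;
    }

    public static void main(String[] args) {
        // Stand-in for destinationSource.getQueues(): filled asynchronously.
        Set<String> live = new CopyOnWriteArraySet<>();
        Thread filler = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    return;
                }
                live.add("queue." + i);
            }
        });
        filler.start();

        Set<String> names = collectUntilQuiet(live, 500);
        System.out.println("collected: " + names);
    }
}
```

Like the loop above, this only guarantees that nothing new showed up during the last quiet period; a queue created later is still missed.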

Answer 1:

I had the same issue when getting all queues from a connection. Whenever I got the queues from the DestinationSource and then iterated over the set (foreach), I got a different number of queues: the iteration loop always saw more queues than the initial set reported.

DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
log.debug("Found '" + queues.size() + "' queues");
for (ActiveMQQueue queue : queues) {...}

Then I added a listener to the destination source, like this:

DestinationSource ds = connection.getDestinationSource();
Set<ActiveMQQueue> queues = ds.getQueues();
// Add listener:
ds.setDestinationListener(event -> event.hashCode());
log.debug("Found '" + queues.size() + "' queues");
for (ActiveMQQueue queue : queues) {...}

From now on, I always get the right number of queues and can iterate over the complete set.

Although I don't really know why ;)



Answer 2:

As you say, it is the nature of CopyOnWriteArraySet to behave this way. The list of queues can change concurrently with your thread. By returning a CopyOnWriteArraySet, ActiveMQ is giving you a data structure that is safe to use in your thread (no chance of a ConcurrentModificationException) and one that will remain up-to-date.
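
To make the snapshot behaviour concrete, here is a minimal sketch that uses only java.util.concurrent, with no ActiveMQ involved. The class name SnapshotIterationDemo, the method visitWhileAdding, and the queue names are made up for illustration. An element added while the loop is running does not appear in that loop, but it is in the set afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical demo class, not part of ActiveMQ.
class SnapshotIterationDemo {

    // Iterates over the set while adding a new element from inside the loop,
    // and returns the elements the loop actually visited.
    static List<String> visitWhileAdding(Set<String> queueNames, String lateArrival) {
        List<String> visited = new ArrayList<>();
        for (String name : queueNames) {
            // The iterator was created before this add, so it keeps reading
            // the array that existed when the loop started.
            queueNames.add(lateArrival);
            visited.add(name);
        }
        return visited;
    }

    public static void main(String[] args) {
        Set<String> queueNames = new CopyOnWriteArraySet<>();
        queueNames.add("orders");
        queueNames.add("invoices");

        List<String> visited = visitWhileAdding(queueNames, "late.queue");

        System.out.println("visited during loop: " + visited);
        // prints: visited during loop: [orders, invoices]
        System.out.println("set after loop: " + queueNames);
        // prints: set after loop: [orders, invoices, late.queue]
    }
}
```

A second for loop started after the first one finishes would see all three names, because each loop takes its own snapshot at the moment it begins.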

Since new queues could be added at any time, there is no way to "wait" until they are all done.

If you want to know when new queues are added and then do something in response, the best way is to listen for the appropriate ActiveMQ advisory message. This facility will let you respond to message queue additions, consumer and producer additions, as well as removals of the same. This link has a code example.