How to continuously read JMS Messages in a thread

2019-08-06 12:58发布

问题:

I've written a Continuous JMS Message reveiver : Here, I'm using CLIENT_ACKNOWLEDGE because I don't want this thread to acknowledge the messages.

(...)
connection.start();
session = connection.createQueueSession(true, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue(QueueId);
receiver = session.createReceiver(queue);
While (true) {
  message = receiver.receive(1000);
  if ( message != null ) {
    // NB : I can only pass Strings to the other thread
    sendMessageToOtherThread( message.getText() , message.getJMSMessageID() ); 
  }
  // TODO Implement criteria to exit the loop here
}

In another thread, I'll do something as follows (after successful processing) :
This is in a distinct JMS Connection executed simultaneously.

public void AcknowledgeMessage(String messageId) {
  if (this.first) {
    this.connection.start();
    this.session = this.connection.createQueueSession( false, Session.AUTO_ACKNOWLEDGE );
    this.queue = this.session.createQueue(this.QueueId);
  }
  QueueReceiver receiver = this.session.createReceiver(this.queue, "JMSMessageID='" + messageId + "'");
  Message AckMessage = receiver.receive(2000);
  receiver.close();
}

It appears that the message is not found (AckMessage is null after timeout) whereas it does exist in the Queue. I suspect the message to be blocked by the continuous input thread.. indeed, when firing the AcknowledgeMessage() alone, it works fine.

Is there a cleaner way to retrieve 1 message ? based on its QueueId and messageId
Also, I feel like there could be a risk of memory leak in the continuous reader if it has to memorize the Messages or IDs during a long time.. justified ?

If I'm using a QueueBrowser to avoid impacting the Acknowledge Thread, it looks like I cannot have this continuous input feed.. right ?

More context : I'm using ActiveMQ and the 2 threads are 2 custom "Steps" of a Pentaho Kettle transformation.
NB : Code samples are simplified to focus on the issue.

回答1:

Well, you can't read that message twice, since you have already read it in the first thread.

ActiveMQ will not delete the message as you have not acknowledge it, but it won't be visible until you drop the JMS connection (I'm not sure if there is a long timeout here as well in ActiveMQ).

So you will have to use the original message and do: message.acknowledge();. Note, however, that sessions are not thread safe, so be careful if you do this in two different threads.