How can I pull messages from ActiveMQ asynchronously?

Posted 2020-06-23 09:52

Question:

I want to write code that pulls messages from ActiveMQ. I don't want to pull all the messages at once, because my requirement is this: whenever my Java application receives one message from ActiveMQ, it looks up the corresponding HTTP link based on the message body and forwards the message to that link. For this logic I wrote two .java files:

MessageConsumer.java

MyListener.java

MessageConsumer.java only establishes the connection. The code is below.

package PackageName;

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MessageConsumer extends HttpServlet {
    @Override
    protected void service(HttpServletRequest arg0, HttpServletResponse arg1)
            throws ServletException, IOException {
        try {
            // Create the connection factory (user, password, broker URL)
            ConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("admin", "admin", "tcp://localhost:61617");
            // Establish the connection between this application and ActiveMQ
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("MessageTesing");
            javax.jms.MessageConsumer consumer = session.createConsumer(queue);
            // Register the listener; messages are then delivered asynchronously
            MessageListener listener = new MyListener();
            consumer.setMessageListener(listener);
            connection.start();
            System.out.println("Press a key to terminate");
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it
            e.printStackTrace();
        }
    }
}

MyListener.java triggers the corresponding applications. The code is below.

package PackageName;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListener implements MessageListener {
    public void onMessage(Message msg) {
        try {
            TextMessage msg1 = (TextMessage) msg;
            // Dummy code, just to illustrate the idea
            System.out.println(msg1.getText());
            // Compare string contents with equals(); == compares references
            if ("Google".equals(msg1.getText())) {
                System.out.println("Forwarding http link to Google");
            } else {
                System.out.println("Forwarding http link to Facebook");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

In this post I am triggering Google and Facebook links, but for my actual requirements I will call my own applications. Each application takes more than 20 minutes, so I want to pull messages one by one: only once the previous message has been processed completely should the next message be received from ActiveMQ.

But right now I am getting all the messages at once. How can I fix this? I have seen the ActiveMQ HelloWorld program, but I didn't understand it.

Sorry, I am new to Java. Can anyone help me?

Thanks.

Answer 1:

If you are using a MessageListener, then you are actually receiving messages asynchronously (in another thread).

You are probably looking for synchronous message reception, so try this in your main thread:

final QueueReceiver queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();

while (true) {
  Message message = queueReceiver.receive();
  // Process your message: insert the code from MyListener.onMessage here

  // Possibly add an explicit message.acknowledge() here
  // if you want to make sure that no message is lost in case of an exception
  // (requires Session.CLIENT_ACKNOWLEDGE when you create the queue session)

  // Possibly terminate program, if a certain condition/situation arises
}

without a MessageListener.

receive() blocks until a message is available, so your main thread (and thus your program) waits in the receive method. If a message arrives, it will receive and process it.
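To make the one-at-a-time behaviour of such a loop concrete without needing a broker, here is a minimal sketch that uses java.util.concurrent.BlockingQueue as a stand-in for the JMS consumer. This is not the JMS API: take() merely plays the role of receive(), and the class name SequentialPullSketch and the STOP marker are hypothetical names introduced for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in demo: BlockingQueue.take() blocks the way a synchronous receive() does.
class SequentialPullSketch {

    // Hypothetical end-of-stream marker so the demo loop can terminate.
    static final String STOP = "STOP";

    // Takes one message at a time. The next take() is only reached after
    // the handling of the previous message has finished.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> log = new ArrayList<>();
        try {
            while (true) {
                String body = queue.take(); // blocks until a message is available
                if (STOP.equals(body)) {
                    return log;
                }
                log.add("start " + body);
                // ... long-running work for this message would go here ...
                log.add("done " + body);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return log;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Google");
        queue.add("Facebook");
        queue.add(STOP);
        drain(queue).forEach(System.out::println);
    }
}
```

Because the work happens inside the loop body, every "done" entry is logged before the next "start" entry, which is the ordering the question asks for.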

Update

If you use

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

then you should call

message.acknowledge()

after the message has been processed completely.

Whereas with Session.AUTO_ACKNOWLEDGE the message is acknowledged, and thus removed from the queue, as soon as receive() returns (and is therefore lost if the program terminates while processing the message).



Answer 2:

Instead of using a MessageListener you could use the receive() method in the MessageConsumer object. This way you only get one message each time you call the receive() method.

MessageConsumer consumer = session.createConsumer(destination); 
Message message = consumer.receive(1000);
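Note that receive(1000) waits at most 1000 ms and returns null if no message arrived in that time, so the result needs a null check before it is used. The sketch below illustrates that pattern with java.util.concurrent.BlockingQueue.poll(timeout) standing in for the timed receive; it is not JMS code. The class name TimedPullSketch, the helper names and the target URLs are hypothetical placeholders, and the lookup mirrors the Google/Facebook dummy logic from the question.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in demo: poll(timeout) returns null on timeout, like a timed receive.
class TimedPullSketch {

    // Hypothetical mapping from message body to a target link (placeholder URLs).
    static String targetFor(String body) {
        // equals() compares string contents; == would compare object references.
        if ("Google".equals(body)) {
            return "https://www.google.com";
        }
        return "https://www.facebook.com";
    }

    // Waits up to timeoutMillis for one message and returns its target,
    // or null if nothing arrived before the timeout.
    static String pullOne(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            String body = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            return body == null ? null : targetFor(body);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("Google");
        System.out.println(pullOne(queue, 1000)); // a message is waiting
        System.out.println(pullOne(queue, 100));  // queue is empty, times out
    }
}
```

Calling pullOne again only after the previous call's work has finished gives the same one-message-at-a-time behaviour, with the added option of doing something else when the queue is idle.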