IBM MQ Message Listener

2019-04-30 05:40发布

问题:

Hi does anyone know how to create a message listener using IBM MQ? I know how to do it using the JMS spec but I am not sure how to do it for IBM MQ. Any links or pointers are greatly appreciated.

回答1:

Although there is a WMQ Java API as noted by the previous responders, WMQ supports JMS as well so here are some resources to get you started there.

Take a look at this article: IBM WebSphere Developer Technical Journal: Running a standalone Java application on WebSphere MQ V6.0

Also, if you have installed the full WMQ client and not just grabbed the jars then you will have lots of sample code installed. By default, these will live in C:\Program Files\IBM\WebSphere MQ\tools\jms or /opt/mqm/samp depending on your platform.

If you need the WMQ Client install media, get it here. Note that this is the WMQ v7 client and not the v6 client. It is compatible with the v6 QMgr but since v6 is end-of-life as of September 2011 you should be doing new development on the v7 client and, if possible, a v7 QMgr. There are a lot of functional and performance enhancements available if both sides are v7.

You can get the product manual here if you need it.

Finally, please be sure when you get a JMS exception to print the linked exception. This is not a WMQ thing, rather it's a JMS thing. Sun provided a multi-level data structure for JMS exceptions and the really interesting parts are often in the nested level. This is not a big deal and can be implemented in a few lines:

try {
  .
  . code that might throw a JMSException
  .
} catch (JMSException je) {
  System.err.println("caught "+je);
  Exception e = je.getLinkedException();
  if (e != null) {
    System.err.println("linked exception: "+e);
  } else {
    System.err.println("No linked exception found.");
  }
}

This helps to determine the difference between a JMS error versus a transport error. For example a JMS security error might be a WMQ 2035, or it might be the JSSE configuration, or the app might not have access to something in the file system. Only one of these is worth spending a lot of time digging through the WMQ error logs for and only by printing the linked exception will you be able to tell if it's that one.



回答2:

Take a look at IBM Help: Writing WebSphere MQ base Java applications

IBM has an API for interacting with queues. Here's their sample:

import com.ibm.mq.*;            // Include the WebSphere MQ classes for Java package


public class MQSample
{
  private String qManager = "your_Q_manager";  // define name of queue
                                               // manager to connect to.
  private MQQueueManager qMgr;                 // define a queue manager
                                               // object
  public static void main(String args[]) {
     new MQSample();
  }

  public MQSample() {
   try {

      // Create a connection to the queue manager

      qMgr = new MQQueueManager(qManager);

      // Set up the options on the queue we wish to open...
      // Note. All WebSphere MQ Options are prefixed with MQC in Java.

      int openOptions = MQC.MQOO_INPUT_AS_Q_DEF |
                        MQC.MQOO_OUTPUT ;

      // Now specify the queue that we wish to open,
      // and the open options...

      MQQueue system_default_local_queue =
              qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE",
                               openOptions);

      // Define a simple WebSphere MQ message, and write some text in UTF format..

      MQMessage hello_world = new MQMessage();
      hello_world.writeUTF("Hello World!");

      // specify the message options...

      MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults,
                                                           // same as MQPMO_DEFAULT

      // put the message on the queue

      system_default_local_queue.put(hello_world,pmo);

      // get the message back again...
      // First define a WebSphere MQ message buffer to receive the message into..

      MQMessage retrievedMessage = new MQMessage();
      retrievedMessage.messageId = hello_world.messageId;

      // Set the get message options...

      MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults
                                                           // same as  MQGMO_DEFAULT
      // get the message off the queue...

      system_default_local_queue.get(retrievedMessage, gmo);

      // And prove we have the message by displaying the UTF message text

      String msgText = retrievedMessage.readUTF();
      System.out.println("The message is: " + msgText);
      // Close the queue...
      system_default_local_queue.close();
      // Disconnect from the queue manager

      qMgr.disconnect();
    }
      // If an error has occurred in the above, try to identify what went wrong
      // Was it a WebSphere MQ error?
    catch (MQException ex)
    {
      System.out.println("A WebSphere MQ error occurred : Completion code " +
                         ex.completionCode + " Reason code " + ex.reasonCode);
    }
      // Was it a Java buffer space error?
    catch (java.io.IOException ex)
    {
      System.out.println("An error occurred whilst writing to the message buffer: " + ex);
    }
  }
} // end of sample

I'm not sure if the IBM jars are located at the base Maven repo. I know in the past I've had to extract them from a local IBM install and put them in an local SVN repo. I'm using the following jars:

<dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mq.pcf</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqbind</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>
    <dependency>
    <groupId>com.ibm</groupId>
    <artifactId>com.ibm.mqjms</artifactId>
    <version>5.3.00</version>
    <scope>compile</scope>
</dependency>


回答3:

Just in case anyone will google stackoverflow for MQ Listener like i did... It might be not the answer due to JMS realization, but this is what I was looking for. Something like this:

MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection();
MQQueueSession session = (MQQueueSession)conn.createSession(false, 1);

Queue queue = session.createQueue("QUEUE");

MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue);

receiver.setMessageListener(new YourListener());

conn.start();

YourListener should implement MessageListener interface and you will receive you messages into onMessage(Message msg) method.



回答4:

Take a look at the sample provided above.

Specifically at the lines

MQGetMessageOptions gmo = new MQGetMessageOptions();       
system_default_local_queue.get(retrievedMessage, gmo);

You can configure the get to wait for a specified time before throwing a MQRC_NO_MSG_AVAILABLE exception. Or you can wait forever.

gmo.waitInterval= qTimeout;
gmo.options = MQC.MQGMO_WAIT

So you can create a thread that keeps looking for new messages then passes them off to a handler. The getting and putting do not need to be in the same thread or even application.

I hope this helps answer your question.



回答5:

An important point in addition to the existing answers: JMS provides MessageListener, a class that allows you to receive messages as asynchronous callbacks.

The native API has no equivalent feature! You have to repeatedly call get(...) as appropriate.



回答6:

in the loop before getting the message you can specify as following

gmo.options = MQC.MQGMO_WAIT
gmo.waitInterval = MQConstants.MQWI_UNLIMITED;

this makes the loop will wait until there is a message in the queue. To me, it is similar to MessageListerner



回答7:

Hello, here is the working example of message listener with IBM MQ. Here I used spring also to create beans etc...

package queue.app;

import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;


@Component
public class QueueConsumer implements MessageListener{

    private Logger logger = Logger.getLogger(getClass());

    MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
    QueueConnection qc;
    Queue queue;
    QueueSession queueSession;
    QueueReceiver qr;

    @Value("${jms.hostName}")
    String jmsHost;
    @Value("${jms.port}")
    String jmsPort;
    @Value("${jms.queue.name}")
    String QUEUE_NAME;
    @Value("${jms.queueManager}")
    String jmsQueueMgr;
    @Value("${jms.username}")
    String jmsUserName;
    @Value("${jms.channel}")
    String jmsChannel;

    @PostConstruct
    public void init() throws Exception{
        qcf.setHostName (jmsHost);
        qcf.setPort (Integer.parseInt(jmsPort));
        qcf.setQueueManager (jmsQueueMgr);
        qcf.setChannel (jmsChannel);
        qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
        qc = qcf.createQueueConnection ();

        queue = new MQQueue(QUEUE_NAME);
        qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
        queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
        qr = queueSession.createReceiver(queue);
        qr.setMessageListener(this);
        qc.start();

    }


    @Override
    public void onMessage(Message message) {
        logger.info("Inside On Message...");
        long t1 = System.currentTimeMillis();
        logger.info("Message consumed at ...."+t1);

        try{
            if(message instanceof TextMessage) {
                logger.info("String message recieved >> "+((TextMessage) message).getText());
            }

        }catch(Exception e){
            e.printStackTrace();
        }

    }
}

Below are the dependencies i have..

<dependency>
            <groupId>com.sun.messaging.mq</groupId>
            <artifactId>fscontext</artifactId>
            <version>4.2</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>jms</artifactId>
            <version>1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>3.2.17.RELEASE</version>
        </dependency>


        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.allclient</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mq.jmqi</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>com.ibm</groupId>
            <artifactId>com.ibm.mqjms</artifactId>
            <version>1.0</version>
        </dependency>


标签: java jms ibm-mq