How to send message to different Queue hosted in d

Posted 2020-04-21 00:04

Question:

My Apache Camel based app consumes messages from an IBM MQ queue. For example, these are the connection factory details:

hostname=host1000
QManager=QM1000
Port="some port"
Channel="common channel"

The Camel flow consumes and processes the message, then sends the response to the reply queue named in the message header.

from("wmq:queue:<INPUT_QUEUE>")
    .bean("processBean")
    .bean("beanToSendMsgToReplyQueue");

In the Camel header I receive the JMSReplyTo value below. Note that it names a different queue manager, which runs on a different host but belongs to the same cluster.

JMSReplyTo = queue://QM1012/TEST.REPLY?targetClient=1

Also, the queue manager name sits in the middle of the URI, like this:

queue://<queue-manager>/<queue-name>?<other parameters>
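Since the queue manager name is embedded in the destination string, one option is to split the string before deciding where to send. The following is only a minimal sketch using the JDK alone; the class name ReplyToUri and its fields are hypothetical, and it assumes the value always has the shape of the JMSReplyTo example above (queue://, then the queue manager, one slash, the queue name, and an optional ?name=value list).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that splits an MQ queue URI into its parts. */
public class ReplyToUri {

    /** Simple holder for the parsed parts. */
    public static final class Parts {
        public final String queueManager;
        public final String queueName;
        public final Map<String, String> params;

        Parts(String queueManager, String queueName, Map<String, String> params) {
            this.queueManager = queueManager;
            this.queueName = queueName;
            this.params = params;
        }
    }

    public static Parts parse(String uri) {
        String prefix = "queue://";
        if (!uri.startsWith(prefix)) {
            throw new IllegalArgumentException("Not a queue URI: " + uri);
        }
        String rest = uri.substring(prefix.length());

        // Split off the optional "?name=value&..." suffix.
        String query = "";
        int questionMark = rest.indexOf('?');
        if (questionMark >= 0) {
            query = rest.substring(questionMark + 1);
            rest = rest.substring(0, questionMark);
        }

        // Everything before the first '/' is the queue manager (it may be empty).
        int slash = rest.indexOf('/');
        if (slash < 0) {
            throw new IllegalArgumentException("Missing queue name: " + uri);
        }
        String queueManager = rest.substring(0, slash);
        String queueName = rest.substring(slash + 1);

        // Collect the parameters in the order they appear.
        Map<String, String> params = new LinkedHashMap<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return new Parts(queueManager, queueName, params);
    }
}
```

With the header value from the question, parse("queue://QM1012/TEST.REPLY?targetClient=1") yields the queue manager QM1012, the queue name TEST.REPLY, and a single parameter targetClient with the value 1.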

Below is the exception I get while sending the message.

ERROR o.apache.camel.processor.DefaultErrorHandler:215 - Failed delivery for (MessageId: ID-xxxxxxxxx-0-4 on ExchangeId: ID-xxxxxx-42443-1492594420697-0-1). Exhausted after delivery attempt: 1 caught: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: wmq://queue://QM1012/TEST.REPLY?targetClient=1 due to: Failed to resolve endpoint: wmq://queue://TAP2001R5/TEST?targetClient=1 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{targetClient=1}]. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[sendTo(Endpoint[wmq://queue:BACKOUT_Q])], Channel[DelegateSync[com.xxx.yyy.listener.XXXOnExceptionProcessor@21c66ee4]], Channel[Stop]]]]

Can anyone help me send a message to a queue on a different queue manager, which runs on a different host but in the same cluster? Also, the queue manager name appears in the middle of the string, so how should that be resolved? Please let me know if you need more details.

Update 1: I tried with the same queue manager and without parameters.

With JMSReplyTo = queue://QM1000/QUEUE_V1 I get the exception below:

org.springframework.jms.InvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'.; nested exception is com.ibm.msg.client.jms.DetailedInvalidDestinationException: JMSWMQ2008: Failed to open MQ queue 'QM1000/QUEUE_V1'. JMS attempted to perform an MQOPEN, but WebSphere MQ reported an error. Use the linked exception to determine the cause of this error. Check that the specified queue and queue manager are defined correctly.; nested exception is com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2189' ('MQRC_CLUSTER_RESOLUTION_ERROR').

Update 2:

I am able to send a message to JMSReplyTo using the plain javax.jms.* and com.ibm.mq.jms.* APIs, but not via Apache Camel. Can anyone from the Camel user/developer group help me do the same with a Camel component?

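@Override
public void process(Exchange exchange) throws Exception {
    // Open a connection on the factory that is configured for QM1000.
    QueueConnection connection = this.connectionFactory.createQueueConnection();
    try {
        // Non-transacted session with automatic acknowledgement.
        boolean transacted = false;
        QueueSession session = connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);

        // setText() needs a String, so convert the body explicitly.
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText(exchange.getIn().getBody(String.class));

        // The URI names both the target queue manager and the queue.
        MQQueue replyQueue = new MQQueue("queue://QM1012/TEST.REPLY");
        QueueSender queueSender = session.createSender(replyQueue);
        queueSender.send(outMessage);
    } finally {
        // Closing the connection also closes its sessions and senders.
        connection.close();
    }

    /* producerTemplate.send("wmq:" + "queue://QM1012/TEST.REPLY", exchange); */
}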

Answer 1:

You want to communicate with two different queue managers, so you'll need to define two Camel JMS component instances accordingly. Camel cannot magically know what QM1000 or QM1012 means and how the QMs can be accessed.

You first need the two JMS connection factory instances for the two WMQ QMs. How to get these depends on your execution environment. On JEE servers the connection pools can be accessed using JNDI after being configured. Have a look at your app server documentation on how to set up JMS pools. If you run stand-alone, have a look at Spring JMS connection caching, or at Atomikos if you want XA transactions. Let's assume the CF for QM1000 is sourceCF and the CF for QM1012 is targetCF.

Now you can define two instances of the Camel JMS component, one for each QM. Inject the connection factory into each JMS component (.setConnectionFactory(...)). Assume you define a Camel JMS component with id "jmssource" and inject sourceCF into it, and a second one with id "jmstarget" that gets targetCF. How to do that depends on your environment (JEE/CDI, Spring, plain Java). Have a look around Stack Overflow, there are examples.

You can now specify Camel JMS producers and consumers on a Camel route using the syntax:

from("jmssource:INPUT_QUEUE")
  ...
  (do some processing)
  ...
  .to("jmstarget:QUEUE_V1")

You cannot use Camel's JMS reply-to logic (using the JMSReplyTo header) to reply to another queue manager. I think this is not allowed by the JMS standard. You need to do the reply explicitly by sending to the reply queue.
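To make the explicit reply concrete, the target endpoint could be chosen from the queue manager named in the JMSReplyTo value. The sketch below is an assumption-laden illustration, not part of Camel: the class ReplyEndpointSelector and its lookup map are hypothetical, and the component ids jmssource and jmstarget are the ones assumed above. It only builds the endpoint URI string, which would then be handed to a producer (for example a ProducerTemplate).

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that maps a JMSReplyTo value to a Camel endpoint URI. */
public class ReplyEndpointSelector {

    // Matches queue://<queue-manager>/<queue-name>[?parameters]
    private static final Pattern QUEUE_URI =
            Pattern.compile("^queue://([^/?]*)/([^?]+)(\\?.*)?$");

    /**
     * @param jmsReplyTo              value of the JMSReplyTo header as a string
     * @param componentByQueueManager queue manager name -> Camel JMS component id
     * @return an endpoint URI such as "jmstarget:queue:TEST.REPLY"
     */
    public static String endpointFor(String jmsReplyTo,
                                     Map<String, String> componentByQueueManager) {
        Matcher m = QUEUE_URI.matcher(jmsReplyTo);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected reply-to format: " + jmsReplyTo);
        }
        String queueManager = m.group(1);
        String queueName = m.group(2);

        String component = componentByQueueManager.get(queueManager);
        if (component == null) {
            throw new IllegalStateException(
                    "No JMS component configured for queue manager " + queueManager);
        }
        // MQ-specific parameters such as targetClient are dropped here on purpose,
        // because Camel rejects them as unknown endpoint options.
        return component + ":queue:" + queueName;
    }
}
```

For the header from the question and a map of QM1000 to jmssource and QM1012 to jmstarget, this returns jmstarget:queue:TEST.REPLY. Dropping the parameters avoids the "Unknown parameters" failure, but it also means options like targetClient have to be applied some other way.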

For setting the targetClient option a destination resolver may be useful:

import org.springframework.jms.support.destination.DynamicDestinationResolver;
import org.springframework.jms.support.destination.DestinationResolver;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQDestination;

public class WMQDestinationResolver extends DynamicDestinationResolver implements DestinationResolver {
  // Target client mode applied to every resolved MQ destination.
  private int targetClient = JMSC.MQJMS_CLIENT_JMS_COMPLIANT;

  public void setTargetClient(int targetClient) {
    this.targetClient = targetClient;
  }

  @Override
  public Destination resolveDestinationName(Session session, String destinationName, boolean isPubSubDomain) throws JMSException {
    // Let Spring resolve the destination, then set the MQ-specific option on it.
    Destination destination = super.resolveDestinationName(session, destinationName, isPubSubDomain);
    if (destination instanceof MQDestination) {
      MQDestination mqDestination = (MQDestination) destination;
      mqDestination.setTargetClient(targetClient);
    }
    return destination;
  }
}


Answer 2:

First, thanks to all for your support. My use case is as follows (it is also described above).

Connect to an MQ host (hostname, queue manager, port, channel) using Apache Camel and consume messages from a queue that belongs to the same host/queue manager. Each message arrives with a reply-to queue (JMSReplyTo) header. Its value looks like this:

For example:

queue://Different_QueueManager_in_Cluster/TEST.REPLY?mdReadEnabled=true&messageBody=0&mdWriteEnabled=true&XMSC_WMQ_REPLYTO_STYLE=1&targetClient=1

The question now is how to send the reply message to a different queue on a different queue manager while the connection is bound to the host and queue manager mentioned above.

NOTE: All MQ queue managers are in a clustered environment.

Solution 1, for example:

from("wmq:queue:INPUT_MSG_Q")
 .bean(requestProcessor)
 .bean(responseProcessor)

By default, Apache Camel handles the reply-to queue (JMSReplyTo) itself. If you don't want it to send a reply to JMSReplyTo, set disableReplyTo=true on the consuming endpoint.

NOTE: When sending to queue://Different_QueueManager_in_Cluster/TEST.REPLY over the same connection/connection factory, the MQ cluster works out that the message has to go to the specified queue on the specified queue manager in the cluster. As for the parameters ?mdReadEnabled=true&messageBody=0&mdWriteEnabled=true&XMSC_WMQ_REPLYTO_STYLE=1&targetClient=1, Apache Camel resolves them automatically, without any third-party resolver, when it auto-replies to JMSReplyTo.

Solution 2:

Disable the auto-reply using disableReplyTo=true, read the queue details from the header, and send the message using the plain javax.jms.* and com.ibm.mq.jms.* APIs. The code is as follows.

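@Override
public void process(Exchange exchange) throws Exception {
    // Reuse the connection factory of the queue manager the route consumes from.
    QueueConnection connection = this.connectionFactory.createQueueConnection();
    try {
        // Non-transacted session with automatic acknowledgement.
        boolean transacted = false;
        QueueSession session = connection.createQueueSession(transacted, QueueSession.AUTO_ACKNOWLEDGE);

        // setText() needs a String, so convert the body explicitly.
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText(exchange.getIn().getBody(String.class));

        // The URI names both the target queue manager and the queue.
        MQQueue replyQueue = new MQQueue("queue://Different_QueueManager_in_Cluster/TEST.REPLY");
        QueueSender queueSender = session.createSender(replyQueue);
        queueSender.send(outMessage);
    } finally {
        // Closing the connection also closes its sessions and senders.
        connection.close();
    }

    /* producerTemplate.send("wmq:" + "queue://Different_QueueManager_in_Cluster/TEST.REPLY", exchange); */
}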

For the parameters, use a destination resolver as mentioned by @Sebastian Brandt.