Apache Camel with IBM MQ

2020-06-14 00:03发布

Hello has anyone ever used Camel with IBM's MQ. We are looking at possibly using the two products together but have no example of the two products working together.

3条回答
放我归山
2楼-- · 2020-06-14 00:44

The best I have been able to get is documented below, illustrated as a Spring XML application context that itself hosts the CAMEL context and routes. This sample works with the IBM native MQ JCA-compliant resource adapter v7.5, CAMEL 2.16, Spring core 4.2. I have deployed it in Glassfish, Weblogic, and JBoss EAP7 servers.

The complexity is bound to handling the flow of MQ reports whose philosophy conflicts with that of a plain JMS reply-to message. For a detailed explanation, please refer to Implementing native websphere MQ with CoD over Camel JMS component

This example based on the CAMEL XML DSL is self-contained and easy to test.

We start with Spring & CAMEL declarations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util" 
xmlns:context="http://www.springframework.org/schema/context" 
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:jee="http://www.springframework.org/schema/jee"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
    http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

The CAMEL context follows with 2 routes: MQ to JMS and JMS to MQ, here chained to form a bridge to ease testing.

<camel:camelContext id="mqBridgeCtxt">
<camel:route id="mq2jms" autoStartup="true">

Weird: on Weblogic, the only way to get (e.g.) 3 listeners is to enforce 3 connections (with 3 Camel:from statements in sequence) with max 1 session each, otherwise an MQ error ensues: MQJCA1018: Only one session per connection is allowed. On JBoss, you can simply adjust concurrentConsumers=...

  <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
        acknowledgementModeName=SESSION_TRANSACTED"/> 

The disable disableReplyTo option above ensures that CAMEL will not produce a reply before we can test the MQ message type to be 1=Request(-reply) or 8=datagram (one way!). That test and reply construction is not illustrated here.

Then we enforce the EIP to InOnly on the next posting to plain JMS to be consistent with the Inbound MQ mode.

  <camel:setExchangePattern pattern="InOnly"/>
  <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
  <camel:to uri="ref:innerQueue" />
</camel:route>

This ends the MQ-to-jms route; next comes the jms-to-MQ route still in the same CAMEL context:

<camel:route id="jms2mq"  autoStartup="true">
  <camel:from uri="ref:innerQueue" />
  <!-- remove inner message headers and properties to test without inbound side effects! -->
  <camel:removeHeaders pattern="*"/> 
  <camel:removeProperties pattern="*" />
  <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

Now comes the request flag for the MQ CoD report to be returned by remote destination. We also enforce the MQ message to be of Datagram type (value 8).

  <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
  <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
  <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

The ReplyTo queue can be specified either via the ReplyTo uri option, else as a header as below.

Next we do use CamelJmsDestinationName header to enforce suppressing of the JMS MQ message header MQRFH2 (using targetClient MQ URL option value 1). In other words, we want to send a plain vanilla MQ binary message (i.e. Only the MQMD message descriptor followed by the payload).

  <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
  <camel:setHeader headerName="CamelJmsDestinationName"> <camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

More MQMD fields may be controlled through reserved JMS properties as illustrated below. See restrictions in IBM doc.

  <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
  <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

The destination queue in the URI is overwritten by the CamelJmsDestinationName above, hence the queue name in the URI becomes a placeholder.

The URI option preserveMessageQos is the one that - as observed - allows sending a message with the ReplyTo data being set (to get the MQ CoD Report), yet prevent CAMEL to instantiate a Reply message listener by enforcing the InOnly MEP.

  <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
            exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
            includeSentJMSMessageID=true" />
</camel:route>
</camel:camelContext>

We have not finished, we have still to declare our queue factories for both a native JMS provider and Websphere MQ (via the native IBM WMQ JCA Resource Adapter), to be adjusted to your context. We use here JNDI lookups on administrative objects.

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>

An alternative to fetching the factories (and JCA adapters) from JNDI is to declare the JMS client as Spring beans. In Weblogic and Glassfish, you'll be better inspired by deploying the native IBM JCA resource adapter and creating JNDI resources then referenced in the Spring Context as above, in JBoss a direct MQ client bean declaration suits best as below)

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

Comments and improvements welcome.

查看更多
爱情/是我丢掉的垃圾
3楼-- · 2020-06-14 00:44

I have extensive use of IBM MQ's with camel. There is no issue using both together. I will paste a sample configuration from one of my spring context files leveraging a camel Jms Endpoint, A spring connection factory, and an IBM MQ definition.

Camel Route

from("someplace")
    .to("cpaibmmq:queue:myQueueName");

Spring Context

<bean name="cpaibmmq" class="org.apache.camel.component.jms.JmsComponent" destroy-method="doStop">
    <property name="transacted" value="${jms.transacted}" />
    <property name="concurrentConsumers" value="${cpa.concurrentConsumers}" />
    <property name="maxConcurrentConsumers" value="${cpa.concurrentConsumers}" />
    <property name="acceptMessagesWhileStopping" value="${jms.acceptMessagesWhileStopping}" />
    <property name="acknowledgementModeName" value="${jms.acknowledgementModeName}" />
    <property name="cacheLevelName" value="${jms.cacheLevelName}" />
    <property name="connectionFactory" ref="ibmFac1" />
    <property name="exceptionListener" ref="ibmFac1" />
</bean>

<bean id="ibmFac1" class="org.springframework.jms.connection.SingleConnectionFactory" destroy-method="destroy">
    <constructor-arg>
        <bean class="com.ibm.mq.jms.MQQueueConnectionFactory">
            <property name="transportType" value="1" />
            <property name="channel" value="${cpa.wmq.channel}" />
            <property name="hostName" value="${cpa.wmq.hostname}" />
            <property name="port" value="${cpa.wmq.port}" />
            <property name="queueManager" value="${cpa.wmq.mqmanager}" />
        </bean>
    </constructor-arg>
</bean>
查看更多
登录 后发表回答