How to enqueue on Oracle AQ table on commit with J

2019-04-02 08:29发布

问题:

I am writing a Java component for an enterprise level product and want to leverage a particular feature of Oracle 11g databases, Active Queues. The exact scenario i nwant to accomplish is - 1. write a message to the oracle active queue/queue table on commit 2. read that message from the queue with a JMS consumer

I followed the demo and tutorial at http://docs.oracle.com/cd/B28359_01/java.111/b31224/streamsaq.htm

and in particular, I'd like to focus on the enqueue part of the code -

    // Create the actual AQMessage instance:
    AQMessage mesg = AQFactory.createAQMessage(msgprop);
    // and add a payload:
    byte[] rawPayload = new byte[500];
    for (int i = 0; i < rawPayload.length; i++) {
        rawPayload[i] = 'b';
    }

    mesg.setPayload(new RAW(rawPayload));

    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setRetrieveMessageId(true);
    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    // execute the actual enqueue operation:
    conn.enqueue(queueName, opt, mesg);

This works just fine for me, because we want to make sure the message is only visible to the consumers when the transaction is committed.

The problem - In the demo we create queues of payload-type RAW

doUpdateDatabase(conn,
           "BEGIN "+
           "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
           "   QUEUE_TABLE        =>  '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE',  "+
           "   QUEUE_PAYLOAD_TYPE =>  'RAW', "+
           "   COMPATIBLE         =>  '10.0'); "+
           "END; ");
doUpdateDatabase(conn,
           "BEGIN "+
           "DBMS_AQADM.CREATE_QUEUE( "+
           "    QUEUE_NAME     =>   '"+USERNAME+".RAW_SINGLE_QUEUE', "+
           "    QUEUE_TABLE    =>   '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
           "END;  ");
doUpdateDatabase(conn,
           "BEGIN "+
           "  DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
           "END; ");

by using queues created in RAW I am able to enqueue messages to the queue, however JMS consumers fail to subscribe to the queue throwing an (null pointer) exception where the consumer expects a parameter for the expected type. In short this code throws a null pointer exception on init.

Properties env = new Properties();
env.load(new FileInputStream(new File("jndi.properties")));
Context ctx = new InitialContext(env);
ConnectionFactory connFactory = (ConnectionFactory)ctx.lookup(connectionFactoryName);
Connection connection = connFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue);

JNDI.properties

java.naming.factory.initial = oracle.jms.AQjmsInitialContextFactory
java.naming.security.principal = username
java.naming.security.credentials = password
db_url = jdbc:oracle:thin:@host:port:dbname

I get a similar exception when trying to setup consumers in Camel.

<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
        <!-- this camel route will read incoming messages from Oracle -->
        <route>
            <from uri="oracleQueue:queue:RAW_SINGLE_QUEUE" />
            <to uri="WebSphereMQ:queue:myWebSphereQueue" />
        </route>
    </camelContext>

    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>oracle db URL</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>username</value>
        </property>
        <property name="password">
            <value>password</value>
        </property>
    </bean>

    <bean id="oracleQueue" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
    </bean>

With some research I figured the queue payload type might be the issue. Therefore, I changed the queue table create script and used JMS message as the payload type

 doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
 + "   QUEUE_TABLE        =>  'RAW_SINGLE_QUEUE_TABLE',  "
 + "   QUEUE_PAYLOAD_TYPE =>  'SYS.AQ$_JMS_MESSAGE', " +
 "   COMPATIBLE         =>  '10.0'); " + "END; ");

In this case the JMS Consumers are able to connect, but the enqueue code now fails - ORA-25215: user_data type and queue type do not match

The question is how can I enqueue messages, visible only on commit, from a Java producer and be able to consume with camel or generic JMS consumer?

constraints (to filter out some of the answers already on the net) - Cannot use PL/SQL, spring transactions, JTA. I've seen examples like How to enqueue a JMS message into Oracle AQ using Java where the queue table is created with the SYS.AQ$_JMS_MESSAGE type but the example producer is a JMS MessageProducer rather than the one in the oracle guide. I am not trying to enqueue JMS messages (AQJmsMessage), rather use the AQMessage type as explained in the Oracle guide, and to use the visible on commit option.

My feeling is that if the issue is based on a mismatch of payload types only, then there must be some configuration on the consumer side to specify the payload type, or on the producer side to be able to write messages in a way JMS consumers will understand. Is there a way to accomplish this?

回答1:

I was able to accomplish this - I had to guess around many parts of the Oracle API, and collecting hints from various blogs. For anyone interested here is way I got it working - 1. I created an Oracle Object on the Oracle Db 2. With this Oracle Object, I created queue tables of the object type as the payload 3. I am now able to enqueue AQMessage types with STRUCT payload, containing the object data 4. And I am able to dequeue with a JMS consumer that understands the ADT payload type (Thanks to the article at http://blog.javaforge.net/post/30858904340/oracle-advanced-queuing-spring-custom-types)

Here are the steps with code - Create the Oracle object. The object can have any primary data type fields like VARCHAR, TIMESTAMP etc and also BLOB, CLOB etc. In this case I provided one of the columns as a blob to make things more complicated.

create or replace type aq_event_obj as object
(
  id       varchar2(100),
  payload  BLOB
);
commit;

Now create the queue table. The payload type of the table is the oracle object.

private void setup(Connection conn) throws SQLException {
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE_TABLE( "
            + "   QUEUE_TABLE        =>  'OBJ_SINGLE_QUEUE_TABLE',  " + "   QUEUE_PAYLOAD_TYPE =>  'AQ_EVENT_OBJ', "
            + "   COMPATIBLE         =>  '10.0'); " + "END; ");
    doUpdateDatabase(conn, "BEGIN " + "DBMS_AQADM.CREATE_QUEUE( " + "    QUEUE_NAME     =>   'OBJ_SINGLE_QUEUE', "
            + "    QUEUE_TABLE    =>   'OBJ_SINGLE_QUEUE_TABLE'); " + "END;  ");
    doUpdateDatabase(conn, "BEGIN " + "  DBMS_AQADM.START_QUEUE('OBJ_SINGLE_QUEUE'); " + "END; ");
} 

You can now enqueue AQMessage types in Java with a struct instance of the object

public void enqueueMessage(OracleConnection conn, String correlationId, byte[] payloadData) throws Exception {
    // First create the message properties:
    AQMessageProperties aqMessageProperties = AQFactory.createAQMessageProperties();
    aqMessageProperties.setCorrelation(correlationId);
    aqMessageProperties.setExceptionQueue(EXCEPTION_QUEUE_NAME);

    // Specify an agent as the sender:
    AQAgent aqAgent = AQFactory.createAQAgent();
    aqAgent.setName(SENDER_NAME);
    aqAgent.setAddress(QUEUE_NAME);
    aqMessageProperties.setSender(aqAgent);

    // Create the payload
    StructDescriptor structDescriptor = StructDescriptor.createDescriptor(EVENT_OBJECT, conn);
    Map<String, Object> payloadMap = new HashMap<String, Object>();
    payloadMap.put("ID", correlationId);
    payloadMap.put("PAYLOAD", new OracleAQBLOBUtil().createBlob(conn, payloadData));
    STRUCT struct = new STRUCT(structDescriptor, conn, payloadMap);

    // Create the actual AQMessage instance:
    AQMessage aqMessage = AQFactory.createAQMessage(aqMessageProperties);
    aqMessage.setPayload(struct);

    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setDeliveryMode(AQEnqueueOptions.DeliveryMode.PERSISTENT);
    opt.setVisibility(AQEnqueueOptions.VisibilityOption.ON_COMMIT);

    // execute the actual enqueue operation:
    conn.enqueue(QUEUE_NAME, opt, aqMessage);
}

The blob field needed special handling

public class OracleAQBLOBUtil {

    public BLOB createBlob(OracleConnection conn, byte[] payload) throws Exception {
        BLOB blob = BLOB.createTemporary(conn, false, BLOB.DURATION_SESSION);
        OutputStream outputStream = blob.setBinaryStream(1L);
        InputStream inputStream = new ByteArrayInputStream(payload);
        try {
            byte[] buffer = new byte[blob.getBufferSize()];
            int bytesRead = 0;
            while ((bytesRead = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, bytesRead);
            }
            return blob;
        }
        finally {
            outputStream.close();
            inputStream.close();
        }
    }

    public byte[] saveOutputStream(BLOB blob) throws Exception {
        InputStream inputStream = blob.getBinaryStream();
        int counter;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        while ((counter = inputStream.read()) > -1) {
            byteArrayOutputStream.write(counter);
        }
        byteArrayOutputStream.close();
        return byteArrayOutputStream.toByteArray();
    }

}

For the consumer, you need to provide an instance of ORADataFactory that lets the consumer understand the payload type (your custom object).

AQjmsSession queueSession = (AQjmsSession) session;
Queue queue = (Queue) ctx.lookup(queueName);
MessageConsumer receiver = queueSession.createReceiver(queue, new OracleAQObjORADataFactory());

Where the code for OracleAQObjORADataFactory is

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;

import oracle.jdbc.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.BLOB;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;
import oracle.sql.STRUCT;

public class OracleAQObjORADataFactory  implements ORAData, ORADataFactory {

    public static final String EVENT_OBJECT = "SYSTEM.AQ_EVENT_OBJ";
    public static final int _SQL_TYPECODE = OracleTypes.STRUCT;

    protected MutableStruct _struct;

    protected static int[] _sqlType = { java.sql.Types.VARCHAR, java.sql.Types.VARBINARY };
    protected static ORADataFactory[] _factory = new ORADataFactory[2];
    protected static final OracleAQObjORADataFactory  _AqEventObjFactory = new OracleAQObjORADataFactory ();

    public static ORADataFactory getORADataFactory() {
        return _AqEventObjFactory;
    }

    /* constructors */
    protected void _init_struct(boolean init) {
        if (init)
            _struct = new MutableStruct(new Object[2], _sqlType, _factory);
    }

    public OracleAQObjORADataFactory () {
        _init_struct(true);
    }

    public OracleAQObjORADataFactory (String id, byte[] payload) throws SQLException {
        _init_struct(true);
        setId(id);
        setPayload(payload);
    }

    /* ORAData interface */
    public Datum toDatum(Connection c) throws SQLException {
        return _struct.toDatum(c, EVENT_OBJECT);
    }

    /* ORADataFactory interface */
    public ORAData create(Datum d, int sqlType) throws SQLException {
        return create(null, d, sqlType);
    }

    protected ORAData create(OracleAQObjORADataFactory  o, Datum d, int sqlType) throws SQLException {
        if (d == null)
            return null;
        if (o == null)
            o = new OracleAQObjORADataFactory ();
        o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
        return o;
    }

    public String getId() throws SQLException {
        return (String) _struct.getAttribute(0);
    }

    public void setId(String id) throws SQLException {
        _struct.setAttribute(0, id);
    }

    public byte[] getPayload() throws SQLException {
        BLOB blob = (BLOB) _struct.getAttribute(1);
        InputStream inputStream = blob.getBinaryStream();
        return getBytes(inputStream);
    }

    public byte[] getBytes(InputStream body) {
        int c;
        try {
            ByteArrayOutputStream f = new ByteArrayOutputStream();
            while ((c = body.read()) > -1) {
                f.write(c);
            }
            f.close();
            byte[] result = f.toByteArray();
            return result;
        }
        catch (Exception e) {
            System.err.println("Exception: " + e.getMessage());
            e.printStackTrace();
            return null;
        }
    }

    public void setPayload(byte[] payload) throws SQLException {
        _struct.setAttribute(1, payload);
    }

}

You're probably using Camel or Spring in your project, in which case - 1. If you're on Camel 2.10.2 or upwards, you can create a JMS consumer with a custom message lister container (CAMEL-5676) 2. If you're on a previous version then you may not be able to use the endpoint way (i couldn't figure it out), but you can use a JMS request listener

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                        http://www.springframework.org/schema/jms
                        http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- this is just an example, you can also use a datasource as the ctor arg -->
    <bean id="connectionFactoryOracleAQQueue" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory">
        <constructor-arg index="0">
            <value>jdbc:oracle:thin:@blrub442:1522:UB23</value>
        </constructor-arg>
        <constructor-arg index="1" type="java.util.Properties">
            <value></value>
        </constructor-arg>
    </bean>

    <bean id="oracleQueueCredentials" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
        <property name="targetConnectionFactory">
            <ref bean="connectionFactoryOracleAQQueue" />
        </property>
        <property name="username">
            <value>system</value>
        </property>
        <property name="password">
            <value>oracle</value>
        </property>
    </bean>

    <!-- Definitions for JMS Listener classes that we have created -->
    <bean id="aqMessageListener" class="com.misys.test.JmsRequestListener" />

    <bean id="aqEventQueue" class="com.misys.test.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="oracleQueueName" value="BOZ_SINGLE_QUEUE" />
    </bean>

    <!-- The Spring DefaultMessageListenerContainer configuration. This bean is automatically loaded when the JMS application context is started -->
    <bean id="jmsContainer" class="com.misys.test.AQMessageListenerContainer" scope="singleton">
        <property name="connectionFactory" ref="oracleQueueCredentials" />
        <property name="destination" ref="aqEventQueue" />
        <property name="messageListener" ref="aqMessageListener" />
        <property name="sessionTransacted" value="false" />
    </bean>

</beans>

The custom message listener container

public class AQMessageListenerContainer extends DefaultMessageListenerContainer {

    @Override
    protected MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        return ((AQjmsSession) session).createConsumer(destination, getMessageSelector(),
                OracleAQObjORADataFactory.getORADataFactory(), null, isPubSubNoLocal());
    }
}

and the request listener onMessage method

public void onMessage(Message msg) {
    try {
        AQjmsAdtMessage aQjmsAdtMessage = (AQjmsAdtMessage) msg;
        OracleAQObjORADataFactory obj = (OracleAQObjORADataFactory) aQjmsAdtMessage.getAdtPayload();

        System.out.println("Datetime: " + obj.getId());
        System.out.println("Payload: " + new String(obj.getPayload(), Charset.forName("UTF-8")));
    }
    catch (Exception jmsException) {
        if (logger.isErrorEnabled()) {
            logger.error(jmsException.getLocalizedMessage());
        }
    }
}