Receiving multiple messages from MQ asynchronously

2020-01-30 02:45发布

问题:

I use Spring + Hibernate + JPA in my application.

I need to read the message from Websphere MQ and insert the message to DB. Sometimes there may be continuous messages available and sometimes very less number of messages and sometimes we can expect no message from Queue.

Currently I'm reading the message one by one and inserting them to Database. But it does not help much in terms of performance.

I mean when I have chunk of messages(Example 300k messages in Queue) I could not insert them faster. Number of entities inserted to DB per second is not so high. Because I do commit for every single entity.

I want to use hibernate batch processing, so that I can insert list of entities in a single commit. (Example: 30 to 40 messages per commit)

Questions:

  1. How to receive multiple messages from Queue? (I have checked that BatchMessageListenerContainer may be helpful. But I could not get some reference)

  2. Should I separate the db insertion process out side onMessage method? So that thread will be released to pool and be available for picking next messages from Queue?

  3. Parallel threads usage?

Current implementation:

Message Listener:

<bean id="myMessageListener" class="org.mypackage.MyMessageListener">

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name ="concurrentConsumers" value ="10"/>
    <property name ="maxConcurrentConsumers" value ="50"/>        
</bean>

Listener Class:

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

public class MyMessageListener implements MessageListener {

    @Autowired
    private MyService myService;

    @Override
    public void onMessage(Message message) {
        try {
             TextMessage textMessage = (TextMessage) message;
             // parse the message
             // Process the message to DB
        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }
}

回答1:

It's not clear what your requirements are.

The Spring Batch project provides a BatchMessageListenerContainer.

Message listener container adapted for intercepting the message reception with advice provided through configuration. To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.