Consuming messages from MQ and merging in Spring J

2019-08-30 02:57发布

问题:

I use spring + MQ + Websphere Application server.

I want to consume messages from MQ asynchronously and collect them into a list, so that N entities can be persisted to the database in a single commit (without stressing my target Oracle database with too many commits).

I use the DefaultMessageListenerContainer. I made the onMessage method synchronized so that it accumulates the messages (up to the batch size), and I created a thread that waits for a condition to be met (time/size) and then pushes the messages to another thread, which runs the business logic and persists to the DB.

Condition for the thread start:

Once the first message has arrived in the onMessage method, the thread has to wait to see whether 25 messages are received within 1000 milliseconds. If 25 messages are not reached within 1000 milliseconds, it pushes the messages received so far to the other thread.

Issue:

I see that the thread is started only during server startup, and not when the onMessage method is invoked for the first time.

Any suggestions, or another way to collect the messages from the queue?

applicationContext.xml

<!-- Listener bean that the container below hands every received message to. -->
<bean id="myMessageListener" class="org.mypackage.MyMessageListener"/>

<!--
  Message listener container: starts with 10 concurrent consumers and may scale up to 50,
  each of which calls myMessageListener.onMessage.
  NOTE(review): destinationName is a String property, so ref="queue" only works if the
  bean named "queue" is a String; for a javax.jms.Destination bean use the "destination"
  property instead. Confirm against the rest of the context file.
-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destinationName" ref="queue"/>
    <property name="messageListener" ref="myMessageListener"/>
    <property name="concurrentConsumers" value="10"/>
    <property name="maxConcurrentConsumers" value="50"/>
</bean>

Listener :

package org.mypackage.MyMessageListener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

import org.mypackage.service.MyService;

/**
 * JMS listener from the question: collects text messages into a list and relies on a
 * polling thread to hand the list to {@code MyService} once {@code batchSize} messages
 * have arrived or {@code maximumBatchWaitTime} milliseconds have passed.
 *
 * <p>The listing is kept exactly as posted; the NOTE(review) comments mark the spots that
 * cause the behaviour described in the question.
 */
public class MyMessageListener implements MessageListener {

    // Time (ms since epoch) at which the first message of the current batch arrived; 0 until then.
    private volatile long startTime = 0;
    // Number of messages collected since the last reset.
    private volatile int messageCount;
    // Messages of the current batch; null until the first message arrives and after each reset.
    private volatile List<String> messagesFromQueue = null;

    // Message count that triggers a flush.
    private int batchSize = 25;
    // Maximum age of a batch in milliseconds before it is flushed.
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    // Poller: wakes every 100 ms and flushes when the size or the time condition holds.
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean threadRun = true;
                while (threadRun) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException is caught inside run method");
                    }
                    // NOTE(review): an exact match is required; if the count moves past
                    // batchSize between two polls only the time limit can end the batch.
                    if ((messageCount >0 && messageCount == batchSize)) {
                        System.out.println("----Batch size Reached----");
                        // The loop ends after one flush, so this thread handles a single batch.
                        threadRun = false;
                        processMsgsFromQueue(messagesFromQueue);
                    } else {

                        // NOTE(review): startTime is still 0 when no message has arrived, so the
                        // elapsed time is far above the limit and the else branch runs with
                        // messagesFromQueue == null.
                        if (maximumBatchWaitTime > (System.currentTimeMillis() - startTime)) {
                              System.out.println("----Time limit is not reached----");
                              threadRun = true;
                        } else {
                              threadRun = false;
                              System.out.println("----Time limit is reached----");
                              processMsgsFromQueue(messagesFromQueue);
                        }
                    }
               }
          }
      });

    // Instance initializer: runs whenever a MyMessageListener is constructed, so the poller
    // starts at construction time rather than on the first call to onMessage.
    {
       thread.start();
    }

    /**
     * Adds the text of the received message to the current batch, opening a new batch
     * (start time and list) when the counter is 0.
     *
     * @param message the received message; it is cast to TextMessage without a type check
     */
    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            startTime = System.currentTimeMillis();
            messagesFromQueue = new ArrayList<String>();
            System.out.println("----First Message Arrived at----"+startTime);
        }
        try {
            // Incremented before getText(), so a JMSException leaves the counter one ahead of the list.
            messageCount++;
            TextMessage tm = (TextMessage) message;
            String msg = tm.getText();
            messagesFromQueue.add(msg);

            // NOTE(review): the counter was incremented just above, so this is true only if the
            // poller reset it in between; the thread was also already started by the initializer,
            // and Thread.start() on a started thread throws IllegalThreadStateException.
            if (messageCount == 0) {
                thread.start();
            }

        } catch (JMSException e1) {
             e1.printStackTrace();
        }
    }

    /**
     * Resets the batch state and passes the given messages to the service.
     *
     * @param messageFromQueue the batch to persist; a null argument causes a
     *        NullPointerException at the isEmpty() call
     */
    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       // Reset happens before the insert and without holding the lock that onMessage uses.
       messageCount = 0;
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }
}

回答1:

You need to synchronize access to messagesFromQueue too:

// Wrap the list so that each individual call (add, remove, ...) is synchronized.
List messagesFromQueue = Collections.synchronizedList(new ArrayList());
      ...
  // Iterating spans several calls, so the caller has to hold the list's lock for the whole loop.
  synchronized (messagesFromQueue) {
      Iterator i = messagesFromQueue.iterator(); // Must be in synchronized block
      while (i.hasNext())
      ...
  }

https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#synchronizedList(java.util.List)

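A call to processMsgsFromQueue throws a NullPointerException whenever the list passed in is null, for example when the worker thread fires before the first message has arrived and messagesFromQueue has not been created yet: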

    // The question's method, quoted to show where the NullPointerException is thrown.
    private void processMsgsFromQueue(List<String> messageFromQueue) {
       System.out.println("Inside processMsgsFromQueue");
       messageCount = 0;
       // Clears the field only; the parameter messageFromQueue keeps whatever the caller passed.
       messagesFromQueue =  null;
       if(!messageFromQueue.isEmpty()/* NullPointerException here when the caller passed null */) {
        this.myService.insertMsgsFromQueueToDB(messageFromQueue);
       }
   }

It is better to persist the messages first, and to clear the list and reset the counter only once the commit has succeeded.

package org.mypackage.MyMessageListener;

import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.mypackage.service.MyService;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * JMS listener that collects incoming text messages into batches and hands each batch to
 * {@code MyService}, so that many messages are persisted per database commit.
 *
 * <p>A batch opens when its first message arrives. A watcher thread is started for that
 * batch and flushes it as soon as {@code batchSize} messages have been collected or
 * {@code maximumBatchWaitTime} milliseconds have passed since the first message, whichever
 * comes first. Every batch gets its own watcher, so batching keeps working after the first
 * flush. A batch may hold slightly more than {@code batchSize} messages when several
 * consumers deliver while the watcher is waking up.
 *
 * <p>Thread-safety: {@code startTime}, {@code messagesFromQueue} and {@code thread} are only
 * accessed while holding this object's monitor. The service call runs outside the monitor,
 * so listener threads are not blocked while a batch is being written.
 *
 * <p>NOTE(review): {@code onMessage} returns before the batch is persisted. Depending on the
 * container's acknowledge/transaction settings a message may therefore already be
 * acknowledged when the insert fails; confirm the delivery guarantees that are required.
 */
public class MyMessageListener implements MessageListener {

    /** Number of collected messages that triggers an immediate flush. */
    private int batchSize = 25;

    /** Maximum age of a batch in milliseconds, measured from its first message. */
    private long maximumBatchWaitTime = 1000;

    @Autowired
    private MyService myService;

    /** Arrival time (ms since epoch) of the first message of the open batch. */
    private long startTime = 0;

    /** Messages of the open batch, or {@code null} when no batch is open. */
    private List<String> messagesFromQueue = null;

    /** Watcher of the open batch, or {@code null} when no batch is open. */
    Thread thread;

    /**
     * Waits until the open batch is full or too old, detaches it from the listener and
     * passes it to the service.
     */
    private final class BatchWatcher implements Runnable {
        @Override
        public void run() {
            List<String> batch;
            synchronized (MyMessageListener.this) {
                long deadline = startTime + maximumBatchWaitTime;
                long remaining = deadline - System.currentTimeMillis();
                try {
                    // Loop guards against spurious wake-ups; remaining > 0 keeps us away
                    // from wait(0), which would wait without a timeout.
                    while (messagesFromQueue.size() < batchSize && remaining > 0) {
                        MyMessageListener.this.wait(remaining);
                        remaining = deadline - System.currentTimeMillis();
                    }
                } catch (InterruptedException e) {
                    System.out.println("InterruptedException is caught inside run method");
                    // Keep the interrupt visible to the caller and flush what has arrived.
                    Thread.currentThread().interrupt();
                }
                if (messagesFromQueue.size() >= batchSize) {
                    System.out.println("----Batch size Reached----");
                } else {
                    System.out.println("----Time limit is reached----");
                }
                // Detach the batch while still holding the monitor: the next message
                // then opens a fresh batch with its own watcher.
                batch = messagesFromQueue;
                messagesFromQueue = null;
                thread = null;
            }
            // Runs without the monitor. An exception from the service ends this watcher
            // only; the listener state was already reset above.
            processMsgsFromQueue(batch);
        }
    }

    /**
     * Adds the text of the received message to the open batch, opening a new batch and
     * starting its watcher when none is open.
     *
     * @param message the received message; it is cast to {@code TextMessage} without a
     *        type check, as in the original listing
     */
    @Override
    public void onMessage(Message message) {
        final String msg;
        try {
            // Read the payload first so that a failed read leaves the batch untouched.
            TextMessage tm = (TextMessage) message;
            msg = tm.getText();
        } catch (JMSException e1) {
            e1.printStackTrace();
            return;
        }

        synchronized (this) {
            if (messagesFromQueue == null) {
                startTime = System.currentTimeMillis();
                messagesFromQueue = new ArrayList<String>();
                System.out.println("----First Message Arrived at----" + startTime);
                thread = new Thread(new BatchWatcher());
                thread.start();
            }
            messagesFromQueue.add(msg);
            if (messagesFromQueue.size() >= batchSize) {
                // Wake the watcher so a full batch is flushed before the time limit.
                notifyAll();
            }
        }
    }

    /**
     * Passes a detached batch to the service.
     *
     * @param messageFromQueue the batch to persist; {@code null} or empty batches are skipped
     */
    private void processMsgsFromQueue(List<String> messageFromQueue) {
        System.out.println("Inside processMsgsFromQueue");
        if (messageFromQueue != null && !messageFromQueue.isEmpty()) {
            this.myService.insertMsgsFromQueueToDB(messageFromQueue);
        }
    }
}