Properly Shutting Down ActiveMQ and Spring Default

2019-04-30 02:13发布

问题:

Our system will not shutdown when a "Stop" command is issued from the Tomcat Manager. I have determined that it is related to ActiveMQ/Spring. I have even figured out how to get it to shutdown, however my solution is a hack (at least I hope this isn't the "correct" way to do it). I would like to know the proper way to shutdown ActiveMQ so that I can remove my hack.

I inherited this component and I have no information about why certain architectural decisions were made, after a lot of digging I think I understand his thoughts, but I could be missing something. In other words, the real problem could be in the way that we are trying to use ActiveMQ/Spring.

We run in ServletContainer (Tomcat 6/7) and use ActiveMQ 5.9.1 and Spring 3.0.0 Multiple instances of our application can run in a "group", with each instance running on it's own server. ActiveMQ is used to facilitate communication between the multiple instances. Each instance has it's own embedded broker and it's own set of queues. Every queue on every instance has exactly 1 org.springframework.jms.listener.DefaultMessageListenerContainer listening to it, so 5 queues = 5 DefaultMessageListenerContainers for example.

Our system shut down properly until we fixed a bug by adding queuePrefetch="0" to the ConnectionFactory. At first I assumed that this change was incorrect in some way, but now that I understand the situation, I am confident that we should not be using the prefetch functionality.

I have created a test application to replicate the issue. Note that the information below makes no mention of message producers. That is because I can replicate the issue without ever sending/processing a single message. Simply creating the Broker, ConnectionFactory, Queues and Listeners during boot, is enough to keep the system from stopping properly.

Here is my sample configuration from my Spring XML. I will be happy to provide my entire project if someone wants it:

<amq:broker persistent="false" id="mybroker"> 
 <amq:transportConnectors> 
  <amq:transportConnector uri="tcp://0.0.0.0:61616"/> 
 </amq:transportConnectors> 
</amq:broker> 

<amq:connectionFactory id="ConnectionFactory" brokerURL="vm://localhost?broker.persistent=false" > 
 <amq:prefetchPolicy> 
  <amq:prefetchPolicy queuePrefetch="0"/> 
 </amq:prefetchPolicy> 
</amq:connectionFactory> 

<amq:queue id="lookup.mdb.queue.cat" physicalName="DogQueue"/> 
<amq:queue id="lookup.mdb.queue.dog" physicalName="CatQueue"/> 
<amq:queue id="lookup.mdb.queue.fish" physicalName="FishQueue"/> 

<bean id="messageListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" abstract="true"> 
 <property name="connectionFactory" ref="ConnectionFactory"/> 
</bean> 

<bean parent="messageListener" id="cat"> 
 <property name="destination" ref="lookup.mdb.queue.dog"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

<bean parent="messageListener" id="dog"> 
 <property name="destination" ref="lookup.mdb.queue.cat"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

<bean parent="messageListener" id="fish"> 
 <property name="destination" ref="lookup.mdb.queue.fish"/> 
 <property name="messageListener"> 
  <bean class="com.acteksoft.common.remote.jms.WorkerMessageListener"/> 
 </property> 
 <property name="concurrentConsumers" value="200"/> 
 <property name="maxConcurrentConsumers" value="200"/> 
</bean> 

My hack involves using a ServletContextListener to manually stop the objects. The hacky part is that I have to create additional threads to stop the DefaultMessageListenerContainers. Perhaps I'm stopping the objects in the wrong order, but I've tried everything that I can imagine. If I attempt to stop the objects in the main thread, then they hang indefinitely.

Thank you in advance!

UPDATE I have tried the following based on boday's recommendation but it didn't work. I have also tried to specify the amq:transportConnector uri as tcp://0.0.0.0:61616?transport.daemon=true

  <amq:broker persistent="false" id="mybroker" brokerName="localhost">
   <amq:transportConnectors>
    <amq:transportConnector uri="tcp://0.0.0.0:61616?daemon=true"/>
   </amq:transportConnectors>
  </amq:broker>

  <amq:connectionFactory id="connectionFactory" brokerURL="vm://localhost" >
   <amq:prefetchPolicy>
    <amq:prefetchPolicy queuePrefetch="0"/>
   </amq:prefetchPolicy>
  </amq:connectionFactory>

At one point I tried to add similar properties to the brokerUrl parameter in the amq:connectionFactory element and the shutdown worked properly, however after further testing I learned that the properties were resulting in an exception to be thrown from VMTransportFactory. This resulted in improper initialization and the basic message functionality didn't work.

回答1:

try setting daemon=true on your TCP transport, this allows the process to run as a deamon thread which won't block the shutdown of your container

see http://activemq.apache.org/tcp-transport-reference.html



回答2:

In case anyone else is wondering, as far as I can see it's not possible to have a daemon ListenerContainer using ActiveMQ.

When the ActiveMQConnection is started, it creates a ThreadPoolExecutor with non-daemon thread. This is seemingly to avoid issues when failing over the connection from one broker to another.

https://issues.apache.org/jira/browse/AMQ-796

executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
        //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
        //thread.setDaemon(true);
        return thread;
    }
});