ActiveMQ connections/sessions in PooledConnectionF

2019-07-16 04:17发布

问题:

I have a 2 broker cluster on AMQ 5.12.0 that I’m sending messages through from a Tomcat 8 web app. I have the following Camel (2.15.2) / Spring (4.2.1) config. which I’m using to configure an AMQ PooledConnectionFactory.

<camel:camelContext id="camel-client">
    <camel:template id="camelTemplate"/>
</camel:camelContext>

<bean id="jmsConnectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL"
              value="failover:(tcp://dev2.wtgroupllc.com:61626?keepAlive=true&amp;daemon=true&amp;connectionTimeout=3000,tcp://dev3.wtgroupllc.com:61626?keepAlive=true&amp;daemon=true&amp;connectionTimeout=3000)?timeout=1000" />
</bean>

<bean id="pooledConnectionFactory"
      class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="3" />
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<bean id="jmsConfig"
      class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory"/>
    <property name="deliveryPersistent" value="true" />
   <property name="concurrentConsumers" value="0"/>
</bean>

<bean id="activemq"
      class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="jmsConfig"/>
</bean>

When the web app starts I grab the ProducerTemplate from the beanFactory and use it to send a message to a “primer” queue in order to start up to maxConnection connections. Subsequently I send real messages using the ProducerTemplate (“camelTemplate”). As long as only 1 of the brokers is running the camelTemplate is able to utilize a connection from the pool…

10:21:12,991 DEBUG http-nio-29080-exec-1 support.DefaultListableBeanFactory:250 - Returning cached instance of singleton bean 'camelTemplate'
10:21:12,998 TRACE http-nio-29080-exec-1 activemq.ActiveMQSession:1952 - ID:momo-59846-1442942456946-1:2:1 sending message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:momo-59846-1442942456946-1:2:1:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:momo-59846-1442942456946-1:2:1:1, destination = queue://camel0, transactionId = null, expiration = 0, timestamp = 1442942472997, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {breadcrumbId=ID-momo-59844-1442942455140-0-7}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = test msg 0}
10:21:13,084 DEBUG http-nio-29080-exec-1 support.DefaultListableBeanFactory:250 - Returning cached instance of singleton bean 'camelTemplate'
22-Sep-2015 10:21:13.083 INFO [http-nio-29080-exec-1] null.null Sent: test msg 0

As soon as a start the other broker all subsequent attempts to send a message create new connections similar to below…

10:23:27,804 DEBUG http-nio-29080-exec-5 support.DefaultListableBeanFactory:250 - Returning cached instance of singleton bean 'camelTemplate'
10:23:27,864 DEBUG ActiveMQ InactivityMonitor WriteCheckTimer transport.AbstractInactivityMonitor:149 - WriteChecker: 10000ms elapsed since last write check.
10:23:27,864 TRACE ActiveMQ InactivityMonitor WriteCheckTimer transport.AbstractInactivityMonitor:224 - tcp://dev2.wtgroupllc.com/207.99.46.157:61626@59848 message sent since last write check, resetting flag.
10:23:27,898 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:109 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@14c1e2[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] with await termination: 0 millis
10:23:27,903 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:136 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@14c1e2[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
10:23:27,904 DEBUG http-nio-29080-exec-5 failover.FailoverTransport:377 - Stopped tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000
10:23:27,905 TRACE http-nio-29080-exec-5 thread.PooledTaskRunner:95 - Shutdown timeout: 0 task: org.apache.activemq.transport.failover.FailoverTransport$2@181b67d
10:23:27,906 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:54 - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5d8504[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
10:23:27,907 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:57 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5d8504[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true.
10:23:27,908 DEBUG http-nio-29080-exec-5 tcp.TcpTransport:525 - Stopping transport tcp://dev2.wtgroupllc.com/207.99.46.157:61626@59848
10:23:27,909 DEBUG http-nio-29080-exec-5 thread.TaskRunnerFactory:92 - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@be26e6[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
10:23:27,911 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:151 - Execute[ActiveMQ Task] runnable: org.apache.activemq.transport.tcp.TcpTransport$1@a7d84a
10:23:27,912 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:186 - Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
10:23:27,914 TRACE ActiveMQ Task-1 tcp.TcpTransport:540 - Closing socket Socket[addr=dev2.wtgroupllc.com/207.99.46.157,port=61626,localport=59848]
10:23:27,916 DEBUG ActiveMQ Task-1 tcp.TcpTransport:543 - Closed socket Socket[addr=dev2.wtgroupllc.com/207.99.46.157,port=61626,localport=59848]
10:23:27,918 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:54 - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@be26e6[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 1]
10:23:27,919 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:57 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@be26e6[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: false.
10:23:27,922 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:133 - Connection has expired: ConnectionPool[null] and will be destroyed
10:23:27,923 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:120 - Destroying connection: ConnectionPool[null]
10:23:28,006 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:109 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@41500f[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] with await termination: 0 millis
10:23:28,007 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:136 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@41500f[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
10:23:28,009 DEBUG http-nio-29080-exec-5 failover.FailoverTransport:377 - Stopped tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000
10:23:28,010 TRACE http-nio-29080-exec-5 thread.PooledTaskRunner:95 - Shutdown timeout: 0 task: org.apache.activemq.transport.failover.FailoverTransport$2@1b3f63d
10:23:28,011 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:54 - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@34a41[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
10:23:28,013 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:57 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@34a41[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true.
10:23:28,014 DEBUG http-nio-29080-exec-5 tcp.TcpTransport:525 - Stopping transport tcp://dev2.wtgroupllc.com/207.99.46.157:61626@59850
10:23:28,015 DEBUG http-nio-29080-exec-5 thread.TaskRunnerFactory:92 - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@632e39[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
10:23:28,016 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:151 - Execute[ActiveMQ Task] runnable: org.apache.activemq.transport.tcp.TcpTransport$1@1a540cd
10:23:28,018 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:186 - Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
10:23:28,020 TRACE ActiveMQ Task-1 tcp.TcpTransport:540 - Closing socket Socket[addr=dev2.wtgroupllc.com/207.99.46.157,port=61626,localport=59850]
10:23:28,021 DEBUG ActiveMQ Task-1 tcp.TcpTransport:543 - Closed socket Socket[addr=dev2.wtgroupllc.com/207.99.46.157,port=61626,localport=59850]
10:23:28,022 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:54 - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@632e39[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
10:23:28,024 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:57 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@632e39[Shutting down, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: false.
10:23:28,026 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:133 - Connection has expired: ConnectionPool[null] and will be destroyed
10:23:28,027 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:120 - Destroying connection: ConnectionPool[null]
10:23:28,108 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:109 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@b69a99[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] with await termination: 0 millis
10:23:28,109 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:136 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@b69a99[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
10:23:28,110 DEBUG http-nio-29080-exec-5 failover.FailoverTransport:377 - Stopped tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000
10:23:28,111 TRACE http-nio-29080-exec-5 thread.PooledTaskRunner:95 - Shutdown timeout: 0 task: org.apache.activemq.transport.failover.FailoverTransport$2@c69b46
10:23:28,112 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:54 - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@12d12b9[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
10:23:28,113 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:57 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@12d12b9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true.
10:23:28,114 DEBUG http-nio-29080-exec-5 tcp.TcpTransport:525 - Stopping transport tcp://dev2.wtgroupllc.com/207.99.46.157:61626@59847
10:23:28,116 DEBUG http-nio-29080-exec-5 thread.TaskRunnerFactory:92 - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@142c77b[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
10:23:28,118 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:151 - Execute[ActiveMQ Task] runnable: org.apache.activemq.transport.tcp.TcpTransport$1@1a0337b
10:23:28,119 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:186 - Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
10:23:28,120 TRACE ActiveMQ Task-1 tcp.TcpTransport:540 - Closing socket Socket[addr=dev2.wtgroupllc.com/207.99.46.157,port=61626,localport=59847]
10:23:28,121 DEBUG ActiveMQ Task-1 tcp.TcpTransport:543 - Closed socket Socket[addr=dev2.wtgroupllc.com/207.99.46.157,port=61626,localport=59847]
10:23:28,122 DEBUG http-nio-29080-exec-5 util.ThreadPoolUtils:54 - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@142c77b[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
10:23:28,124 TRACE http-nio-29080-exec-5 util.ThreadPoolUtils:57 - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@142c77b[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1] is shutdown: true and terminated: true.
10:23:28,149 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:133 - Connection has expired: ConnectionPool[null] and will be destroyed
10:23:28,150 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:120 - Destroying connection: ConnectionPool[null]
10:23:28,152 DEBUG http-nio-29080-exec-5 thread.TaskRunnerFactory:92 - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1e5d48e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
10:23:28,156 DEBUG http-nio-29080-exec-5 failover.FailoverTransport:789 - Reconnect was triggered but transport is not started yet. Wait for start to connect the transport.
10:23:28,158 DEBUG http-nio-29080-exec-5 failover.FailoverTransport:352 - Started unconnected
10:23:28,159 DEBUG http-nio-29080-exec-5 failover.FailoverTransport:782 - Waking up reconnect task
10:23:28,160 TRACE http-nio-29080-exec-5 thread.TaskRunnerFactory:186 - Created thread[ActiveMQ Task-1]: Thread[ActiveMQ Task-1,5,main]
10:23:28,164 TRACE http-nio-29080-exec-5 pool.PooledConnectionFactory:107 - Created new connection: ConnectionPool[ActiveMQConnection {id=ID:momo-59846-1442942456946-1:4,clientId=null,started=false}]
10:23:28,166 TRACE http-nio-29080-exec-5 failover.FailoverTransport:612 - Waiting for transport to reconnect..: ConnectionInfo {commandId = 1, responseRequired = true, connectionId = ID:momo-59846-1442942456946-1:4, clientId = ID:momo-59846-1442942456946-0:4, clientIp = null, userName = null, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = true, clientMaster = true, faultTolerant = true, failoverReconnect = false}
10:23:28,185 TRACE ActiveMQ Task-1 thread.PooledTaskRunner:132 - Running task iteration 0 - org.apache.activemq.transport.failover.FailoverTransport$2@1fb47e7
10:23:28,186 DEBUG ActiveMQ Task-1 failover.FailoverTransport:817 - urlList connectionList:[tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000, tcp://dev3.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000], from: [tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000, tcp://dev3.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000]
10:23:28,187 DEBUG ActiveMQ Task-1 failover.FailoverTransport:1027 - Attempting  0th  connect to: tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000
10:23:28,188 TRACE ActiveMQ Task-1 transport.AbstractInactivityMonitor:407 - Starting connection check task for: tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000
10:23:28,281 DEBUG ActiveMQ Task-1 transport.WireFormatNegotiator:82 - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
10:23:28,283 DEBUG ActiveMQ Task-1 failover.FailoverTransport:1037 - Connection established
10:23:28,284  INFO ActiveMQ Task-1 failover.FailoverTransport:1068 - Successfully connected to tcp://dev2.wtgroupllc.com:61626?keepAlive=true&daemon=true&connectionTimeout=3000
10:23:28,284 TRACE ActiveMQ Task-1 thread.TaskRunnerFactory:186 - Created thread[ActiveMQ Task-2]: Thread[ActiveMQ Task-2,5,main]
10:23:28,285 TRACE ActiveMQ Task-1 thread.PooledTaskRunner:50 - Run task done: org.apache.activemq.transport.failover.FailoverTransport$2@1fb47e7

Please advise how I can get pooled connections to be used in a failover: configuration. Many thanks.