ActiveMQ: starting consumer without broker

2020-07-22 19:50发布

问题:

I am writing a JMS client that consumes from a Queue. My broker is activemq, if it matters.

One requirement is that the client should start even if the broker is down. In that case it should behave as if there where no messages in the Queue, and once the broker is up and messages start coming behave accordingly.

The problem is that in my code:

connectionFactory = new ActiveMQConnectionFactory(url);
Connection connection = connectionFactory.createConnection();
connection.start()

If the broker is down, then it gets stuck in connection.start(). While what I would like to have is connection.start() to return silently and to continue to try to connect in the background and consume messages while it can and be silent when it can't.

How can I achieve this.

回答1:

Use separate threads for consuming from the queue and starting the connection. You will need to use a concurrent queue implementation.

Thread 1:

  • Instantiate queue
  • Start Thread 2
  • Try to connect/block
  • Add messages to queue

Thread 2 (or a pool of some sort):

  • Start client
  • Read from queue/block
  • Handle messages


回答2:

There are two ways to work around this type of scenario, one of which is described in AMQ-2114:

  1. By using the broker URI that was suggested in AMQ-2114. By using the broker URI that is suggested in AMQ-2114, an embedded broker is created with a network connection to a remote broker. However, as was pointed out, that broker URI does not use the failover connector.

  2. By using an embedded broker that is manually configured to use a network of brokers. I have used this in the past where an ActiveMQ broker instance is embedded inside your Java app and it is configured with a network connection to a remote broker. This will allow your client to connect to the embedded broker without hanging and will send messages to the embedded broker. Messages will flow from the embedded broker to the remote broker, but only when there is consumer demand for the messages on the remote broker (i.e., consumer connects to the remote broker and requests messages). The connection to the embedded broker can use the failover transport and the network connection to the remote broker can also use the failover transport.

The example I give for this solution is a use case is a Java app on a sales person's laptop. Such an app needs to continue working even if the sales person is not connected to their office network. The solution here is to embed a broker in the Java app so that the app will not hang when sending messages, even if it is not connected to the office network. When the app is again connected to the office network, it will automatically connect to another broker and the messages will flow.

Bruce



回答3:

Just for posterity, I would highly recommend using Spring's DefaultMessageListenerContainer for consuming messages, once configured using the following code snippet:

<jms:listener-container>
    <jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
</jms:listener-container>

it would take care of connecting to the jms queue infrastructure with your specific requirements(if the JMS infrastructure is down, the application context would still be loaded up, but a retry thread would attempt to recover the connection every few seconds).



回答4:

I think ActiveMQ have protocols (many protocols....) to change connection behavior. Try using 'failover://' or something like that in your url and see how it works.



回答5:

I'm to right now fighting this problem too. See AMQ-2114.

I don't quite understand accepted answer, so I propose bounty.

I'm using C++ version of ActiveMQ. But I believe this should not differ a lot.

I thought about calling connection.start() in separate thread and call and call connection->createSession() in TransportListener::transportResumed(). But createSession hangs up.

Bottom line, I don't have working solution and I happy to know other solution to the problem.