Akka Camel - JMS messages lost - should wait for i

Posted 2019-07-07 15:54

Question:

My experimental application is quite simple; I am exploring what can be done with actors and Akka.

After the JVM starts, it creates an actor system with a couple of plain actors, a JMS consumer (akka.camel.Consumer) and a JMS producer (akka.camel.Producer). It sends a couple of messages between the actors and also along the path JMS producer -> JMS server -> JMS consumer. It basically talks to itself via the JMS service.

Occasionally I saw weird behaviour: the first of the messages that were supposed to be sent to the JMS server seemed to be lost. In my application logs I could see that the application was trying to send the message, but the JMS server never received it. (For each run I have to start the JVM and the application again.)

The Akka Camel documentation mentions that some components may not be fully initialized at the beginning: "Some Camel components can take a while to startup, and in some cases you might want to know when the endpoints are activated and ready to be used."

I tried the following to wait for Camel initialization:

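import akka.actor.{ActorSystem, Props}
import akka.camel.CamelExtension
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

val system = ActorSystem("actor-system")
val camel = CamelExtension(system)
val jmsConsumer = system.actorOf(Props[JMSConsumer])
// Block until the consumer's Camel endpoint is activated, or fail after 10 seconds
val activationFuture = camel.activationFutureFor(jmsConsumer)(timeout = 10 seconds, executor = system.dispatcher)
val result = Await.result(activationFuture, 10 seconds)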

This seems to help with the issue. (Although when I remove this step now, I can no longer reproduce the issue... :/)

My question is whether this is the correct way to ensure that all components are fully initialized.

Should I use

val future = camel.activationFutureFor(actor)(timeout = 10 seconds, executor = system.dispatcher)
Await.result(future, 10 seconds)

for each akka.camel.Producer and akka.camel.Consumer actor to be sure that everything is initialized properly?

Is that all I should do, or should something else be done as well? The documentation is not clear on this, and it is not easy to test because the issue occurred only occasionally...

Answer 1:

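You need to initialize the Camel JMS component and also the Producer before sending any messages, for example by sending the first message only once the producer's activation future has completed: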

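import static java.util.concurrent.TimeUnit.SECONDS;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

// Assumes `system` (the ActorSystem) and `camel` (CamelExtension.get(system)) already exist.
// Props.create and the two-argument tell are the Akka 2.2+ forms of the original calls.
final ActorRef producer = system.actorOf(Props.create(SimpleProducer.class), "simpleproducer");
Timeout timeout = new Timeout(Duration.create(15, SECONDS));

Future<ActorRef> activationFuture =
        camel.activationFutureFor(producer, timeout, system.dispatcher());

activationFuture.onComplete(new OnComplete<ActorRef>() {
    @Override
    public void onComplete(Throwable failure, ActorRef activated) throws Throwable {
        // If activation failed (for example, it timed out), the endpoint is not ready: do not send.
        if (failure == null) {
            producer.tell("First!!", ActorRef.noSender());
        }
    }
}, system.dispatcher());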
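
On the question of whether to wait for each akka.camel.Producer and akka.camel.Consumer: one way to avoid repeating the Await call per actor is to collect all the activation futures and block once. The following is only a sketch, assuming akka-camel is on the classpath; `CamelStartup` and `awaitActivation` are hypothetical names, not part of the library, and the duration is passed to `activationFutureFor` the same way the question does. Blocking like this is reasonable in startup code on the main thread, but not inside an actor.

```scala
import akka.actor.ActorRef
import akka.camel.Camel
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper (not part of akka-camel): blocks until every given
// Camel endpoint actor has been activated, and throws if any activation
// fails or the timeout elapses.
object CamelStartup {
  def awaitActivation(camel: Camel, endpoints: Seq[ActorRef], timeout: FiniteDuration)
                     (implicit ec: ExecutionContext): Seq[ActorRef] = {
    // One activation future per Consumer/Producer actor.
    val activations: Seq[Future[ActorRef]] =
      endpoints.map(ref => camel.activationFutureFor(ref)(timeout, ec))
    // Combine them into a single future and block once, at startup only.
    Await.result(Future.sequence(activations), timeout)
  }
}
```

With the names from the question, this could be called as CamelStartup.awaitActivation(camel, Seq(jmsConsumer, jmsProducer), 10.seconds)(system.dispatcher) before the first message is sent, where jmsProducer stands for whatever ActorRef the producer actor was created with.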