I am trying to use Akka's ConsistentHashingRoutingLogic
to guarantee that messages with the same key are routed to the same Actor. It is important that messages with the same key are processed in FIFO ordering. Messages with different keys can be routed to different Actors and processed in parallel freely. I am not using Akka in distributed mode.
The messages are actually JSON messages being read from a RabbitMQ broker so my Master actor receives an AMQP message and uses the routing key as the message key. The same key is also in the message itself. The actor is part of a Spring application.
My Master Actor looks like this:
@Named("MessageHandlerMaster")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MessageHandlerMaster extends UntypedActor {
private static final Logger log = LoggerFactory.getLogger(MessageHandlerMaster.class);
private Router router;
@Autowired
public MessageHandlerMaster(final SpringProps springProps) {
List<Routee> routees = Stream.generate(() -> {
ActorRef worker = getContext().actorOf(springProps.create(MessageHandlerWorker.class));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}).limit(5) //todo: configurable number of workers
.collect(Collectors.toList());
router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}
public void onReceive(Object message) {
if (message instanceof Message) {
Message amqpMessage = (Message) message;
String encoding = getMessageEncoding(amqpMessage);
try {
String json = new String(amqpMessage.getBody(), encoding);
String routingKey = amqpMessage.getMessageProperties().getReceivedRoutingKey();
log.debug("Routing message based on routing key " + routingKey);
router.route(new ConsistentHashingRouter.ConsistentHashableEnvelope(json, routingKey), getSender());
} catch (UnsupportedEncodingException e) {
log.warn("Unknown content encoding sent in message! {}", encoding);
}
} else if (message instanceof Terminated) {
//if one of the routee's died, remove it and replace it
log.debug("Actor routee terminated!");
router.removeRoutee(((Terminated) message).actor());
ActorRef r = getContext().actorOf(Props.create(MessageHandlerWorker.class));
getContext().watch(r);
router = router.addRoutee(new ActorRefRoutee(r));
}
}
private static String getMessageEncoding(Message message) {
String encoding = message.getMessageProperties().getContentEncoding();
if ((encoding == null) || (encoding.equals(""))) {
encoding = "UTF-8";
}
return encoding;
}
}
I am initially getting the master once by:
this.master = actorSystem.actorOf(springProps.create(MessageHandlerMaster.class), "master");
and then just submitting the messages to it by:
master.tell(message, ActorRef.noSender());
But when I print the logs from my worker's onReceive()
I see that different dispatcher threads are being used sometimes for the same key.
Also it is not clear why sometimes the same dispatcher thread is being used for the Master actor and for a Worker actor. Shouldn't this be asynchronous message passing between threads?
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.359 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.360 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-9] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerMaster - Routing message based on routing key 10420186
16:45:13.361 [aggregator-akka.actor.default-dispatcher-10] DEBUG c.u.o.a.actors.MessageHandlerWorker - Handling message for 10420186
As you can see here, the dispatcher thread for the Worker processing message with key 10420186 was sometimes 9 and sometimes 10. The Master actor sometimes also used these 2 threads.
How can I be sure that the ConsistentHashingRoutingLogic
is actually working and the same thread processes messages with the same key? Am I doing something wrong in my router initialisation?