Akka: waiting for multiple messages

Posted 2019-07-24 17:57

Question:

Hi Akka gurus :) Can you guide me on this one?

What I'm trying to do: Actor A asks Actor B for a message and waits for one to arrive back. But Actor B sends A not one message but four. Actor A's Future completes properly, but the other 3 messages are counted as dead letters. Why? Is this right? I mean, Actor A has a proper handler, so why are the letters dead? :-(

[INFO] [11/22/2013 22:00:38.975] [ForkJoinPool-2-worker-7] [akka://actors/user/a] Got result pong
[INFO] [11/22/2013 22:00:38.976] [actors-akka.actor.default-dispatcher-4] [akka://actors/deadLetters] Message [java.lang.String] from Actor[akka://actors/user/b#-759739990] to Actor[akka://actors/deadLetters] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
...Same message 2 more times...

Please take a look at the code.

package head_thrash

import akka.actor._
import akka.util.Timeout
import scala.concurrent.duration._
import akka.pattern.ask
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Main extends App {
  val system = ActorSystem("actors")

  val a = system.actorOf(Props[A], "a")
  val b = system.actorOf(Props[B], "b")

  a ! "ping"
  system.awaitTermination()
}

class A extends Actor with ActorLogging {

  implicit val timeout = Timeout(5.seconds)

  def receive = {
    case "ping" => {
      val b = context.actorSelection("../b")
      val future: Future[String] = ask(b, "ping").mapTo[String]
      future.onSuccess {
        case result: String ⇒ {
          log.info("Got result " + result) // <-- got result pong here, that's okay
        }
      }
    }
    case "pong" => {
      log.info("hmmm...")
    }
  }
}

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
      sender ! "pong" // <-- dead letter!
    }
  }
}

That really confuses me. Now you may ask: hey man, why do you need B to send many messages? Well, this is part of a more complicated case: A asks B for a message. B answers. Then A waits for another message from B. The tricky part is the plain waiting after the Future completes, and I just can't work out how to fit that model into Akka's basics.

But for now, how can I get all 4 messages handled correctly, without dead letters? Thanks :-D

Answer 1:

Your problem is that actor B is not replying to actor A. The documentation of the ask pattern says that ask creates a temporary one-off actor for receiving a reply to a message and completes a scala.concurrent.Future with it.

This temporary actor does not handle "pong" messages the way A's receive does; it just waits for the first reply, which you then map to a Future of String. Once that reply has arrived the temporary actor is stopped, so the remaining three messages end up in dead letters.

If you want to fix this, modify actor B so that it first replies to the temporary "ask actor" and then sends the remaining messages directly to actor A.

class B extends Actor with ActorLogging {
  def receive = {
    case "ping" => {
      sender ! "pong"  //the sender is the temp ask actor
      val a = context.actorSelection("../a") // get a ref on actor A
      a ! "pong"
      a ! "pong"
      a ! "pong"
    }
  }
}

This is not really clean, but now I hope you understand what is going on.
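One way to avoid the hard-coded "../a" lookup is to let A put its own reference into the request. The following is only a sketch under assumptions: the Ping message type and its followUpTo field are hypothetical names that do not appear in the question, and the code targets the classic actor API of a recent Akka 2.x release (sender() with parentheses, the ? operator on an ActorSelection).

```scala
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

// Hypothetical message type: carries the ref that follow-up messages should go to.
final case class Ping(followUpTo: ActorRef)

class A extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext for the Future callback
  implicit val timeout: Timeout = Timeout(5.seconds)

  def receive: Receive = {
    case "ping" =>
      val b = context.actorSelection("../b")
      // The first reply completes the Future via the temporary ask actor.
      (b ? Ping(self)).mapTo[String].foreach { result =>
        log.info("Got result " + result)
      }
    case "pong" =>
      // Follow-up messages arrive here because B sends them to followUpTo.
      log.info("Got follow-up pong")
  }
}

class B extends Actor {
  def receive: Receive = {
    case Ping(followUpTo) =>
      sender() ! "pong"                          // answers the temporary ask actor
      (1 to 3).foreach(_ => followUpTo ! "pong") // goes to A's mailbox
  }
}
```

B no longer needs to know where A lives in the actor hierarchy; it only uses the reference it was handed.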



Answer 2:

You'll find the code easier to write if you don't wait for the message. Actors work best when you don't think of your app in terms of time or a sequence of steps. Just make the actor able to handle the callback, and use ! (tell), not ? (ask). If you need to, use become or an FSM to model the two states.

This isn't the question you asked, but it's something you will want to know. Avoiding an imperative style with actors keeps most of these kinds of bugs from happening.
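As a rough illustration of the tell-plus-become idea, here is a minimal sketch. The state names idle, awaitingFirstPong and awaitingMore are made up for this example, and the code assumes the classic actor API of a recent Akka 2.x release. Because A sends with !, the sender that B sees is A itself, so all four replies reach A's mailbox.

```scala
import akka.actor._

class A extends Actor with ActorLogging {
  // Start in the idle state and wait for the external "ping" trigger.
  def receive: Receive = idle

  def idle: Receive = {
    case "ping" =>
      // tell, not ask: A itself is the implicit sender of this message.
      context.actorSelection("../b") ! "ping"
      context.become(awaitingFirstPong)
  }

  // State 1: the first answer from B has not arrived yet.
  def awaitingFirstPong: Receive = {
    case "pong" =>
      log.info("Got first pong")
      context.become(awaitingMore)
  }

  // State 2: the first answer is in; handle whatever B sends afterwards.
  def awaitingMore: Receive = {
    case "pong" =>
      log.info("Got a follow-up pong")
  }
}

class B extends Actor {
  def receive: Receive = {
    case "ping" =>
      val replyTo = sender() // capture the sender once
      (1 to 4).foreach(_ => replyTo ! "pong")
  }
}
```

Messages from one actor to another are delivered in the order they were sent, so the first "pong" is handled in awaitingFirstPong and the rest in awaitingMore. No Future and no timeout are involved.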