Scala, check if Actor has exited

Posted 2019-06-21 13:24

Question:

In Scala 2.8, when I start actors, I can communicate with them via message passing. This in turn means that I can send the ultimate Exit() message, or whatever I decide fits my protocol.

But how will I check if an actor has exited? I can easily imagine myself having a task where a master actor starts some worker actors and then simply waits for the answers, each time checking if this was the final answer (i.e. are any Actors still working or did they all exit?).

Of course I can let them all send back an "I'm done" message, and then count them, but this is somehow unsatisfactory.

What is best practice when testing for the completion of worker actors?

EDIT#1

Hey guys, I'm looking into Futures, but having trouble. Can someone explain why this code doesn't work:

package test
import scala.actors.Futures._

object FibFut extends Application{

    def fib(i:Int):Int = 
        if(i<2)
            1
        else
            fib(i-1)+fib(i-2)

    val f = future{ fib(3) }

    println(f())    

}

It works if I define the function fib inside the future-body. It must be a scope thing, but I don't get any errors with the above, it simply hangs. Anyone?

EDIT#2

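It seems that extending Application wasn't a nice way to go. As far as I can tell, Application runs the object's body as part of the object's static initialization, so the future's thread blocks when it calls fib on the not-yet-initialized object while the main thread is waiting on the future. Defining a main method made everything work. The below code is what I was looking for, so Futures get the thumbs up :)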

package test

import scala.actors.Futures._

object FibFut {

  // Long rather than Int: with this definition the results for i >= 46 no longer fit in an Int
  def fib(i: Int): Long = if (i < 2) 1 else fib(i - 1) + fib(i - 2)

  def main(args: Array[String]) {

    val fibs = for (i <- 0 to 50) yield future { fib(i) }

    for (future <- fibs) println(future())

  }

}

Answer 1:

I'm a fan of "I'm done" messages, personally; it's a good way to manage distribution of work, and as a bonus, you already know when all children have finished what they're doing.

But if you really just want to farm out some work once and wait until everything is ready, check out scala.actors.Futures. You can ask it to do some computation:

val futureA = Futures.future {
  val a = veryExpensiveOperation
  (a,"I'm from the future!")
}

and then you can wait for everything to complete, if you have made multiple requests:

Futures.awaitAll(600*1000, futureA, futureB, futureC, futureD)
// Returns once all of A-D have been computed, or after the 600-second timeout
val actualA = futureA()   // Now we get the value
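
The "I'm done" counting mentioned at the start of this answer can be sketched roughly as follows. This is only an illustration under stated assumptions, not actor code: to keep it self-contained it swaps in a java.util.concurrent.LinkedBlockingQueue for the master's mailbox and plain threads for the workers, and the names Done, DoneCounting and runWorkers are made up for the example. The counting logic is the part that carries over: the master takes exactly one message per worker it started.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical "I'm done" message: each worker reports its id and its result.
case class Done(workerId: Int, result: Int)

object DoneCounting {
  // Starts n workers, then blocks until every one of them has reported Done.
  def runWorkers(n: Int): Seq[Done] = {
    // Stand-in for the master's mailbox (an assumption, not the actors API).
    val mailbox = new LinkedBlockingQueue[Done]()
    for (id <- 0 until n) {
      val worker = new Thread(new Runnable {
        // Sending Done is the last thing a worker does.
        def run(): Unit = mailbox.put(Done(id, id * id))
      })
      worker.start()
    }
    // One blocking take() per worker: when this loop ends, all have finished.
    for (_ <- 0 until n) yield mailbox.take()
  }

  def main(args: Array[String]): Unit = {
    val replies = runWorkers(4)
    println("workers finished: " + replies.size)
  }
}
```

With real actors the master would do the same thing inside receive or react, decrementing a counter for every "I'm done" message until it reaches zero.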


Answer 2:

A while ago I wrote a post on linking actors in Scala. Actor linking is an idiomatic [and the easiest] way to monitor actors in Erlang, Scala Actors and other actor libraries. By default, when you link two actors and one of them dies, the other dies immediately too (unless it traps/handles the exit signal):

scala> case object Stop
defined module Stop

scala>

scala> val actor1 = actor {
     |    loop {
     |       react {
     |          case Stop =>
     |             println("Actor 1: stop")
     |             exit()
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor1: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala>

scala> val actor2 = actor {
     |    link(actor1)
     |    self.trapExit = true
     |    loop {
     |       react {
     |          case msg => println(msg)
     |             }
     |         }
     | }
actor2: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1.start
res12: scala.actors.Actor = scala.actors.Actor$$anon$1@1feea62

scala> actor2.start
res13: scala.actors.Actor = scala.actors.Actor$$anon$1@1e1c66a

scala> actor1 ! Stop
Actor 1: stop

scala> Exit(scala.actors.Actor$$anon$1@1feea62,'normal)  // Actor 2 received message, when Actor1 died

A more sophisticated and flexible way is to use supervisors (supervisor behavior in Erlang, actor supervisors in the Akka Actors library, etc.). A supervisor (being itself an actor) monitors a number of other actors and restarts them according to a specified strategy (restart all actors if one dies, or restart just the one actor that died).



Answer 3:

Okay everyone, I have come up with a solution using the getState function of the Actor class. In the solution I used an idea from this thread: "Best method to peek into a Scala Actor's Mailbox", in which reactWithin(0) is used. I ran into trouble when using react and loop, where the program would simply block on big calculations. This was solved by replacing loop with while(true) and reactWithin(int) with receiveWithin(int).

My solution looks as follows (beware, bigass code-lump):

package test

import scala.actors._
import scala.actors.Actor.State._

case class Computation(index: Int, a: () ⇒ Int)
case class Result(i: String)
object Main {
  def main(args: Array[String]) {
    val m = new Master
    m.start
  }
}

class Master extends Actor {

  val N = 40
  var numberOfAnswers = 0

  def fib(x: Int): Int =
    if (x < 2)
      1
    else
      fib(x - 1) + fib(x - 2)

  val computers = for (i ← 0 to N) yield new Computer

  def act {

    for (i ← 0 until computers.size) {
      computers(i).start
      computers(i) ! Computation(i, () => fib(i))
    }

    println("done Initializing actors")
    while (true) {
      receiveWithin(1000) {

        case Result(i) =>
          val numberDone = computers.map(_.getState == Terminated).filter(_ == true).length
          println(i)
          numberOfAnswers += 1

        case TIMEOUT =>
          val allDone = computers.map(_.getState == Terminated).reduceRight(_ && _)
          println("All workers done?:" + allDone)
          println("# of answers:" + numberOfAnswers)
          if (allDone)
            exit()
      }
    }

  }

}

class Computer extends Actor {

  def act {
    loop {
      react {
        case Computation(i, f) ⇒
          sender ! Result("#" + i + " Res:" + f())
          exit()
      }
    }
  }

}

The program calculates the Fibonacci numbers (in the worst possible way). The idea is simply to test utilization of multiple threads for big workloads. The following line checks whether some actor has yet to terminate:

computers.map(_.getState == Terminated).reduceRight(_ && _)

where computers is of the type IndexedSeq[Computer]. The trick is that using the TIMEOUT message, I can periodically check if all the work is done and act accordingly (in this case, exit when there are no more active workers). I take advantage of the fact that each worker sends the results before they exit. In that way I know that I will always receive the results and handle them before they will be shown as Terminated.
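
As a side note on the check itself, the map followed by reduceRight could probably be written with forall. The sketch below compares the two on plain data. It is an illustration only: WorkerState is a hypothetical stand-in for the actor states so that the snippet runs without the actors library, and AllDone, allDoneReduce and allDoneForall are names invented for the example.

```scala
// Hypothetical stand-in for the actor state enumeration, so the sketch runs on its own.
object WorkerState extends Enumeration {
  val Running, Terminated = Value
}

object AllDone {
  import WorkerState._

  // The check in the shape used by the program above.
  def allDoneReduce(states: Seq[WorkerState.Value]): Boolean =
    states.map(_ == Terminated).reduceRight(_ && _)

  // Same answer for non-empty input, and it stops at the first unfinished worker.
  def allDoneForall(states: Seq[WorkerState.Value]): Boolean =
    states.forall(_ == Terminated)

  def main(args: Array[String]): Unit = {
    val states = Seq(Terminated, Running, Terminated)
    println(allDoneReduce(states)) // false: one worker is still running
    println(allDoneForall(states)) // false
  }
}
```

The two differ only on an empty collection: reduceRight throws there, while forall returns true. That does not matter in the program above, since there is always at least one worker.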

Can someone comment on the fact that the program "locks up" (stops receiving messages) when I use react and loop instead of while(true) and receive?