Akka Actor - wait for some time to expect a message

Posted 2019-04-06 15:36

Is it possible to make an Actor wait for X amount of seconds to receive any message, and if a message is received, process it as usual, otherwise send a message to some other Actor (pre-determined in the constructor)?

Tags: scala akka actor
3 answers
三岁会撩人
#2 · 2019-04-06 16:18

It's possible: have a look at the Akka "ask" pattern combined with "Await", which throws a TimeoutException when no reply arrives in time. Keep in mind, though, that blocking inside an actor is a very bad idea, because the actor can't handle any other messages while it waits. It also ties up one of Akka's dispatcher threads for the whole wait.

A better approach is to send the message (fire and forget) and schedule a timeout event with the Akka scheduler. When the response arrives, cancel that event, or set a flag so that the timeout is ignored if the reply actually came in time.
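A minimal sketch of the scheduler approach, assuming classic actors on Akka 2.4 or later. The names `Waiter`, `Timeout`, `fallback`, `maxWait` and `process` are made up for illustration and are not from the answer above:

```scala
import akka.actor.{Actor, ActorRef, Cancellable, Props}
import scala.concurrent.duration._

object Waiter {
  // Hypothetical message the scheduler delivers when the wait is over.
  case object Timeout

  def props(fallback: ActorRef, maxWait: FiniteDuration): Props =
    Props(new Waiter(fallback, maxWait))
}

class Waiter(fallback: ActorRef, maxWait: FiniteDuration) extends Actor {
  import Waiter._
  import context.dispatcher // ExecutionContext needed by the scheduler

  // Ask the scheduler to send us Timeout once; the actor itself never blocks.
  private val timer: Cancellable =
    context.system.scheduler.scheduleOnce(maxWait, self, Timeout)

  def receive: Receive = waiting

  // Still inside the waiting window.
  private def waiting: Receive = {
    case Timeout =>
      fallback ! Timeout // nothing arrived in time: notify the other actor
      context.become(running)
    case msg =>
      timer.cancel() // a message arrived in time: drop the pending timeout
      context.become(running)
      process(msg)
  }

  // The waiting window is over; handle messages as usual.
  private def running: Receive = {
    case Timeout => // a timeout that was already queued before cancel(); ignore it
    case msg     => process(msg)
  }

  // Placeholder for the actor's real work (assumption).
  private def process(msg: Any): Unit = println(s"processing $msg")

  override def postStop(): Unit = timer.cancel()
}
```

Switching behaviour with `context.become` plays the role of the "flag" mentioned above: a Timeout that slipped into the mailbox just before `cancel()` is simply ignored.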

Anthone
#3 · 2019-04-06 16:24

Yes. If you want to wait for any message, you simply set a receiveTimeout; the actor is then sent a ReceiveTimeout message when nothing else arrives within that period: http://doc.akka.io/docs/akka/current/scala/actors.html#receive-timeout

(The docs are slightly misleading here: you can also set the receiveTimeout again after every message.)
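Roughly, this could look like the sketch below, assuming classic actors on Akka 2.4 or later. `IdleWatcher`, `NothingArrived`, `fallback` and `maxIdle` are hypothetical names chosen for the example:

```scala
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout}
import scala.concurrent.duration._

object IdleWatcher {
  // Hypothetical notification sent to the other actor when nothing arrived.
  case object NothingArrived

  def props(fallback: ActorRef, maxIdle: FiniteDuration): Props =
    Props(new IdleWatcher(fallback, maxIdle))
}

class IdleWatcher(fallback: ActorRef, maxIdle: FiniteDuration) extends Actor {
  import IdleWatcher._

  // Akka sends ReceiveTimeout to this actor if no message arrives within maxIdle.
  context.setReceiveTimeout(maxIdle)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Switch the timer off so the notification is sent only once.
      context.setReceiveTimeout(Duration.Undefined)
      fallback ! NothingArrived
    case msg =>
      // Normal processing; the idle timer restarts after every message.
      println(s"processing $msg")
  }
}
```

Because the timer restarts after each message, this notifies on the first idle period of `maxIdle`, not only when the very first message is late. To watch only the initial window, call `context.setReceiveTimeout(Duration.Undefined)` in the `case msg` branch as well.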

一纸荒年 Trace。
#4 · 2019-04-06 16:34

This might be overkill, but you could look at the Finite State Machine (FSM) trait.

// Updated for Akka 2.4+ classic actors: durations now live in
// scala.concurrent.duration instead of akka.util.duration (Akka 2.0).
import akka.actor._
import scala.concurrent.duration._
import Impatient._

object Impatient {
  sealed trait State
  case object WaitingForMessage extends State
  case object MessageReceived extends State
  case object TimeoutExpired extends State

  sealed trait Data
  case object Uninitialized extends Data

  // Incoming message
  case object Message
}

class Impatient(receiver: ActorRef) extends Actor with FSM[State, Data] {
  startWith(WaitingForMessage, Uninitialized)

  // Wait up to 3 seconds for a Message; StateTimeout fires if none arrives.
  when(WaitingForMessage, stateTimeout = 3.seconds) {
    case Event(StateTimeout, data) => goto(TimeoutExpired) using data // data is usually modified here
    case Event(Message, data) => goto(MessageReceived) using data // data is usually modified here
  }

  onTransition {
    // A message arrived in time: handle it (here it is only logged).
    case WaitingForMessage -> MessageReceived => log.info("Received message: " + nextStateData)
    // Nothing arrived in time: notify the actor passed to the constructor.
    case WaitingForMessage -> TimeoutExpired => receiver ! TimeoutExpired
  }

  // Terminal states: ignore whatever arrives later.
  when(MessageReceived) {
    case _ => stay()
  }

  when(TimeoutExpired) {
    case _ => stay()
  }

  initialize()
}

Here it is in action:

object Main extends App {
  import akka.actor._
  import Impatient._

  val system = ActorSystem("System")

  // Logs a warning whenever an Impatient actor reports a timeout.
  val receiver = system.actorOf(Props(new Actor with ActorLogging {
    def receive = {
      case TimeoutExpired => log.warning("Timeout expired")
    }
  }))

  // Gets its message immediately, so no timeout is reported.
  val impatient = system.actorOf(Props(new Impatient(receiver)), name = "Impatient")
  impatient ! Message

  // Gets its message after 4 seconds, i.e. after the 3-second timeout has fired.
  val impatient2 = system.actorOf(Props(new Impatient(receiver)), name = "Impatient2")
  Thread.sleep(4000)
  impatient2 ! Message

  system.terminate() // system.shutdown() was removed in Akka 2.5
}