Akka Actor: how to process only the latest message

Posted 2020-02-26 02:19

Question:

Say I am sending messages to an Actor. While it is processing one message, several more may arrive. When it is ready for the next message, I want it to process only the latest one, because the earlier ones have become obsolete. How can I best achieve this?

Using the Scala Actors library, I was able to achieve this by first checking the actor's state from the sender, as follows:

if (myActor.getState != Runnable)
  myActor ! message     

But I don't think I can do such a check in Akka.

Answer 1:

There is no need to implement your own mailbox. At all.

I removed a lot of text to let this piece of code speak for itself:

import akka.actor.Actor

// Work uses reference equality by default, so every job is unique.
// If you override "equals", do another comparison in the match instead.
class Work
case class DoWork(work: Work)

class WorkerActor extends Actor {
  // Left as an exercise for the reader, it clearly should do some work.
  def perform(work: Work): Unit = ()

  def lookingForWork: Receive = {
    case w: Work =>
      self forward DoWork(w)
      context become prepareToDoWork(w)
  }

  def prepareToDoWork(work: Work): Receive = {
    case DoWork(`work`) =>
      // No new work, so perform this one
      perform(work)
      // Now we're ready to look for new work
      context become lookingForWork
    case DoWork(_) =>
      // Discard work that we don't need to do anymore
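    case w2: Work =>
      // Prepare to do this newer work instead. It needs its own marker,
      // because the marker for the older work will be discarded above.
      self forward DoWork(w2)
      context become prepareToDoWork(w2)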
  }

  // We start out looking for work
  def receive = lookingForWork
}

This means that a piece of work is performed only if no newer work has arrived in the mailbox by the time its DoWork marker is processed.
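
To see why only the newest job survives, here is a minimal, dependency-free model of the marker protocol. It is a sketch, not Akka code: the mailbox is simulated with a plain FIFO queue, `LatestOnlyModel` and the `id` field are hypothetical names added for illustration, and it assumes every incoming Work enqueues its own DoWork marker behind the messages already waiting.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the answer's messages; `id` exists only for readable results.
final class Work(val id: Int)
final case class DoWork(work: Work)

object LatestOnlyModel {
  // Drains a simulated FIFO mailbox and returns the ids of the work actually performed.
  def run(initial: Seq[Work]): List[Int] = {
    val mailbox = mutable.Queue.empty[Any]
    initial.foreach(w => mailbox.enqueue(w))
    val performed = mutable.ListBuffer.empty[Int]
    // None models lookingForWork, Some(w) models prepareToDoWork(w)
    var pending: Option[Work] = None

    while (mailbox.nonEmpty) {
      mailbox.dequeue() match {
        case w: Work =>
          // Newer work replaces whatever was pending and queues its own marker
          mailbox.enqueue(DoWork(w))
          pending = Some(w)
        case DoWork(w) if pending.exists(_ eq w) =>
          // The marker was reached with no newer work in between: perform it
          performed += w.id
          pending = None
        case _ =>
          // Stale marker for superseded work: discard it
      }
    }
    performed.toList
  }
}
```

With three jobs already queued, `LatestOnlyModel.run(Seq(new Work(1), new Work(2), new Work(3)))` returns `List(3)`: the markers for jobs 1 and 2 are reached only after job 3 has taken over, so they are discarded.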



Answer 2:

You could implement your own mailbox; this approach does not affect your actor implementation. See the other answer for a solution that changes the actor implementation instead of using a custom mailbox.

An implementation of a mailbox that drops the old message on enqueue:

package akka.actor.test

import akka.actor.{ ActorRef, ActorSystem, DeadLetter, InternalActorRef }
import akka.dispatch.{ Envelope, MessageQueue }
import com.typesafe.config.Config

class SingleMessageMailbox extends akka.dispatch.MailboxType {

  // This constructor signature must exist, it will be called by Akka
  def this(settings: ActorSystem.Settings, config: Config) = this()

  // The create method is called to create the MessageQueue
  final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new MessageQueue {
      val message = new java.util.concurrent.atomic.AtomicReference[Envelope]

      final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
        Option(message.get) foreach {deadLetters.enqueue(owner, _)}

      def enqueue(receiver: ActorRef, handle: Envelope): Unit =
        for {e <- Option(message.getAndSet(handle))} 
          receiver.asInstanceOf[InternalActorRef].
            provider.deadLetters.
            tell(DeadLetter(e.message, e.sender, receiver), e.sender)

      def dequeue(): Envelope = message.getAndSet(null)

      def numberOfMessages: Int = Option(message.get).size

      def hasMessages: Boolean = message.get != null
    }
}

Note that I had to put this class in a package under akka (here akka.actor.test) in order to send the old message to dead letters through InternalActorRef, which is private[akka]. This is the same way it is implemented for BoundedQueueBasedMessageQueue.

If you just want to drop old messages silently, without sending them to dead letters, you could implement enqueue like this:

def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle)
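
The two enqueue variants differ only in what happens to the displaced envelope: getAndSet hands it back so it can be reported, while set simply overwrites it. The sketch below isolates that single-slot logic without any Akka types. `SingleSlot` is a hypothetical helper written for illustration and is not part of Akka.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical single-slot queue mirroring the mailbox's AtomicReference logic.
final class SingleSlot[A <: AnyRef] {
  private val slot = new AtomicReference[A]

  // Stores the new element and returns the one it displaced, if any.
  // The mailbox above forwards that displaced element to dead letters.
  def enqueue(a: A): Option[A] = Option(slot.getAndSet(a))

  // Takes the current element and leaves the slot empty
  def dequeue(): Option[A] = Option(slot.getAndSet(null.asInstanceOf[A]))

  def hasMessages: Boolean = slot.get != null
}
```

On a fresh `SingleSlot[String]`, `enqueue("a")` returns `None`, a following `enqueue("b")` returns `Some("a")`, and `dequeue()` then returns `Some("b")`. Ignoring the value returned by enqueue corresponds to the silent `message.set(handle)` variant.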

Usage:

object Test extends App {
  import akka.actor._
  import com.typesafe.config.ConfigFactory

  // you should use your config file instead of ConfigFactory.parseString
  val actorSystem: ActorSystem =
    ActorSystem("default", ConfigFactory.parseString(
"""
  akka.daemonic=on
  myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox"
"""))

  class EchoActor extends Actor {
    def receive = {
      case m => println(m); Thread.sleep(500)
    }
  }

  val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox"))

  for {i <- 1 to 10} {
    actor ! i
    Thread.sleep(100)
  }

  Thread.sleep(1000)

}

Test:

$ sbt run
1
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
5
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
[INFO] <dead letters log>
10

See also the Akka documentation on Mailboxes.



Tags: scala akka