How to escalate top-most supervisors in Akka?

Posted 2019-07-12 01:53

I have the following top-level (“parent-most”) actor:

// Groovy pseudo-code
class Master extends UntypedActor {
    ActorRef child1
    ActorRef child2
    ActorRef child3
    ActorRef backup

    @Override
    void onReceive(Object message) throws Exception {
        if(message instanceof Terminated) {
            Terminated terminated = message as Terminated
            if(terminated.actor != backup) {
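                // terminated.actor is read-only, so re-point the matching childN
                // field at backup instead (replaceChild is a hypothetical helper)
                replaceChild(terminated.actor, backup)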
            } else {
                // TODO: What to do here? How to escalate from here?
            }
        } else {
            child1.tell(new DoSomething(message), getSelf())
            child2.tell(new DoSomethingElse(message), getSelf())
            child3.tell(new DoSomethingElser(message), getSelf())
        }
    }

    @Override
    SupervisorStrategy supervisorStrategy() {
        // Function here is akka.japi.Function; Duration is scala.concurrent.duration.Duration
        new OneForOneStrategy(10, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            SupervisorStrategy.Directive apply(Throwable t) throws Exception {
                if(isRecoverable(t)) {   // Don't worry about how/where this is defined or how it works
                    SupervisorStrategy.stop()
                } else {
                    SupervisorStrategy.escalate()
                }
            }
        })
    }
}

As you can see, it supervises three children, and when those 3 children throw “recoverable” exceptions, they are stopped and are replaced with a backup. So far, so good.

The problem I’m now facing is that if the backup actor throws any throwable whatsoever, I want to consider this Master actor (and really, my app in general) to be in a state where it cannot continue processing any input, and to escalate the exception to the guardian level.

I’m brand new to Akka and not sure where to put this code, and what it should look like. Again, I just need logic that says:

  • If the backup actor throws any throwable, escalate the exception to the Master’s parent, which should really be an Akka “guardian” actor/construct

The first part of this is that we need to know when an exception is thrown from the backup; I can handle this part, so let’s pretend our strategy now looks like this:

@Override
SupervisorStrategy supervisorStrategy() {
    // Function here is akka.japi.Function; Duration is scala.concurrent.duration.Duration
    new OneForOneStrategy(10, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
        @Override
        SupervisorStrategy.Directive apply(Throwable t) throws Exception {
            if(wasThrownFromBackup(t)) {
                SupervisorStrategy.escalate()
            } else if(isRecoverable(t)) {
                SupervisorStrategy.stop()
            } else {
                SupervisorStrategy.escalate()
            }
        }
    })
}

But as you can see, I’m still struggling to implement the escalation “out of the actor system”. Ideas? Java code example greatly preferred as Scala looks like hieroglyphics to me.

1 answer
你好瞎i
#2 · 2019-07-12 02:29

Have a look at the 'Reaper' pattern here: http://letitcrash.com/post/30165507578/shutdown-patterns-in-akka-2 Sorry, it is in Scala, but I think it is easy enough to translate to Java.

Also have a look here, https://groups.google.com/forum/#!topic/akka-user/QG_DL7FszMU

You should set in your configuration

akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"

This will cause any top-level actor that escalates to be stopped by the system. Then you implement another top-level actor called 'Reaper' (or whatever you want to call it), which has just one job: to watch the main top-level actor and take action (e.g. context.system.shutdown()) when that actor stops.

I don't know the akka java API so can't provide you with an exact example, but in Scala, from the LetItCrash blog above, it looks like:

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = context.system.shutdown()
}

In your application startup, you create your master actor, create your reaper, and send a WatchMe(masterActor) message to the reaper.
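
Since Java was requested, here is a rough model of the bookkeeping the Scala Reaper above performs, written in plain Java with no Akka types. It is a sketch, not Akka API: ReaperBookkeeping, watchMe, terminated and watchedCount are names made up for illustration, and the type parameter R stands in for ActorRef. In a real actor, watchMe would also need to call getContext().watch(ref), and terminated would be driven by the Terminated message.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Library-free model of the Reaper's bookkeeping (hypothetical, not Akka API).
 * R stands in for akka.actor.ActorRef.
 */
class ReaperBookkeeping<R> {
    // Everything we have been asked to watch and that is still alive
    private final Set<R> watched = new HashSet<>();

    // Hook that runs once nothing is left to watch (the allSoulsReaped() of the Scala version)
    private final Runnable allSoulsReaped;

    ReaperBookkeeping(Runnable allSoulsReaped) {
        this.allSoulsReaped = allSoulsReaped;
    }

    /** Models receiving WatchMe(ref): start tracking the reference. */
    void watchMe(R ref) {
        watched.add(ref);
    }

    /** Models receiving Terminated(ref): stop tracking it, fire the hook when none remain. */
    void terminated(R ref) {
        // Unlike the Scala version, ignore references that were never registered
        if (watched.remove(ref) && watched.isEmpty()) {
            allSoulsReaped.run();
        }
    }

    int watchedCount() {
        return watched.size();
    }

    public static void main(String[] args) {
        ReaperBookkeeping<String> reaper = new ReaperBookkeeping<>(
            () -> System.out.println("master stopped, shutting down"));
        reaper.watchMe("master");     // startup: register the master
        reaper.terminated("master");  // guardian stopped the master after an escalation
    }
}
```

In an actual Akka actor the Runnable would be whatever shutdown call your Akka version provides; the Scala above uses context.system.shutdown().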
