How to use stackable trait pattern with Akka actor

Posted 2019-02-01 09:13

Question:

I'm trying to implement a Pub/Sub trait that can be mixed into other Akka actors, using the stackable trait pattern.

Here is what I came up with:

trait PubSubActor extends Actor {
  abstract override def receive = 
    super.receive orElse {
      case Subscribe(topic) => /* ... */
      case Publish(topic, msg) => /* ... */
    }
}

class MyActor extends Actor with PubSubActor {
  override def receive = {
    case SomeMessage(a, b, c) => /* ... */
  }
}

At this point the compiler reports: error: overriding method receive in trait MyActor... method receive needs `abstract override' modifiers.

Can you explain to me why this isn't working? How can I fix it so it works?

Thanks!

UPDATE

The following works:

trait PubSubActor extends Actor {
  abstract override def receive = 
    super.receive orElse {
      case Subscribe(topic) => /* ... */
      case Publish(topic, msg) => /* ... */
    }
}

class MyActor extends Actor {
  override def receive = {
    case SomeMessage(a, b, c) => /* ... */
  }
}

class MyActorImpl extends MyActor with PubSubActor

But why? Why can I get the behavior I want this way but not the other? Any reasons? I can't seem to figure out the underlying difference between these two samples that makes the difference.

Answer 1:

There's a simple and concise solution:

Define a Receiving trait that chains multiple receive functions using orElse:

import akka.actor.Actor

trait Receiving extends Actor {
  // Handlers registered so far, tried in registration order
  var receivers: Receive = Actor.emptyBehavior
  def receiver(next: Receive): Unit = { receivers = receivers orElse next }
  def receive: Receive = receivers // implements Actor.receive
}

Using this in actors is easy:

trait PubSubActor extends Receiving {
  receiver {
    case Publish(topic, msg) => /* I'm the first to handle messages */
  }
}

class MyActor extends PubSubActor with Receiving {
  receiver {
    case SomeMessage(a, b, c) => /* PubSubActor didn't handle, I receive the message */
  }
}

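PubSubActor's handler is tried first, because its constructor body runs before MyActor's and therefore registers its receiver first. If it does not handle the message, the message is passed on to MyActor's handler.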
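The registration order can be checked without Akka. The following is a minimal sketch, assuming a plain PartialFunction[Any, String] as a stand-in for Akka's Receive; the names ReceivingSketch, PubSub and MyHandler are hypothetical and only serve the illustration.

```scala
// Plain-Scala stand-in for Akka's Receive (an assumption, so the sketch runs without Akka).
object ReceivingSketch {
  type Receive = PartialFunction[Any, String]

  final case class Publish(topic: String, msg: String)
  final case class SomeMessage(a: Int, b: Int, c: Int)

  trait Receiving {
    private var receivers: Receive = PartialFunction.empty
    // Each call appends a handler; handlers registered earlier are tried first.
    def receiver(next: Receive): Unit = { receivers = receivers orElse next }
    def receive: Receive = receivers
  }

  // This trait body runs before the body of any class that extends it.
  trait PubSub extends Receiving {
    receiver {
      case Publish(topic, _)    => s"PubSub handled $topic"
      case SomeMessage(0, _, _) => "PubSub saw it first"
    }
  }

  class MyHandler extends PubSub {
    receiver {
      case SomeMessage(a, b, c) => s"MyHandler handled ${a + b + c}"
    }
  }
}
```

Here SomeMessage(0, 1, 2) matches in both handlers, but only the PubSub case runs, which is the "first to handle" behaviour described above.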



Answer 2:

You can certainly achieve what you are looking for using Akka's composable actor feature. This is described a bit in Extending Actors using PartialFunction chaining.

First, the infrastructure code (straight from the docs):

class PartialFunctionBuilder[A, B] {
  import scala.collection.immutable.Vector

  // Abbreviate to make code fit
  type PF = PartialFunction[A, B]

  private var pfsOption: Option[Vector[PF]] = Some(Vector.empty)

  private def mapPfs[C](f: Vector[PF] => (Option[Vector[PF]], C)): C = {
    pfsOption.fold(throw new IllegalStateException("Already built"))(f) match {
      case (newPfsOption, result) => {
        pfsOption = newPfsOption
        result
      }
    }
  }

  def +=(pf: PF): Unit =
    mapPfs { case pfs => (Some(pfs :+ pf), ()) }

  def result(): PF =
    mapPfs { case pfs => (None, pfs.foldLeft[PF](Map.empty) { _ orElse _ }) }
}

trait ComposableActor extends Actor {
  protected lazy val receiveBuilder = new PartialFunctionBuilder[Any, Unit]
  final def receive = receiveBuilder.result()
}

Then the behaviors you want to be able to compose into actors:

trait PubSubActor { self:ComposableActor =>
  receiveBuilder += {
    case Subscribe(topic) => /* ... */
    case Publish(topic, msg) => /* ... */
  }
}

trait MyActor  { self:ComposableActor =>
  receiveBuilder += {
    case SomeMessage(a, b, c) => /* ... */
  }
}

And lastly, an actual actor that uses these composable behaviors:

class MyActorImpl extends ComposableActor with PubSubActor with MyActor
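To see how the mixin order decides which behavior is tried first, here is a rough sketch without Akka. It assumes a simplified builder (no "already built" check) and a PartialFunction[Any, String] in place of Receive; Builder, Composable, First, Second and Impl are hypothetical names, not part of Akka or its documentation.

```scala
object ComposableSketch {
  type PF = PartialFunction[Any, String]

  // Simplified stand-in for PartialFunctionBuilder: collect handlers, then fold with orElse.
  class Builder {
    private var pfs = Vector.empty[PF]
    def +=(pf: PF): Unit = { pfs = pfs :+ pf }
    def result(): PF = pfs.foldLeft[PF](PartialFunction.empty)(_ orElse _)
  }

  trait Composable {
    // lazy, so the builder exists whenever a mixed-in trait body first touches it
    lazy val receiveBuilder = new Builder
    final def receive: PF = receiveBuilder.result()
  }

  trait First { self: Composable =>
    receiveBuilder += { case s: String => s"First: $s" }
  }

  trait Second { self: Composable =>
    receiveBuilder += {
      case s: String => s"Second: $s" // shadowed by First for String messages
      case n: Int    => s"Second: $n"
    }
  }

  // Trait bodies run left to right, so First registers before Second.
  class Impl extends Composable with First with Second
}
```

Swapping the order to `with Second with First` would let Second handle String messages instead, so the `with` order is effectively the handler priority.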


Answer 3:

Try it the other way around:

object Subscription {
  case object Subscribe
  case object Unsubscribe
}

trait Subscription {
  this: Actor =>

  import Subscription._

  var subscribers = Set.empty[ActorRef]

  def receive: Receive = {
    case Subscribe => subscribers += sender()
    case Unsubscribe => subscribers -= sender()
  }
}

class MyActor extends Actor with Subscription {
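  // Subscription.receive is concrete, so overriding it requires the override modifier
  override def receive: Receive = super.receive orElse {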
     case msg => // handle msg
  }
}

Note that this still makes use of the stackable trait pattern, which is hidden by the fact that I've omitted the core. So something like this would still work (at least I think it will; at the moment I have no time to check whether it compiles).

class Core extends Actor {
  def receive = Actor.emptyBehavior
}

class MyActor extends Core with Subscription
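One way to make the core-plus-layer arrangement explicit is to let the layer extend the base type and mark receive as abstract override. The following is a sketch under stated assumptions rather than a checked Akka program: Base stands in for akka.actor.Actor, Receive is a plain PartialFunction[Any, String], and subscribers are identified by String instead of ActorRef.

```scala
object StackableSketch {
  type Receive = PartialFunction[Any, String]

  final case class Subscribe(who: String)
  final case class Unsubscribe(who: String)

  // Stand-in for akka.actor.Actor: receive is abstract here.
  trait Base {
    def receive: Receive
  }

  // The "core": a concrete receive that handles nothing.
  class Core extends Base {
    def receive: Receive = PartialFunction.empty
  }

  // Stackable layer: it extends Base, so `abstract override` and super.receive are legal.
  trait Subscription extends Base {
    var subscribers = Set.empty[String]

    abstract override def receive: Receive = super.receive orElse {
      case Subscribe(who) =>
        subscribers += who
        "subscribed"
      case Unsubscribe(who) =>
        subscribers -= who
        "unsubscribed"
    }
  }

  // Core supplies the concrete receive that Subscription's super.receive resolves to.
  class MyHandler extends Core with Subscription
}
```

Because Core handles nothing, every message falls through to the Subscription layer; a core with real cases would be consulted first.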

BTW, you can read more about the pattern (not related to Actors) here.



Answer 4:

First of all, please excuse my English. I think the point is that the abstract override modifier requires a concrete implementation of receive to already exist in whatever the trait is mixed into, and in the first construction

class MyActor extends Actor with PubSubActor {
  override def receive = {
    case SomeMessage(a, b, c) => /* ... */
  }
}

that requirement is not met.

The reason is that the Scala compiler linearizes the inheritance hierarchy, so the chain of receive methods is, in order:

 1)  override def receive = {
        case SomeMessage(a, b, c) => /* ... */
      }
 2) abstract override def receive = super.receive orElse {
        case Subscribe(topic) => /* ... */
        case Publish(topic, msg) => /* ... */
      }

 3) then Actor.receive, which has no implementation

So PubSubActor.receive cannot be called: it uses super.receive, which here resolves to Actor.receive, and Actor.receive has no implementation.

In the second construction

class MyActor extends Actor {
  override def receive = {
    case SomeMessage(a, b, c) => /* ... */
  }
}

class MyActorImpl extends MyActor with PubSubActor

the chain of receive methods is:

1)

abstract override def receive = super.receive orElse {
   case Subscribe(topic) => /* ... */
   case Publish(topic, msg) => /* ... */
}

2)

override def receive = {
   case SomeMessage(a, b, c) => /* ... */
}

3) then Actor.receive, which has no implementation

So PubSubActor.receive can successfully call super.receive, which now resolves to the concrete MyActor.receive.
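The two constructions can be reproduced in plain Scala. This is only a sketch: Base is a hypothetical stand-in for Actor with an abstract receive, Receive is a PartialFunction[Any, String], and messages are plain strings.

```scala
object LinearizationSketch {
  type Receive = PartialFunction[Any, String]

  // Stand-in for Actor: receive is abstract.
  trait Base {
    def receive: Receive
  }

  trait PubSub extends Base {
    abstract override def receive: Receive =
      super.receive orElse { case "publish" => "PubSub" }
  }

  // Concrete receive, so PubSub's super.receive has something to call.
  class MyHandler extends Base {
    override def receive: Receive = { case "some-message" => "MyHandler" }
  }

  // Linearization: MyHandlerImpl, PubSub, MyHandler, Base
  class MyHandlerImpl extends MyHandler with PubSub

  // Mirrors the first construction from the question, which the compiler rejects:
  // PubSub sits directly above Base, so its super.receive would be abstract.
  // class Broken extends Base with PubSub {
  //   override def receive: Receive = { case "some-message" => "Broken" }
  // }
}
```

Note that in the working version MyHandler's cases are tried before PubSub's, because PubSub puts super.receive on the left of orElse.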

Additional info:

Stackable traits

Scala language specification, see 5.1.2