Akka 2: How to pause processing of messages?

2020-06-12 04:04发布

问题:

On my journey to grasp the Actor model using Akka many questions pop up. Here is another one. Say we have an Actor which has to stop processing messages for a given time because of some business logic or available resources. Cases where this might occur could be:

  • Throttling. There might be an Actor which sends e-mails but is restricted to sent only one e-mail per second.

  • The Actor might employ some system which can only process x-messages simultaneity. This could be an AsyncHttpClient which has a fixed thread pool and I do not want to overload it.

  • Some external resource is unavailable which is required to process messages (read: external REST-API)

It is highly possible that my brain is not yet actor-ready and I just need a hint how to tackle such problems in an actor-y way.

回答1:

General Answer

Actors process messages always as fast as they can, where processing means taking them out of their mailbox and passing them into the actor’s behavior. The behavior is thus the place where your answer lies: change it to something more appropriate during periods of time which require non-nominal actions.

Throttling

If one component is processing messages at a lower rate than they are produced, it will have to drop messages eventually. Either use a bounded mailbox or put a manager in front which keeps track of the worker’s progress and goes into “reply with negative result” mode during periods of stress.

When an actor wants to throttle its own output rate, use the context.system.scheduler.

This should answer your first two points.

Recovery

During periods where a required resource is unavailable, you have two options depending on the requirements: either queue messages internally, or go into “out-of-order” reply mode. You can also mix, i.e. queue with certain time and space limits and fail when the limits are hit.

Further Considerations

Always keep the units of work processed by actors so small that the actor can react within its latency requirements. The latter could be very relaxed (running for hours uninterruptibly) or very strict (must process messages at kHz rate).



回答2:

case object NextEmail
class EmailActor extends Actor {

self ! NextEmail

  def receive = {
    case NextEmail =>
      sendEmailIfAnyToSend
      context.system.scheduler.scheduleOnce(3 seconds, self, NextEmail)                 
  }
}


标签: actor akka