Working with custom mailboxes and persistent actor

Published 2019-07-23 16:24

Question:

Questions

  1. How do you get and log the messages that a persistent actor with AtLeastOnceDelivery receives in its receiveCommand from inside a custom mailbox's enqueue and dequeue?
  2. How do you get the name of the persistent actor inside the custom mailbox's enqueue and dequeue?

The issue is that the messages a persistent actor receives in receiveCommand are never logged by enqueue or dequeue; only the persisting of events is logged. In addition, owner.get.path.name in the code below never returns the persistent actor's name. It returns names such as recoveryPermitter and inmemory-snapshot-store.

I have a custom mailbox with the following MessageQueue:

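import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MessageQueue, UnboundedMailbox}

class CustomMailbox(val backend: MessageQueue, owner: Option[ActorRef], system: Option[ActorSystem]) extends MessageQueue {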

    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
        // Intended to simulate dropping messages: canEnqueue below is computed but never used, so nothing is dropped yet.
        println(s"messageType in  ${owner.get.path.name}"+handle.message.getClass.toString)
        val canEnqueue = Math.random() > 0.95
        println(s"enqueing for ${owner.get.path}: $handle")
        backend.enqueue(receiver, handle)
    }

    override def dequeue(): Envelope = {
        // Peek first so the head of the queue can be logged before it is removed.
        val mb: UnboundedMailbox.MessageQueue = backend.asInstanceOf[UnboundedMailbox.MessageQueue]
        val peek = mb.queue.peek()
        println(s"peeking for ${owner.get.path.name}: $peek")
        if (peek != null) {
            println(s"messageType in dequeue ${owner.get.path.name}" + peek.message.getClass.toString)
            // canDequeue is computed but never used, so every message is dequeued.
            val canDequeue = Math.random() > 0.9
            println(s"dequeing for ${owner.get.path}: $peek")
            backend.dequeue()
        } else {
            null
        }
    }

    override def numberOfMessages: Int = backend.numberOfMessages

    override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = backend.cleanUp(owner, deadLetters)

    override def hasMessages: Boolean = backend.hasMessages
}
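
The question does not show the MailboxType that constructs CustomMailbox, and that is where owner comes from. The following is a minimal sketch under stated assumptions, not the author's code: OwnerLoggingQueue and OwnerLoggingMailboxType are hypothetical names, and the queue is a reduced stand-in for CustomMailbox. It illustrates that Akka calls create once for each actor the mailbox is assigned to, so owner is whichever actor that happens to be, and that owner can be read without calling get on the Option.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{Envelope, MailboxType, MessageQueue, ProducesMessageQueue, UnboundedMailbox}
import com.typesafe.config.Config

// Hypothetical stand-in for CustomMailbox, reduced to the owner handling.
class OwnerLoggingQueue(backend: MessageQueue, owner: Option[ActorRef]) extends MessageQueue {

  // owner is an Option, so fall back to a placeholder instead of calling owner.get.
  private val ownerName: String = owner.map(_.path.name).getOrElse("<no owner>")

  override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    println(s"enqueue for $ownerName: ${handle.message.getClass.getName}")
    backend.enqueue(receiver, handle)
  }

  override def dequeue(): Envelope = {
    val envelope = backend.dequeue() // null when the queue is empty
    if (envelope ne null) println(s"dequeue for $ownerName: ${envelope.message.getClass.getName}")
    envelope
  }

  override def numberOfMessages: Int = backend.numberOfMessages
  override def hasMessages: Boolean = backend.hasMessages
  override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
    backend.cleanUp(owner, deadLetters)
}

// Hypothetical mailbox type; the name and wiring are assumptions.
class OwnerLoggingMailboxType extends MailboxType with ProducesMessageQueue[OwnerLoggingQueue] {

  // Akka instantiates configured mailbox types through this constructor signature.
  def this(settings: ActorSystem.Settings, config: Config) = this()

  // Called once per actor that uses this mailbox; owner is that actor.
  override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new OwnerLoggingQueue(new UnboundedMailbox.MessageQueue, owner)
}
```

If such a mailbox type is configured as the default mailbox rather than attached to one actor, create is presumably also invoked for internal actors, which would be consistent with seeing names like recoveryPermitter and inmemory-snapshot-store.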

For example, I have a persistent actor named clientChild with the receiveCommand and updateState methods shown further below. The enqueue and dequeue in my custom mailbox above log something like:

Enqueueing

 enqueing for akka://WebShop/system/inmemory-journal: Envelope(WriteMessages(Vector(AtomicWrite(List(PersistentImpl(Goods(goods,5),1,my-Client-id,,false,Actor[akka://WebShop/user/webShopActor#-659139690],be7c37ad-9c0e-4d77-84ec-d9fd94b5f623)))),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822],2),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822])

Dequeueing/peeking

 peeking for inmemory-journal: Envelope(WriteMessages(Vector(AtomicWrite(List(PersistentImpl(Goods(goods,5),1,my-Client-id,,false,Actor[akka://WebShop/user/webShopActor#-659139690],be7c37ad-9c0e-4d77-84ec-d9fd94b5f623)))),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822],2),Actor[akka://WebShop/user/webShopActor/clientChild#-1139440822])

This shows that the mailbox handles the persisting of the Goods event after my ChildClient receives the BuyGoods command message. It logs only the persisting of Goods; it never logs the enqueuing or dequeuing (peeking) of BuyGoods.

  def updateState(evt: Evt): Unit = evt match {
    case ReduceBalance(deliveryId, amount) if state.CashBal - amount < 0 =>
      confirmDelivery(deliveryId)
      throw InsufficientBalance(s"Client cannot withdraw $amount from ${state.CashBal}")
    case e: ReduceBalance =>
      state = ClientState(state.CashBal - e.amount)
      confirmDelivery(e.deliveryId)
      deliver(paymentsActor)(deliveryId => PaymentMessage(deliveryId, e.amount))
    case e: Goods =>
      deliver(stockActor)(deliveryId => ReduceStock(deliveryId, e.name, e.amount))
    case e: AddCredit =>
      state = ClientState(state.CashBal + e.amount)
    case e: PaymentAccepted =>
      log.info("client confirmed payment:" + e)
      confirmDelivery(e.deliveryId)
  }


  override def receiveCommand: Receive = {
    //request for goods from stockActor

    case c: BuyGoods =>
      log.info("Client Received: " + c)
      persistAsync(Goods(c.name, c.Amount))(updateState)
    //payments actor confirms payment

  }
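
One possible explanation for the missing BuyGoods messages, offered as an assumption to verify rather than a confirmed diagnosis: persistent actors stash messages internally, and Akka's Stash trait requires a mailbox whose queue has DequeBasedMessageQueueSemantics. A mailbox built on UnboundedMailbox.MessageQueue does not provide that, so the persistent actor may end up with a different, deque-based mailbox that bypasses the logging. The sketch below shows a logging queue that keeps deque semantics. LoggingDequeQueue and LoggingDequeMailboxType are hypothetical names.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.dispatch.{DequeBasedMessageQueueSemantics, Envelope, MailboxType, MessageQueue,
  ProducesMessageQueue, UnboundedDequeBasedMailbox}
import com.typesafe.config.Config

// Hypothetical logging queue that keeps deque semantics so stashing actors can use it.
class LoggingDequeQueue(owner: Option[ActorRef])
    extends MessageQueue with DequeBasedMessageQueueSemantics {

  private val backend = new UnboundedDequeBasedMailbox.MessageQueue
  private val ownerName: String = owner.map(_.path.name).getOrElse("<no owner>")

  override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
    println(s"enqueue for $ownerName: ${handle.message}")
    backend.enqueue(receiver, handle)
  }

  // Used when stashed messages are put back at the front of the mailbox.
  override def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = {
    println(s"enqueueFirst for $ownerName: ${handle.message}")
    backend.enqueueFirst(receiver, handle)
  }

  override def dequeue(): Envelope = {
    val envelope = backend.dequeue() // null when the queue is empty
    if (envelope ne null) println(s"dequeue for $ownerName: ${envelope.message}")
    envelope
  }

  override def numberOfMessages: Int = backend.numberOfMessages
  override def hasMessages: Boolean = backend.hasMessages
  override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit =
    backend.cleanUp(owner, deadLetters)
}

// Hypothetical mailbox type that advertises the deque-capable queue it produces.
class LoggingDequeMailboxType extends MailboxType with ProducesMessageQueue[LoggingDequeQueue] {

  // Akka instantiates configured mailbox types through this constructor signature.
  def this(settings: ActorSystem.Settings, config: Config) = this()

  override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
    new LoggingDequeQueue(owner)
}
```

To try it, the mailbox type would be registered under a configuration key of your choosing (for example a hypothetical logging-deque-mailbox entry whose mailbox-type is the fully qualified class name) and attached to the persistent actor with Props.withMailbox. Attached that way, create receives the persistent actor itself as owner, which would also address the second question.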