Actorpublisher as source in handleMessagesWithSink

2019-09-04 07:48发布

问题:

I'm new to AKKA Streams. (Using Akka v 2.4.4) I am trying to create a Websocket which can push new notifications to subscribed clients. My strategy is to implement a ActorPublisher, which I later can send a message to, and then get it pushed to clients.

To get started I copied an example of a ActorPublisher:

case class Tick()

class TickActor extends ActorPublisher[Int] {
  import scala.concurrent.duration._

  implicit val ec = context.dispatcher

  val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())`

  var cnt = 0
  var buffer = Vector.empty[Int]

  override def receive: Receive = {
    case Tick() => {
      cnt = cnt + 1
      if (buffer.isEmpty && totalDemand > 0) {
        onNext(cnt)
      }
      else {
        buffer :+= cnt
        if (totalDemand > 0) {
          val (use,keep) = buffer.splitAt(totalDemand.toInt)
          buffer = keep
          use foreach onNext
        }
      }
    }
  }

  override def postStop() = tick.cancel()
}

My problem is that I don't know how to use it as source.

I have tried the following:

val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString))
  optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() {
    case Some(upgrade) =>
      complete(
        upgrade.handleMessagesWithSinkSource(Sink.ignore,source))
    case None =>
      reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection)
  }

But when I connect with a client I get the following ClassCastException: java.lang.ClassCastException: java.lang.Integer cannot be cast to scala.runtime.Nothing$

If I change the Source to:

val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt()))
      .filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString))

It runs just fine.

I struggling a bit connecting the dots, so hopefully you can lead me in the correct direction.

回答1:

I tried your example and was able to reproduce the problem. I made only one change to fix the problem. That is added the type parameter and it makes sense now because somewhere in the akka stream, there is a code like elem.asInstanceOf[T]. So, when the type is missing from the actorPublisher, then the type is inferred as Nothing

val source = Source.actorPublisher[Int](Props[TickActor]).map(i => TextMessage(i.toString))