I'm new to AKKA Streams. (Using Akka v 2.4.4) I am trying to create a Websocket which can push new notifications to subscribed clients. My strategy is to implement a ActorPublisher, which I later can send a message to, and then get it pushed to clients.
To get started I copied an example of a ActorPublisher:
case class Tick()
class TickActor extends ActorPublisher[Int] {
import scala.concurrent.duration._
implicit val ec = context.dispatcher
val tick = context.system.scheduler.schedule(1 second, 1 second, self, `Tick())`
var cnt = 0
var buffer = Vector.empty[Int]
override def receive: Receive = {
case Tick() => {
cnt = cnt + 1
if (buffer.isEmpty && totalDemand > 0) {
onNext(cnt)
}
else {
buffer :+= cnt
if (totalDemand > 0) {
val (use,keep) = buffer.splitAt(totalDemand.toInt)
buffer = keep
use foreach onNext
}
}
}
}
override def postStop() = tick.cancel()
}
My problem is that I don't know how to use it as source.
I have tried the following:
val source: Source[Strict, ActorRef] = Source.actorPublisher(Props[TickActor]).map(i => TextMessage(i.toString))
optionalHeaderValueByType[akka.http.scaladsl.model.ws.UpgradeToWebSocket]() {
case Some(upgrade) =>
complete(
upgrade.handleMessagesWithSinkSource(Sink.ignore,source))
case None =>
reject(akka.http.scaladsl.server.ExpectedWebSocketRequestRejection)
}
But when I connect with a client I get the following ClassCastException: java.lang.ClassCastException: java.lang.Integer cannot be cast to scala.runtime.Nothing$
If I change the Source to:
val src: Source[Strict, NotUsed] = Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current.nextInt()))
.filter(i => i > 0 && i % 2 == 0).map(i => TextMessage(i.toString))
It runs just fine.
I struggling a bit connecting the dots, so hopefully you can lead me in the correct direction.