I am working with a data publisher from a Java library that I do not control. The publisher library uses a typical callback setup. Somewhere in the library code (the library is Java, but I will describe it in Scala for terseness):
type DataType = ???
trait DataConsumer {
  def onData(data : DataType) : Unit
}
The user of the library is required to write a class that implements the onData method and pass that into a DataProducer; the library code looks something like:
class DataProducer(consumer : DataConsumer) {...}
The DataProducer has its own internal thread, which I cannot control, and an accompanying data buffer. That thread calls onData whenever there is another DataType object to consume.
So, my question is: how do I write a layer that converts the original library pattern into an Akka Streams Source object?
Thank you in advance.
There are various ways this can be solved. One is to use an ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors where you can just change the callback so that it sends a message to the actor. Depending on how the callback works, you might be able to use mapAsync, too (converting a callback to a Future). That will only work if one request produces exactly one callback call.
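To illustrate the "callback to Future" idea mentioned above, here is a minimal sketch using only the Scala standard library. The names ResultCallback, requestLength and lengthAsync are hypothetical stand-ins for a one-shot, request/response style callback API; they are not part of the library in the question or of Akka.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical one-shot callback API: the callback is assumed to be
// invoked exactly once per request.
trait ResultCallback[A] {
  def onResult(result: A): Unit
}

// Hypothetical request function; here it simply calls back from another thread.
def requestLength(input: String, callback: ResultCallback[Int]): Unit = {
  val worker = new Thread(new Runnable {
    override def run(): Unit = callback.onResult(input.length)
  })
  worker.start()
}

// Adapter: complete a Promise from inside the callback and expose its Future.
def lengthAsync(input: String): Future[Int] = {
  val promise = Promise[Int]()
  requestLength(input, new ResultCallback[Int] {
    // trySuccess ignores any later call instead of throwing
    override def onResult(result: Int): Unit = promise.trySuccess(result)
  })
  promise.future
}
```

A function shaped like lengthAsync can then be passed to mapAsync on a stream of requests. This does not fit a producer that pushes data on its own schedule, as in the question, because a Promise can be completed only once.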
Callback --> Source
Elaborating on Endre Varga's answer, below is the code that will create the DataConsumer callback function which will send messages into an Akka Streams Source.
Caution: There is a lot more to creating a functional ActorPublisher than I am indicating below. In particular, buffering needs to be done to handle the case where the DataProducer is calling onData faster than the Sink is signalling demand (see this example). The code below just sets up the "wiring".
import akka.actor.{ActorRef, Props}
import akka.actor.Actor.noSender
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.Source
/**Incomplete ActorPublisher, see the example link.*/
class SourceActor extends ActorPublisher[DataType] {
  def receive : Receive = {
    case message : DataType => deliverBuf() //defined in example link
  }
}
class ActorConsumer(sourceActor : ActorRef) extends DataConsumer {
  override def onData(data : DataType) : Unit = sourceActor.tell(data, noSender)
}
//set up the actor that will feed the stream Source
//(actorFactory is an ActorSystem or ActorContext that is assumed to be in scope)
val sourceActorRef = actorFactory.actorOf(Props[SourceActor])
//set up the Consumer object that will feed the Actor
val actorConsumer = new ActorConsumer(sourceActorRef)
//set up the akka stream Source
//(later Akka versions spell this Source.fromPublisher(...))
val source = Source(ActorPublisher[DataType](sourceActorRef))
//set up the incoming data feed from 3rd party library
val dataProducer = new DataProducer(actorConsumer)
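The buffering mentioned in the caution above can be sketched in plain Scala, independent of Akka. This is only an illustration of the bookkeeping involved, under the assumption that dropping the oldest element is acceptable when the buffer is full; DemandBuffer, offer and request are hypothetical names, not Akka API.

```scala
import scala.collection.mutable

// Illustrative only: a bounded buffer that tracks downstream demand.
// Not thread-safe; inside an actor that is fine, because messages are
// processed one at a time.
final class DemandBuffer[A](capacity: Int) {
  require(capacity > 0, "capacity must be positive")
  private val queue = mutable.Queue.empty[A]
  private var demand = 0L

  // Called when the producer pushes an element; returns what may be emitted now.
  def offer(element: A): List[A] = {
    if (queue.size == capacity) queue.dequeue() // full: drop the oldest element
    queue.enqueue(element)
    drain()
  }

  // Called when downstream signals demand for n more elements.
  def request(n: Long): List[A] = {
    demand += n
    drain()
  }

  // Emit buffered elements while there is outstanding demand.
  private def drain(): List[A] = {
    val out = List.newBuilder[A]
    while (demand > 0 && queue.nonEmpty) {
      out += queue.dequeue()
      demand -= 1
    }
    out.result()
  }
}
```

In an ActorPublisher, the Request message and totalDemand play the role of request and demand here, and each element returned by drain would be handed to onNext.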
Callback --> Whole Stream
The original question asks specifically for a callback-to-Source conversion, but callbacks are easier to handle if the entire stream is already available (not just the Source). That is because the stream can be materialized into an ActorRef using the Source#actorRef function. As an example:
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
val overflowStrategy = OverflowStrategy.dropHead
val bufferSize = 100
//run() needs an implicit materializer in scope
val streamRef =
  Source
    .actorRef[DataType](bufferSize, overflowStrategy)
    .via(someFlow)
    .to(someSink)
    .run()
val streamConsumer = new DataConsumer {
  override def onData(data : DataType) : Unit = streamRef ! data
}
val dataProducer = new DataProducer(streamConsumer)