How to send a message in a reactive stream from a

Posted 2019-03-27 20:10

Question:

My application has an Akka WebSocket interface. The web socket consists of an actor subscriber and an actor publisher. The subscriber handles commands by sending them to the corresponding actor. The publisher listens on the event stream and publishes update information back to the stream (and so, finally, to the client). This works well.

My question: how can the subscriber send an event back to the stream, for example to confirm the execution of a received command?

public class WebSocketApp extends HttpApp {

  private static final Gson gson = new Gson();

  @Override
  public Route createRoute() {
    return get(
        path("metrics").route(handleWebSocketMessages(metrics()))
        );
  }

  private Flow<Message, Message, ?> metrics() {
    Sink<Message, ActorRef> metricsSink = Sink.actorSubscriber(WebSocketCommandSubscriber.props());
    Source<Message, ActorRef> metricsSource = 
        Source.actorPublisher(WebSocketDataPublisherActor.props())
        .map((measurementData) -> TextMessage.create(gson.toJson(measurementData)));
    return Flow.fromSinkAndSource(metricsSink, metricsSource);
  }
}

A nice solution would be for the subscribing actor (the WebSocketCommandSubscriber actor in the code above) to send a message back to the stream with something like sender().tell(...).

Answer 1:

No, it is not possible, at least not directly. Streams are always unidirectional: all messages flow in one direction, while demand for them flows in the opposite direction. To get a confirmation back to the client, you need to pass it from the sink to the source so that the source emits it, for example by registering the source actor with the sink actor. It could look like this:

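Flow.fromSinkAndSourceMat(metricsSink, metricsSource, (sinkActor, sourceActor) -> {
    // Tell the sink actor which actor feeds the outgoing side of the stream.
    sinkActor.tell(new RegisterSource(sourceActor), ActorRef.noSender());
    // The combiner must return the flow's materialized value; here, the source actor.
    return sourceActor;
});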

Then, once your sink actor receives the RegisterSource message, it can send confirmation messages to the provided ActorRef, which will forward them to the output stream.
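
To make the registration handshake easier to follow, here is a minimal sketch in plain Java with no Akka dependency. It only models the idea and is not Akka code: SourceSide, SinkSide, and the "ack:" confirmation format are hypothetical stand-ins for the publisher actor, the subscriber actor, and whatever confirmation message you choose. Holding confirmations back until registration is also an assumption, in case a command arrives before the RegisterSource message does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the source (publisher) actor:
// it buffers outgoing messages until the stream asks for them.
final class SourceSide {
    private final Queue<String> outbox = new ArrayDeque<>();

    // Called by whoever wants a message emitted to the client.
    void offer(String message) {
        outbox.add(message);
    }

    // Models downstream demand: hands over at most `demand` buffered messages.
    List<String> pull(int demand) {
        List<String> emitted = new ArrayList<>();
        while (emitted.size() < demand && !outbox.isEmpty()) {
            emitted.add(outbox.poll());
        }
        return emitted;
    }
}

// Hypothetical stand-in for the sink (subscriber) actor.
final class SinkSide {
    private SourceSide registered;                          // null until registration
    private final List<String> pending = new ArrayList<>(); // confirmations held back

    // Plays the role of handling the RegisterSource message.
    void register(SourceSide source) {
        registered = source;
        for (String confirmation : pending) {
            source.offer(confirmation);
        }
        pending.clear();
    }

    // Plays the role of handling one incoming command from the stream.
    void onCommand(String command) {
        String confirmation = "ack:" + command; // assumed confirmation format
        if (registered == null) {
            pending.add(confirmation);          // no source known yet, hold it back
        } else {
            registered.offer(confirmation);     // route it to the outgoing side
        }
    }
}

final class ConfirmationSketch {
    public static void main(String[] args) {
        SourceSide source = new SourceSide();
        SinkSide sink = new SinkSide();
        sink.onCommand("start");  // arrives before registration
        sink.register(source);
        sink.onCommand("stop");
        System.out.println(source.pull(10)); // prints [ack:start, ack:stop]
    }
}
```

The sink never writes to the stream itself. It only hands confirmations to the registered source, which emits them when there is demand, so the stream stays unidirectional.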