How to store a list of Akka actors in Play framewo

Posted 2019-05-14 16:04

Question:

I have a Play Framework 2 application that receives data and sends it to multiple clients via WebSockets. I use Akka actors to work with WebSockets, as described in the documentation. I also have a WebSocketRouter class that extends UntypedActor and contains the routing logic (it decides which clients should receive the data that comes in). I know that I could use Akka's Router functionality, but that is not the issue at the moment. The issue is that I have to store a list of all active clients. Right now I store it in a static list in the WebSocketRouter class. That was the fastest way to write a proof-of-concept prototype, but it is not thread-safe and does not seem to be "the Akka way". Below is a simplified code sample:

WebSocketController:

//This controller handles the creation of WebSockets.
public class WebSocketController extends Controller {
    public static WebSocket<String> index() {
        return WebSocket.withActor(new F.Function<ActorRef, Props>() {
            public Props apply(ActorRef out) throws Throwable {
                return MessageSender.props(out);
            }
        });
    }
}

MessageSender:

//Holds a reference to the auto-created Actor that handles WebSockets
//and also registers and unregisters itself in the router.
public class MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;

    public MessageSender(ActorRef out) {
        this.out = out;
    }

    @Override
    public void preStart() {
        WebSocketRouter.addSender(getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        }
        else {
            unhandled(message);
        }
    }

    @Override
    public void postStop() {
        WebSocketRouter.removeSender(getSelf());
    }
}

WebSocketRouter:

public class WebSocketRouter extends UntypedActor {
    private static ArrayList<ActorRef> senders;
    static {
        senders = new ArrayList<>();
    }

    public static void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    public static void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        }
    }
}

Once again, I know this is a bad solution and I'm looking for a better one. I have thought of creating a thread-safe singleton class that would hold the current connections. I have also thought about holding the list of current connections in an instance of some Akka actor and modifying the list via Akka messages, but for that to work I'd have to store an ActorRef to that actor statically, so that it could be accessed from different ActorSystems.
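For comparison, the thread-safe singleton idea could be sketched roughly as follows. This is a minimal sketch using only the JDK; the class name ClientRegistry and its methods are hypothetical and not part of Play or Akka. It only makes the shared collection safe to use from several threads and does nothing to make the design more actor-like.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper (not a Play or Akka class): a registry of clients
// that may be called from several threads at once.
final class ClientRegistry<T> {
    // A concurrent set backed by ConcurrentHashMap; add/remove are thread-safe.
    private final Set<T> clients = ConcurrentHashMap.newKeySet();

    // Returns true if the client was not registered before.
    boolean add(T client) {
        return clients.add(client);
    }

    // Returns true if the client was registered and has been removed.
    boolean remove(T client) {
        return clients.remove(client);
    }

    int size() {
        return clients.size();
    }

    // Iteration is weakly consistent: it never throws
    // ConcurrentModificationException, but it may or may not
    // observe clients added or removed while it is running.
    void forEach(Consumer<? super T> action) {
        clients.forEach(action);
    }
}
```

With T set to ActorRef, preStart() and postStop() would call add and remove on one shared instance, and the router would use forEach to broadcast.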

What is the best way to solve this problem in a way that fits the Akka philosophy?

Answer 1:

Instead of having a static reference to an Actor (WebSocketRouter in your case), why not come up with some messages to send it? That way, the actor can maintain its own internal state in a consistent way. State change through messages is one of the main benefits of the Actor Model.
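To show why message-driven state stays consistent, here is a minimal plain-Java sketch with no Akka dependency. It is only an illustration of the principle, not how Akka is implemented: the names MailboxRegistry, join, leave and snapshot are hypothetical, and a single-threaded executor stands in for the actor's mailbox.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration: state that is changed only by messages
// processed one at a time, as an actor's mailbox would do.
final class MailboxRegistry implements AutoCloseable {
    // One worker thread handles the submitted messages strictly in order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Only ever touched from the mailbox thread, so a plain HashSet is enough.
    private final Set<String> members = new HashSet<>();

    // "Join" message: callers never touch the set directly.
    void join(String id) {
        mailbox.execute(() -> members.add(id));
    }

    // "Leave" message.
    void leave(String id) {
        mailbox.execute(() -> members.remove(id));
    }

    // Request an unmodifiable copy of the state; it is answered after all
    // messages that were submitted earlier have been processed.
    CompletableFuture<Set<String>> snapshot() {
        return CompletableFuture.supplyAsync(
                () -> Collections.unmodifiableSet(new HashSet<>(members)), mailbox);
    }

    // Stop accepting messages and let the worker thread finish.
    @Override
    public void close() {
        mailbox.shutdown();
    }
}
```

Callers can invoke join and leave from any thread. Because the set is only read and written on the single mailbox thread, no locks or concurrent collections are needed, which is the same property an actor's onReceive gives you.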

Before I get into the code: I'm sorry if this isn't 100% accurate. I've only used the Scala version of Akka and am basing this on a quick scan of the Akka documentation.

So in your case, I would define a few objects in order to express Join/Leave...

public class JoinMessage { } 
public class ExitMessage { } 

Note that ExitMessage is really only needed if you intend to keep your WebSocket open and have the user stop listening to the router. Otherwise, the router can detect when the Actor has been terminated.

Then you would change your MessageSender actor so that it sends these messages whenever it joins or leaves a chat room:

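import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class MessageSender extends UntypedActor {

    public static Props props(ActorRef out) {
        return Props.create(MessageSender.class, out);
    }

    private final ActorRef out;
    // actorSelection() returns an ActorSelection, not an ActorRef.
    private final ActorSelection router;

    public MessageSender(ActorRef out) {
        this.out = out;
        // Placeholder path: replace it with the real path of your router actor.
        this.router = getContext().actorSelection("/Path/To/WebSocketRouter");
    }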

    @Override
    public void preStart() {
        router.tell(new JoinMessage(), getSelf());
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            out.tell(message, getSelf());
        } else {
            unhandled(message);
        }
    }    
}

Your router can then manage its state internally instead of exposing static methods on the actor (which, as you know, is not good):

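import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.actor.UntypedActor;

import java.util.HashSet;
import java.util.Set;

public class WebSocketRouter extends UntypedActor {
    // Only touched from onReceive, so no synchronization is needed.
    private final Set<ActorRef> senders = new HashSet<>();

    private void addSender(ActorRef actorRef){
        senders.add(actorRef);
    }

    private void removeSender(ActorRef actorRef){
        senders.remove(actorRef);
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof JoinMessage) {
            addSender(getSender());
            getContext().watch(getSender()); // Watch sender so we can detect when they die.
        } else if (message instanceof ExitMessage) {
            // The sender is still alive but no longer wants to receive broadcasts.
            getContext().unwatch(getSender());
            removeSender(getSender());
        } else if (message instanceof Terminated) {
            // One of our watched senders has died; the message says which one.
            removeSender(((Terminated) message).getActor());
        } else if (message instanceof String) {
            for (ActorRef sender : senders) {
                sender.tell(message, getSelf());
            }
        } else {
            unhandled(message);
        }
    }
}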

Again, this code is to give you an idea of how to accomplish this task by taking advantage of the Actor Model. Sorry if the Java isn't 100% accurate, but hopefully you can follow my intent.