Phoenix Channels - Multiple channels per socket

Posted 2019-01-28 08:00

Question:

I'm writing an application that uses Phoenix Channels to handle realtime events. As I understand it, each client opens one socket and multiplexes multiple channels over it. My app is a chat application where users belong to multiple group chats. I have one Phoenix channel module, MessageChannel, whose join function handles dynamic topics.

def join("groups:" <> group_id, payload, socket) do
....

Let's say John joins groups/topics A and B while Bob joins only group/topic B. When John sends a message to group/topic A, broadcast!/3 will also send that message to Bob, correct? Because handle_in doesn't have a context of which topic/group the message was sent to.

How would I handle it so that Bob doesn't receive events that were sent to group A? Am I designing this right?

Answer 1:

Because handle_in doesn't have a context of which topic/group the message was sent to.

When Phoenix.Channel.broadcast/3 is called, it does have the topic associated with the message, although this is not obvious from the signature. You can see this in the source in channel.ex:

def broadcast(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast pubsub_server, topic, event, message
end

So when the call to broadcast/3 is made using the socket, it pattern matches out the current topic, and then makes a call to the underlying Server.broadcast/4.

(If you're curious like I was, this in turn makes a call to the underlying PubSub.broadcast/3 which does some distribution magic to route the call to your configured pubsub implementation server, most likely using pg2 but I digress...)
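
To make the topic scoping concrete, here is a toy in-memory model in plain JavaScript. It is not Phoenix code: TinyPubSub and its subscribe and broadcast methods are hypothetical names that only mimic the idea that a broadcast is keyed by topic, and the "new_msg" event name is made up for illustration.

```javascript
// Toy model only: TinyPubSub is a hypothetical stand-in for the pubsub server,
// not a Phoenix API.
class TinyPubSub {
  constructor() {
    this.subscribers = new Map(); // topic -> Set of callbacks
  }

  subscribe(topic, callback) {
    if (!this.subscribers.has(topic)) this.subscribers.set(topic, new Set());
    this.subscribers.get(topic).add(callback);
  }

  // Deliver only to callbacks registered under exactly this topic.
  broadcast(topic, event, payload) {
    for (const callback of this.subscribers.get(topic) ?? []) {
      callback({ topic, event, payload });
    }
  }
}

const pubsub = new TinyPubSub();
const johnInbox = [];
const bobInbox = [];

// John joins groups A and B; Bob joins only group B.
pubsub.subscribe("groups:A", (msg) => johnInbox.push(msg));
pubsub.subscribe("groups:B", (msg) => johnInbox.push(msg));
pubsub.subscribe("groups:B", (msg) => bobInbox.push(msg));

// John posts to group A: only subscribers of "groups:A" are notified.
pubsub.broadcast("groups:A", "new_msg", { body: "hello A" });

console.log(johnInbox.length, bobInbox.length); // prints: 1 0
```

In this model Bob never sees the group A message because he never subscribed to "groups:A", which is the same outcome the broadcast/3 source above implies.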

I did not find this behavior obvious from reading the Phoenix.Channel docs, but the Phoenix Framework Channels guide states it explicitly under Incoming Events:

broadcast!/3 will notify all joined clients on this socket's topic and invoke their handle_out/3 callbacks.

So the message is only broadcast "on this socket's topic". The same page defines topic as:

topic - The string topic or topic:subtopic pair namespace, for example “messages”, “messages:123”

So in your example, the "topics" are actually the topic:subtopic namespace strings "groups:A" and "groups:B". John would have to subscribe to both of these topics separately on the client, so you would hold references to two different channels even though they share the same socket. Assuming you're using the JavaScript client, the channel creation looks something like this:

let channelA = this.socket.channel("groups:A", {});
let channelB = this.socket.channel("groups:B", {});

Then, when a client sends a message, it pushes on one specific channel, and that channel's topic is the one that gets pattern matched out on the server, as we saw above.

channelA.push(msgName, msgBody);
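
As a rough illustration of why the server can tell the two channels apart, here is a toy model of the multiplexing in plain JavaScript. FakeSocket is a hypothetical class that does no networking and is not the Phoenix client. It only shows the idea that every frame pushed on a channel is tagged with that channel's topic before it goes over the shared connection.

```javascript
// Toy model only: FakeSocket is hypothetical and does no networking.
class FakeSocket {
  constructor() {
    this.sent = []; // frames that would travel over the single connection
  }

  channel(topic) {
    const socket = this;
    return {
      topic,
      // Every push is tagged with the topic of the channel it was made on.
      push(event, payload) {
        socket.sent.push({ topic, event, payload });
      },
    };
  }
}

const fakeSocket = new FakeSocket();
const chanA = fakeSocket.channel("groups:A");
const chanB = fakeSocket.channel("groups:B");

chanA.push("new_msg", { body: "only for A" });
chanB.push("new_msg", { body: "only for B" });

// One connection, two frames, each carrying its own topic.
console.log(fakeSocket.sent.map((frame) => frame.topic));
```

Both frames share one socket, yet each names its topic, so a handler on the server side only ever sees messages for the topic its channel was joined on.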


Answer 2:

Actually, socket routing is based on how you define your topics in your project's Socket module with the channel API. For my Slack clone, I use three channels: a system-level channel to handle presence updates, a user channel, and a room channel.

Any given user is joined to 0 or 1 room channels at a time. However, users may be subscribed to a number of rooms.

For messages going out to a specific room, I broadcast them over the room channel.

When I detect unread messages, notifications, or badges for a particular room, I use the user channel. Each user channel stores the list of rooms the user has subscribed to (they are listed in the client's sidebar).

The trick to all this is using a few channel APIs, mainly intercept, handle_out, My.Endpoint.subscribe, and handle_info(%Broadcast{}, socket).

  • I use intercept to catch broadcast messages that I want to either ignore or manipulate before sending them out.
  • In the user channel, I subscribe to events broadcast from the room channel.
  • When you subscribe, you get a handle_info call with the %Broadcast{} struct, which includes the topic, event, and payload of the broadcast message.
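
The intercept idea from the list above can be sketched as a toy model in plain JavaScript. This is not Phoenix code: makeUserChannel, receive, and pushedToClient are hypothetical names, and the "ucxchat:lobby" topic and "message:new" event are made up for illustration. The sketch assumes that non-intercepted events are forwarded to the client unchanged, while intercepted ones pass through a handler that may rewrite or drop them.

```javascript
// Toy model only: these names are hypothetical and mirror the idea, not Phoenix's API.
function makeUserChannel(interceptedEvents, handleOut) {
  const pushedToClient = [];
  return {
    pushedToClient,
    // Called for every broadcast on a topic this channel is subscribed to.
    receive(broadcast) {
      if (!interceptedEvents.includes(broadcast.event)) {
        pushedToClient.push(broadcast); // not intercepted: forwarded unchanged
        return;
      }
      const result = handleOut(broadcast); // intercepted: may rewrite or drop
      if (result !== null) pushedToClient.push(result);
    },
  };
}

const userChannel = makeUserChannel(["room:mention"], (broadcast) =>
  broadcast.payload.user === "john"
    ? { ...broadcast, payload: { ...broadcast.payload, badge: true } }
    : null // drop mentions meant for someone else
);

userChannel.receive({ topic: "ucxchat:lobby", event: "room:mention", payload: { user: "john" } });
userChannel.receive({ topic: "ucxchat:lobby", event: "room:mention", payload: { user: "bob" } });
userChannel.receive({ topic: "ucxchat:lobby", event: "message:new", payload: { body: "hi" } });

console.log(userChannel.pushedToClient.length); // prints: 2
```

The mention for John is rewritten with a badge flag, the mention for Bob is dropped, and the ordinary message passes straight through.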

Here are a couple of pieces of my code:

defmodule UcxChat.UserSocket do
  use Phoenix.Socket
  alias UcxChat.{User, Repo, MessageService, SideNavService}
  require UcxChat.ChatConstants, as: CC

  ## Channels
  channel CC.chan_room <> "*", UcxChat.RoomChannel    # "ucxchat:"
  channel CC.chan_user <> "*", UcxChat.UserChannel  # "user:"
  channel CC.chan_system <> "*", UcxChat.SystemChannel  # "system:"
  # ...
end

# user_channel.ex
  # ...
  intercept ["room:join", "room:leave", "room:mention", "user:state", "direct:new"]
  # ...
  def handle_out("room:join", msg, socket) do
    %{room: room} = msg
    UserSocket.push_message_box(socket, socket.assigns.channel_id, socket.assigns.user_id)
    update_rooms_list(socket)
    clear_unreads(room, socket)
    {:noreply, subscribe([room], socket)}
  end
  def handle_out("room:leave" = ev, msg, socket) do
    %{room: room} = msg
    debug ev, msg, "assigns: #{inspect socket.assigns}"
    socket.endpoint.unsubscribe(CC.chan_room <> room)
    update_rooms_list(socket)
    {:noreply, assign(socket, :subscribed, List.delete(socket.assigns[:subscribed], room))}
  end

  # ...
  defp subscribe(channels, socket) do
    # debug inspect(channels), ""
    Enum.reduce channels, socket, fn channel, acc ->
      subscribed = acc.assigns[:subscribed]
      if channel in subscribed do
        acc
      else
        socket.endpoint.subscribe(CC.chan_room <> channel)
        assign(acc, :subscribed, [channel | subscribed])
      end
    end
  end
  # ...
end

I also use the user_channel for all events related to a specific user like client state, error messages, etc.



Answer 3:

Disclaimer: I have not looked at the internal workings of a channel, this information is completely from my first experience of using channels in an application.

When someone joins a different group (based on the pattern matching in your join/3), a separate channel is joined for that topic, even though it shares the same socket. Thus, broadcasting to A will not send messages to members of B, only A.

It seems to me the Channel module is similar to a GenServer and the join is somewhat like start_link, where a new server (process) is spun up (however, only if it does not already exist).

You can really ignore the inner workings of the module and just understand that if you join a channel with a different name than already existing ones, you are joining a unique channel. You can also just trust that if you broadcast to a channel, only members of that channel will get the message.

For instance, in my application, I have a user channel that I want only a single user to be connected to. The join looks like def join("agent:" <> _agent, payload, socket) where agent is just an email address. When I broadcast a message to this channel, only the single agent receives the message. I also have an office channel that all agents join and I broadcast to it when I want all agents to receive the message.

Hope this helps.