Node.js, Socket.io, Redis pub/sub high volume, low latency difficulties

2019-01-29 14:57发布

When conjoining socket.io/node.js and redis pub/sub in an attempt to create a real-time web broadcast system driven by server events that can handle multiple transports, there seem to be three approaches:

  1. 'createClient' a redis connection and subscribe to channel(s). On socket.io client connection, join the client into a socket.io room. In the redis.on("message", ...) event, call io.sockets.in(room).emit("event", data) to distribute to all clients in the relevant room. Like How to reuse redis connection in socket.io?

  2. 'createClient' a redis connection. On socket.io client connection, join the client into a socket.io room and subscribe to relevant redis channel(s). Include redis.on("message", ...) inside the client connection closure and on receipt of message call client.emit("event", data) to raise the event on the specific client. Like the answer in Examples in using RedisStore in socket.io

  3. Use the RedisStore baked into socket.io and 'broadcast' from the single "dispatch" channel in Redis following the socketio-spec protocol.

Number 1 allows handling the Redis sub and associated event once for all clients. Number 2 offers a more direct hook into Redis pub/sub. Number 3 is simpler, but offers little control over the messaging events.

However, in my tests, all exhibit unexpectedly low performance with more than 1 connected client. The server events in question are 1,000 messages published to a redis channel as quickly as possible, to be distributed as quickly as possible. Performance is measured by timings at the connected clients (socket.io-client based that log timestamps into a Redis list for analysis).

What I surmise is that in option 1, the server receives the message, then sequentially writes it to all connected clients. In option 2, the server receives each message multiple times (once per client subscription) and writes it to the relevant client. In either case, the server doesn't get to the second message event until the first has been communicated to all connected clients, a situation clearly exacerbated by rising concurrency.

This seems at odds with the received wisdom about the stack's capabilities. I want to believe, but I'm struggling.

Is this scenario (low latency distribution of high volume of messages) just not an option with these tools (yet?), or am I missing a trick?

1条回答
Fickle 薄情
2楼-- · 2019-01-29 15:22

I thought this was a reasonable question and had researched it briefly a while back. I spent a little time searching for examples that you may be able to pick up some helpful tips from.

Examples

I like to begin with straightforward examples:

The light sample is a single page (note you'll want to replace redis-node-client with something like node_redis by Matt Ranney):

/*
 * Mclarens Bar: Redis based Instant Messaging
 * Nikhil Marathe - 22/04/2010

 * A simple example of an IM client implemented using
 * Redis PUB/SUB commands so that all the communication
 * is offloaded to Redis, and the node.js code only
 * handles command interpretation,presentation and subscribing.
 * 
 * Requires redis-node-client and a recent version of Redis
 *    http://code.google.com/p/redis
 *    http://github.com/fictorial/redis-node-client
 *
 * Start the server then telnet to port 8000
 * Register with NICK <nick>, use WHO to see others
 * Use TALKTO <nick> to initiate a chat. Send a message
 * using MSG <nick> <msg>. Note it's important to do a
 * TALKTO so that both sides are listening. Use STOP <nick>
 * to stop talking to someone, and QUIT to exit.
 *
 * This code is in the public domain.
 */
var redis = require('./redis-node-client/lib/redis-client');

var sys = require('sys');
var net = require('net');

var server = net.createServer(function(stream) {
    var sub; // redis connection
    var pub;
    var registered = false;
    var nick = "";

    /**
     * Builds the name of the Redis channel shared by two nicknames.
     * The two names are sorted before joining, so both participants derive
     * the same channel name regardless of argument order.
     */
    function channel(a,b) {
        var pair = [a, b];
        pair.sort();
        return pair.join(':');
    }

    /**
     * Subscribes to the channel shared with `other` and relays every message
     * arriving on it to this client's stream.
     * Messages are expected in the form "<sender>:<text>"; anything sent by
     * this client itself is not echoed back.
     */
    function shareTable(other) {
        var table = channel(nick, other);
        sys.debug(nick + ": Subscribing to " + table);
        sub.subscribeTo(table, function(source, message) {
            var text = message.toString();
            var colon = text.indexOf(':');
            var sender = text.slice(0, colon);
            if (sender === nick) {
                return;
            }
            stream.write("[" + sender + "] " + text.slice(colon + 1) + "\n");
        });
    }

    /**
     * Unsubscribes from the channel shared with `other` and then tells this
     * client that the conversation has stopped.
     * The error argument passed to the unsubscribe callback is ignored.
     */
    function leaveTable(other) {
        var table = channel(nick, other);
        var notice = "Stopped talking to " + other + "\n";
        sub.unsubscribeFrom(table, function(err) {
            stream.write(notice);
        });
    }

    // When the stream reports "connect", open this client's two Redis
    // connections: `sub` is used for subscriptions, `pub` for every other
    // command (publish, sadd, smembers, srem).
    // NOTE(review): presumably two are needed because a Redis connection in
    // subscriber mode cannot issue regular commands; confirm for the client
    // library in use.
    stream.addListener("connect", function openRedisConnections() {
        sub = redis.createClient();
        pub = redis.createClient();
    });

    /**
     * Handles every chunk of data received from the client.
     *
     * Until the client has registered, only "NICK <nick>" is accepted: the
     * nick is added to the 'mclarens:inside' set and the client subscribes to
     * its "<nick>:info" channel, on which peers announce "start <who>" and
     * "stop <who>". Once registered, the chunk is interpreted as one of the
     * commands TALKTO, MSG, WHO, STOP or QUIT; anything else is ignored.
     *
     * NOTE(review): each "data" chunk is treated as exactly one command line.
     * TCP does not guarantee that framing, so pipelined or fragmented input
     * is not handled here.
     */
    stream.addListener("data", function(data) {
        // Strip the trailing line terminator. Telnet sends "\r\n", but other
        // clients may send a bare "\n", which would otherwise stay glued to
        // the last word and make commands such as QUIT unrecognisable.
        var line = data.toString().replace(/[\r\n]+$/, '');

        if( !registered ) {
            // \w+ rather than \w* so that an empty nickname is rejected.
            var msg = line.match(/^NICK (\w+)/);
            if( !msg ) {
                stream.write("Please register with NICK <nickname>\n");
                return;
            }

            var wanted = msg[1];
            stream.write("SERVER: Hi " + wanted + "\n");
            pub.sadd('mclarens:inside', wanted, function(err) {
                if( err ) {
                    // Registration failed: drop the connection and do NOT
                    // mark the client as registered or subscribe it.
                    stream.end();
                    return;
                }
                registered = true;
                nick = wanted;
                // server messages: peers announce "start <who>" / "stop <who>"
                sub.subscribeTo(nick + ":info", function(infoChannel, message) {
                    var m = message.toString().split(' ');
                    var cmd = m[0];
                    var who = m[1];
                    if( cmd === "start" ) {
                        stream.write(who + " is now talking to you\n");
                        shareTable(who);
                    }
                    else if( cmd === "stop" ) {
                        stream.write(who + " stopped talking to you\n");
                        leaveTable(who);
                    }
                });
            });
            return;
        }

        var fragments = line.split(' ');
        var command = fragments[0];
        var target = fragments[1];

        // TALKTO, MSG and STOP address another user; without a target they
        // would operate on a channel literally named "undefined:...".
        var needsTarget = command === 'TALKTO' || command === 'MSG' || command === 'STOP';
        if( needsTarget && !target ) {
            stream.write("Please specify a nick: " + command + " <nick>\n");
            return;
        }

        switch(command) {
        case 'TALKTO':
            // Tell the peer to start listening, then listen ourselves.
            pub.publish(target + ":info", "start " + nick, function(a,b) {
            });
            shareTable(target);
            break;
        case 'MSG':
            // Payload format is "<sender>:<text>".
            pub.publish(channel(nick, target),
                        nick + ':' + fragments.slice(2).join(' '),
                        function(err, reply) {
                            if( err ) {
                                stream.write("ERROR!\n");
                            }
                        });
            break;
        case 'WHO':
            pub.smembers('mclarens:inside', function(err, users) {
                if( err ) {
                    stream.write("ERROR!\n");
                    return;
                }
                // Guard against a null/undefined reply for an empty set
                // (reply shape depends on the Redis client library).
                stream.write("Online:\n" + (users || []).join('\n') + "\n");
            });
            break;
        case 'STOP':
            leaveTable(target);
            pub.publish(target + ":info", "stop " + nick, function() {});
            break;
        case 'QUIT':
            stream.end();
            break;
        }
    });

    /**
     * Cleans up when the client ends the connection: publishes
     * "<nick> is offline" and removes the nick from the 'mclarens:inside' set.
     *
     * NOTE(review): the notice is published on the channel named `nick`,
     * while the subscriptions visible in this file are on "<nick>:info" and
     * on the pairwise channel; confirm something actually listens for it.
     * NOTE(review): the `sub` and `pub` Redis connections are not closed
     * here; confirm how the client library expects them to be released.
     */
    stream.addListener("end", function() {
        // A client that never completed NICK registration has no nick and
        // was never added to the set (and `pub` may not even exist yet), so
        // there is nothing to announce or undo.
        if( !registered ) {
            return;
        }
        pub.publish(nick, nick + " is offline");
        pub.srem('mclarens:inside', nick, function(err) {
            if( err ) {
                sys.debug("Could not remove client");
            }
        });
    });
});

// Start accepting connections on port 8000, bound to "localhost".
server.listen(8000, "localhost");

Documents

There's a ton of documentation out there, and the apis are rapidly changing on this type of stack so you'll have to weigh the time relevance of each doc.

Related Questions

Just a few related questions, this is a hot topic on stack:

Notable tips (ymmv)

Turn off or optimize socket pooling, use efficient bindings, monitor latency, and make sure you're not duplicating work (ie no need to publish to all listeners twice).

查看更多
登录 后发表回答