Node.js的,Socket.io,Redis的发布/订阅高容量,低延迟的困难(Node.js,

2019-06-24 08:42发布

当试图建立由服务器事件驱动的实时网络广播系统,能够处理多种传输conjoining socket.io/node.js和Redis的发布/订阅,似乎有三种方法:

  1. “createClient”一个redis的连接和订阅通道(一个或多个)。 在socket.io客户端连接,客户端加入到socket.io房间。 在redis.on(“信息”,......)事件,调用io.sockets.in(室).emit(“事件”,数据)分发到所有客户的相关空间。 像如何重用的Redis在socket.io连接?

  2. 'createClient' 一个Redis的连接。 在socket.io客户端连接,客户端加入到socket.io房间,订阅相关Redis的通道(S)。 包括客户端关闭连接的内部和收到的消息调用client.emit(“事件”,数据)的redis.on(“消息”,...),以提高特定的客户端上的事件。 就像在回答在socket.io使用RedisStore例子

  3. 使用RedisStore从单一的“调度”中的Redis继socketio规格协议信道烘焙成socket.io和“广播”。

1号允许所有客户端处理的Redis子和相关的事件一次。 2号提供了一个更直接的勾成的Redis的pub / sub。 3号是简单的,但提供了消息的事件几乎没有控制。

然而,在我的测试中,都表现出超过1级连接的客户端出乎意料的低性能。 有问题的服务器事件,以最快的速度发布到Redis的通道尽可能1000点的消息,将尽快分发成为可能。 性能是通过在连接的客户端的定时(socket.io客户端基于该日志的时间戳到Redis的列表以供分析)进行测定。

我推测的是,在方案1,服务器接收邮件,然后依次写入到所有连接的客户端。 在选项2中,服务器接收每个消息多次(每个客户端订阅一次),并将其写入到相关的客户端。 在这两种情况下,直到它传达给所有连接的客户端服务器不会得到第二个消息事件。 一种情形显然与并发性上升加剧。

这似乎在用的堆叠能力感知智慧的赔率。 我愿意相信,但是我挣扎。

这是情况,使用这些工具(大容量邮件的低延迟分布)只是不是一种选择(没有?),还是我缺少一个把戏?

Answer 1:

我认为这是一个合理的问题,并简要地研究它,而回。 我花了一点时间来寻找例子,你可能能够从拿起一些有用的技巧。

例子

我想首先直截了当的例子:

  • 光IM的示例代码
  • Node.js的+的Redis的Pub / Sub + socket.io演示

光样品是在一个页面(注意,你要的东西,如更换Redis的节点客户端node_redis由马特·兰尼:

/*
 * Mclarens Bar: Redis based Instant Messaging
 * Nikhil Marathe - 22/04/2010

 * A simple example of an IM client implemented using
 * Redis PUB/SUB commands so that all the communication
 * is offloaded to Redis, and the node.js code only
 * handles command interpretation,presentation and subscribing.
 * 
 * Requires redis-node-client and a recent version of Redis
 *    http://code.google.com/p/redis
 *    http://github.com/fictorial/redis-node-client
 *
 * Start the server then telnet to port 8000
 * Register with NICK <nick>, use WHO to see others
 * Use TALKTO <nick> to initiate a chat. Send a message
 * using MSG <nick> <msg>. Note its important to do a
 * TALKTO so that both sides are listening. Use STOP <nick>
 * to stop talking to someone, and QUIT to exit.
 *
 * This code is in the public domain.
 */
var redis = require('./redis-node-client/lib/redis-client');

var sys = require('sys');
var net = require('net');

var server = net.createServer(function(stream) {
    var sub; // redis connection
    var pub;
    var registered = false;
    var nick = "";

    function channel(a,b) {
    return [a,b].sort().join(':');
    }

    function shareTable(other) {
    sys.debug(nick + ": Subscribing to "+channel(nick,other));
    sub.subscribeTo(channel(nick,other), function(channel, message) {
        var str = message.toString();
        var sender = str.slice(0, str.indexOf(':'));
        if( sender != nick )
        stream.write("[" + sender + "] " + str.substr(str.indexOf(':')+1) + "\n");
    });
    }

    function leaveTable(other) {
    sub.unsubscribeFrom(channel(nick,other), function(err) {
        stream.write("Stopped talking to " + other+ "\n");
    });
    }

    stream.addListener("connect", function() {
    sub = redis.createClient();
    pub = redis.createClient();
    });

    stream.addListener("data", function(data) {
    if( !registered ) {
        var msg = data.toString().match(/^NICK (\w*)/);
        if(msg) {
        stream.write("SERVER: Hi " + msg[1] + "\n");
        pub.sadd('mclarens:inside', msg[1], function(err) {
            if(err) {
            stream.end();
            }
            registered = true;
            nick = msg[1];
// server messages
            sub.subscribeTo( nick + ":info", function(nick, message) {
            var m = message.toString().split(' ');
            var cmd = m[0];
            var who = m[1];
            if( cmd == "start" ) {
                stream.write( who + " is now talking to you\n");
                shareTable(who);
            }
            else if( cmd == "stop" ) {
                stream.write( who + " stopped talking to you\n");
                leaveTable(who);
            }
            });
        });
        }
        else {
        stream.write("Please register with NICK <nickname>\n");
        }
        return;
    }

    var fragments = data.toString().replace('\r\n', '').split(' ');
    switch(fragments[0]) {
    case 'TALKTO':
        pub.publish(fragments[1]+":info", "start " + nick, function(a,b) {
        });
        shareTable(fragments[1]);
        break;
    case 'MSG':
        pub.publish(channel(nick, fragments[1]),
            nick + ':' +fragments.slice(2).join(' '),
              function(err, reply) {
              if(err) {
                  stream.write("ERROR!");
              }
              });
        break;
    case 'WHO':
        pub.smembers('mclarens:inside', function(err, users) {
        stream.write("Online:\n" + users.join('\n') + "\n");
        });
        break;
    case 'STOP':
        leaveTable(fragments[1]);
        pub.publish(fragments[1]+":info", "stop " + nick, function() {});
        break;
    case 'QUIT':
        stream.end();
        break;
    }
    });

    stream.addListener("end", function() {
    pub.publish(nick, nick + " is offline");
    pub.srem('mclarens:inside', nick, function(err) {
        if(err) {
        sys.debug("Could not remove client");
        }
    });
    });
});

server.listen(8000, "localhost");

文件

有一吨的文档在那里,这些API是在这种类型的堆栈的快速变化,所以你必须权衡每个文档的时间相关性。

  • 节点活动流
  • Cloud Foundry的例子
  • 如何节点的Redis发布订阅
  • Redis的延迟
  • Redis的食谱使用的Pub / Sub异步通信
  • LinkedIn的通用技巧
  • 节点Redis的绑定
  • 谷歌群体问题的NodeJS

相关问题

就在几个相关的问题,这是在堆栈热门话题:

  • 在node.js的聊天服务器的Redis的pub / sub
  • 如何设计Redis的发布/订阅的即时消息系统?

值得注意的提示(因人而异)

关闭或优化套接字池,用高效的绑定,监控延迟,并确保你没有重复工作(即没有必要向所有发布听众两次)。



文章来源: Node.js, Socket.io, Redis pub/sub high volume, low latency difficulties