How to disconnect a socket after streaming data?

2019-08-31 17:04发布

I am making use of "socket.io-client" and "socket.io stream" to make a request and then stream some data. I have the following code that handles this logic

Client Server Logic

router.get('/writeData', function(req, res) {
    var io = req.app.get('socketio');
    var nameNodeSocket = io.connect(NAMENODE_ADDRESS, { reconnect: true });

    var nameNodeData = {};

    async.waterfall([
        checkForDataNodes,
        readFileFromS3
    ], function(err, result) {
        if (err !== null) {
            res.json(err);
        }else{
            res.json("Finished Writing to DN's");
        }
    });

    function checkForDataNodes(cb) {
        nameNodeSocket.on('nameNodeData', function(data) {
            nameNodeData = data;
            console.log(nameNodeData);
            cb(null, nameNodeData);
        });
        if (nameNodeData.numDataNodes === 0) {
            cb("No datanodes found");
        }
    }

    function readFileFromS3(nameNodeData, cb) {
        for (var i in nameNodeData['blockToDataNodes']) {
            var IP = nameNodeData['blockToDataNodes'][i]['ipValue'];
            var dataNodeSocket = io.connect('http://'+ IP +":5000");
            var ss = require("socket.io-stream");
            var stream = ss.createStream();

            var byteStartRange = nameNodeData['blockToDataNodes'][i]['byteStart'];
            var byteStopRange = nameNodeData['blockToDataNodes'][i]['byteStop'];
            paramsWithRange['Range'] = "bytes=" + byteStartRange.toString() + "-" + byteStopRange.toString();

            //var file = require('fs').createWriteStream('testFile' + i + '.txt');
            var getFileName = nameNodeData['blockToDataNodes'][i]['key'].split('/');
            var fileData = {
                'mainFile': paramsWithRange['Key'].split('/')[1],
                'blockName': getFileName[1]
            };
            ss(dataNodeSocket).emit('sendData', stream, fileData);
            s3.getObject(paramsWithRange).createReadStream().pipe(stream);
            //dataNodeSocket.disconnect();
        }
        cb(null);
    }
}); 

Server Logic (that gets the data)

var dataNodeIO = require('socket.io')(server);
var ss = require("socket.io-stream");
dataNodeIO.on('connection', function(socket) {
    console.log("Succesfully connected!");
    ss(socket).on('sendData', function(stream, data) {
        var IP = data['ipValue'];
        var blockName = data['blockName'];
        var mainFile = data['mainFile'];
        dataNode.makeDir(mainFile);
        dataNode.addToReport(mainFile, blockName);
        stream.pipe(fs.createWriteStream(mainFile + '/' + blockName));
    });
});

How can I properly disconnect the connections in function readFileFromS3. I have noticed using dataNodeSocket.disconnect() at the end does not work as I cannot verify the data was received on the 2nd server. But if I comment it out, I can see the data being streamed to the second server.

My objective is to close the connections in Client Server side

1条回答
聊天终结者
2楼-- · 2019-08-31 17:17

It appears that the main problem with closing the socket is that you weren't waiting for the stream to be done writing before trying to close the socket. So, because the writing is all asynchronous and finishes sometime later, you were trying to close the socket before the data had been written.

Also because you were putting asynchronous operations inside a for loop, you were also running all your operations in parallel which may not be exactly what you want as it makes error handling more difficult and server load more difficult.

Here's the code I would suggest that does the following:

  1. Create a function streamFileFromS3() that streams a single file and returns a promise that will notify when it's done.
  2. Use await in a for loop with that streamFileFromS3() to serialize the operations. You don't have to serialize them, but then you would have to change your error handling to figure out what to do if one errors while the others are already running and you'd have to be more careful about concurrency issues.
  3. Use try/catch to catch any errors from streamFileFromS3().
  4. Add error handling on the stream.
  5. Change all occurrences of data['propertyName'] to data.propertyName. The only time you need to use brackets is if the property name contains a character that is not allowed in a Javascript identifier or if the property name is in a variable. Otherwise, the dot notation is preferred.
  6. Add socket.io connection error handling logic for both socket.io connections.
  7. Set returned status to 500 when there's an error processing the request

So, here's the code for that:

const ss = require("socket.io-stream");

router.get('/writeData', function(req, res) {
    const io = req.app.get('socketio');

    function streamFileFromS3(ip, data) {
        return new Promise((resolve, reject) => {
            const dataNodeSocket = io.connect(`http://${ip}:5000`);
            dataNodeSocket.on('connect_error', reject);
            dataNodeSocket.on('connect_timeout', () {
                reject(new Error(`timeout connecting to http://${ip}:5000`));
            });
            dataNodeSocket.on('connection', () => {
                // dataNodeSocket connected now
                const stream = ss.createStream().on('error', reject);
                paramsWithRange.Range = `bytes=${data.byteStart}-${data.byteStop}`;

                const filename = data.key.split('/')[1];
                const fileData = {
                    'mainFile': paramsWithRange.Key.split('/')[1],
                    'blockName': filename
                };
                ss(dataNodeSocket).emit('sendData', stream, fileData);
                // get S3 data and pipe it to the socket.io stream
                s3.getObject(paramsWithRange).createReadStream().on('error', reject).pipe(stream);
                stream.on('close', () => {
                    dataNodeSocket.disconnect();
                    resolve();
                });
            });
        });
    }

    function connectError(msg) {
        res.status(500).send(`Error connecting to ${NAMENODE_ADDRESS}`);
    }

    const nameNodeSocket = io.connect(NAMENODE_ADDRESS, { reconnect: true });
    nameNodeSocket.on('connect_error', connectError).on('connect_timeout', connectError);
    nameNodeSocket.on('nameNodeData', async (nameNodeData) => {
        try {
            for (let item of nameNodeData.blockToDataNodes) {
                await streamFileFromS3(item.ipValue, item);
            }
            res.json("Finished Writing to DN's");
        } catch(e) {
            res.status(500).json(e);
        }
    });
}); 

Other notes:

I don't know what paramsWithRange is as it is not declared here and when you were doing everything in parallel, it was getting shared among all the connections which is asking for a concurrency issue. In my serialized implementation, it's probably safe to share it, but the way it is now bothers me as it's a concurrency issue waiting to happen.

查看更多
登录 后发表回答