
Using cluster in a Node module

Posted 2020-07-11 03:48

Question:

UPDATE: Even if this particular scenario is not realistic, as per comments, I'm still interested in how one could write a module that makes use of clustering without rerunning the parent process each time.


I'm trying to write a Node.js module called mass-request that speeds up large numbers of HTTP requests by distributing them to child processes.

My hope is that, from the outside, it works like this:

var mr = require("mass-request"),
    scraper = mr();

for (var i = 0; i < my_urls_to_visit.length; i += 1) {
    scraper.add(my_urls_to_visit[i], function(resp) {
        // do something with the response
    });
}

To get started, I put together a skeleton for the mass-request module.

var cluster = require("cluster"),
    numCPUs = require("os").cpus().length;

module.exports = function() {
    console.log("hello from mass-request!");
    if (cluster.isMaster) {
        for (var i = 0; i < numCPUs; i += 1) {
            var worker = cluster.fork();             
        }

        return {
            add: function(url, cb) {}       
        }       
    } else {
        console.log("worker " + process.pid + " is born!");
    }  
}

Then I test it like so in a test script:

var mr = require("mass-request"),
    m = mr();
console.log("hello from test.js!", m);

I expected to see "hello from mass-request!" logged four times (and indeed it is). To my amazement, I also see "hello from test.js!" four times. Clearly I do not understand how cluster.fork() works. Is it rerunning the whole process, not just the function that called it the first time?

If so, how does one make use of clustering in a module without troubling the person who uses that module with messy multi-process logic?

Answer 1:

I believe what you are looking for is in setupMaster

From the docs:

cluster.setupMaster([settings])

  • settings Object
    • exec String file path to worker file. (Default=process.argv[1])
    • args Array string arguments passed to worker. (Default=process.argv.slice(2))
    • silent Boolean whether or not to send output to parent's stdio. (Default=false)

setupMaster is used to change the default 'fork' behavior. Once called, the settings will be present in cluster.settings

By making use of the exec property you can have your workers launched from a different module.

Important: as the docs state, this can only be called once. If you are depending on this behavior for your module, then the caller can't be using cluster or the whole thing falls apart.

For example:

index.js

var cluster = require("cluster"),
  path = require("path"),
  numCPUs = require("os").cpus().length;

console.log("hello from mass-request!");
if (cluster.isMaster) {
  cluster.setupMaster({
    exec: path.join(__dirname, 'worker.js')
  });

  for (var i = 0; i < numCPUs; i += 1) {
    var worker = cluster.fork();
  }

  return {
    add: function (url, cb) {
    }
  }
} else {
  console.log("worker " + process.pid + " is born!");
}

worker.js

console.log("worker " + process.pid + " is born!");

output

node index.js 
hello from mass-request!
worker 38821 is born!
worker 38820 is born!
worker 38822 is born!
worker 38819 is born!


Answer 2:

While it's true that the asynchronous nature of node.js makes it awesome, it still runs your JavaScript in a single thread with a single event loop. Clustering a node.js app forks child processes of the app (separate processes, not threads), allowing you to make better use of a multi-core server. I built a game server architecture a while back that used cluster and zmq (ZeroMQ) to spread work across processes and let them easily send messages back and forth over various channels. I've simplified that architecture into the example below to hopefully help illustrate how a multi-process node.js app can be put together. I apologize if it's a bit rough; it was years ago and I was relatively new to node at the time ;)

Ideally, you don't want to nest everything for the master/child in a single script, but I figured this was the easiest way to let you copy/paste/run :)

As you mentioned in your comment, I gave a good example of clustering, but not one that fit your specific use case as far as dispatching everything around. I didn't have a whole lot of time, so I adapted my example to make it work for your needs rather quickly. Give this a shot:

mass-request.js

var cluster = require('cluster');
var zmq = require('zmq');

module.exports = {
    childId : null, //set in the child from process.env
    _urls : [],
    _threadCount : 1,
    _readyThreads : 0,
    _callbacks : {},
    zmqReceive : null, //the socket we receive on for this thread
    zmqMaster : null, //the socket to the master
    zmqChildren : {}, //an object storing the sockets for the children
    setThreads : function( threadCount ) {
        this._threadCount = threadCount;
    },
    add : function( url , cb ) {
        this._urls.push( {url: url, cb : cb } );
    },
    run : function() {

        if( cluster.isMaster ) {

            this._masterThread();

        } else {

            this._childThread();

        }

    },
    _masterThread : function() {

        console.log( 'Master Process Starting Up' );

        this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://master.ipc' );

        //bind handler for messages coming into this process using closure to allow us to access the massrequest object inside the callback
        ( function( massRequest ) {
            this.zmqReceive.on( 'message' , function( msg ) {

                msg = JSON.parse(msg);

                //was this an online notification?
                if( msg && msg.status == 'Online' ) {
                    massRequest._threadReady();
                    return; //we're done
                }
                if( msg && msg.html ) {
                    //this was a response from a child, call the callback for it
                    massRequest._callbacks[ msg.sender ].call( massRequest , msg.html );
                    //send the child another URL
                    massRequest._sendUrlToChild( msg.sender );
                }

            } );
        }).call( this , this );

        //fork the child processes and set up the sending sockets for them
        for( var i=0; i < this._threadCount; ++i ) {
            //set up the sending socket
            this.zmqChildren[i] = zmq.socket('push').connect( 'ipc://child_' + i + '.ipc' );
            //fork the process and pass it an id
            cluster.fork( {
                _childId:i
            } );
        }

    },
    _sendUrlToChild : function( child ) {
        //if there's no urls left, return (this would also be a good place to send a message to the child to exit gracefully)
        if( !this._urls.length ) return;
        //grab a url to process
        var item = this._urls.pop();
        //set the callback for the child
        this._callbacks[child] = item.cb;
        this.zmqChildren[child].send( JSON.stringify( { url:item.url } ) );
    },
    _processUrls : function() {
        for( var i=0; i < this._threadCount; ++i ) {
            this._sendUrlToChild( i );
        }
    },
    _threadReady : function() {
        if( ++this._readyThreads >= this._threadCount ) {
            //all threads are ready, send out urls to start the mayhem
            console.log( 'All threads online, starting URL processing' );
            this._processUrls();
        }
    },
    _childProcessUrl : function( url ) {
        console.log( 'Child Process ' + this.childId + ' Handling URL: ' + url );
        //do something here to scrape your content however you see fit
        var html = 'HTML';
        this.zmqMaster.send( JSON.stringify( { sender:this.childId, html:html } ) );
    },
    _childThread : function() {

        //get the child id that was passed from cluster
        this.childId = process.env._childId;

        console.log( 'Child Process ' + this.childId + ' Starting Up' );

        //bind the pull socket to receive messages to this process
        this.zmqReceive = zmq.socket('pull').bindSync( 'ipc://child_' + this.childId + '.ipc' );

        //bind the push socket to send to the master
        this.zmqMaster = zmq.socket('push').connect('ipc://master.ipc');

        //bind handler for messages coming into this process
        ( function( massRequest ) {
            this.zmqReceive.on( 'message' , function( msg ) {

                msg = JSON.parse(msg);

                console.log( 'Child ' + massRequest.childId + ' received: ' + JSON.stringify( msg ) );

                //handle the url
                if( msg && msg.url ) massRequest._childProcessUrl( msg.url );

            } );
        }).call( this , this );

        //let the master know we're done setting up
        this.zmqMaster.send( JSON.stringify({sender:this.childId,status:'Online'}) );

    },
}

demo.js

var mr = require( './mass-request.js' );
mr.setThreads( 4 );
mr.add( 'http://foo.com' , function( resp ) {
    console.log( 'http://foo.com is done' );
} );
mr.add( 'http://bar.com' , function( resp ) {
    console.log( 'http://bar.com is done' );
} );
mr.add( 'http://alpha.com' , function( resp ) {
    console.log( 'http://alpha.com is done' );
} );
mr.add( 'http://beta.com' , function( resp ) {
    console.log( 'http://beta.com is done' );
} );
mr.add( 'http://theta.com' , function( resp ) {
    console.log( 'http://theta.com is done' );
} );
mr.add( 'http://apples.com' , function( resp ) {
    console.log( 'http://apples.com is done' );
} );
mr.add( 'http://oranges.com' , function( resp ) {
    console.log( 'http://oranges.com is done' );
} );
mr.run();

Put those in the same folder and run node demo.js.

I should also point out that since the base of this was pulled from one of my other projects that used [0MQ](http://zeromq.org/), you will need that installed alongside the [node.js module for it](https://github.com/JustinTulloss/zeromq.node) (npm install zmq). The cluster module ships with node.js, so nothing extra is needed there. You can of course swap out the ZMQ parts for any other method of interprocess communication you prefer; this just happens to be one I was familiar with and had used.

Brief overview: the master process (the script that calls the run() method) will spin up X children (set by calling setThreads). Those children report back to the master via ZeroMQ sockets when they're finished initializing. Once all children are ready, the master dispatches the urls to them so they can run off and fetch the HTML. Each child returns the HTML to the master, which passes it into the appropriate callback for that URL and then dispatches another URL to the child. It's not a perfect solution: the callback functions will still bottleneck in the master process, because you can't easily move them off to another process. Those callbacks may contain closures/variables/etc. that won't work properly outside the parent process without some kind of object-sharing mechanism.

Anywho, if you spin up my little demo here you'll see 4 children "processing" the urls (they don't actually load the urls, for simplicity's sake).

Hopefully that helps ;)