How to stream MongoDB Query Results with nodejs?

2019-01-21 17:21发布

问题:

I have been searching for an example of how I can stream the result of a MongoDB query to a nodejs client. All solutions I have found so far seem to read the query result at once and then send the result back to the server.

Instead, I would (obviously) like to supply a callback to the query method and have MongoDB call that when the next chunk of the result set is available.

I have been looking at mongoose - should I probably use a different driver?

Jan

回答1:

Streaming in Mongoose became available in version 2.4.0 which appeared three months after you've posted this question:

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

More elaborated examples can be found on their documentation page.



回答2:

node-mongodb-driver (the underlying layer that every mongoDB client uses in nodejs) except the cursor API that others mentioned has a nice stream API (#458). Unfortunately i did not find it documented elsewhere.

Update: there are docs also here.

It can be used like this:

var stream = collection.find().stream()
stream.on('error', function (err) {
  console.error(err)
})
stream.on('data', function (doc) {
  console.log(doc)
})

It actually implements the ReadableStream interface, so it has all the goodies (pause/resume etc)



回答3:

mongoose is not really "driver", it's actually an ORM wrapper around the MongoDB driver (node-mongodb-native).

To do what you're doing, take a look at the driver's .find and .each method. Here's some code from the examples:

// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
  sys.puts("Printing docs from Cursor Each")
  cursor.each(function(err, doc) {
    if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
  })                    
});

To stream the results, you're basically replacing that sys.puts with your "stream" function. Not sure how you plan to stream the results. I think you can do response.write() + response.flush(), but you may also want to checkout socket.io.



回答4:

Here is the solution I found (please correct me anyone if thatis the wrong way to do it): (Also excuse the bad coding - too late for me now to prettify this)

var sys = require('sys')
var http = require("http");

var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
  Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
  Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
  Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;

var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

function ProductReader(collection) {
        this.collection = collection;
}

ProductReader.prototype = new process.EventEmitter();

ProductReader.prototype.do = function() {
        var self = this;

        this.collection.find(function(err, cursor) {
                if (err) {
                        self.emit('e1');
                        return;

                }
                sys.puts("Printing docs from Cursor Each");

                self.emit('start');
                cursor.each(function(err, doc) {
                        if (!err) {
                                self.emit('e2');
                                self.emit('end');
                                return;
                        }

                        if(doc != null) {
                                sys.puts("doc:" + doc.name);
                                self.emit('doc',doc);
                        } else {
                                self.emit('end');
                        }
                })
        });
};
http.createServer(function(req,res){
        pr = new ProductReader(products);
        pr.on('e1',function(){
                sys.puts("E1");
                res.writeHead(400,{"Content-Type": "text/plain"});
                res.write("e1 occurred\n");
                res.end();
        });
        pr.on('e2',function(){
                sys.puts("E2");
                res.write("ERROR\n");
        });

        pr.on('start',function(){
                sys.puts("START");
                res.writeHead(200,{"Content-Type": "text/plain"});
                res.write("<products>\n");
        });

        pr.on('doc',function(doc){
                sys.puts("A DOCUMENT" + doc.name);
                res.write("<product><name>" + doc.name + "</name></product>\n");
        });

        pr.on('end',function(){
                sys.puts("END");
                res.write("</products>");
                res.end();
        });

        pr.do();

  }).listen(8000);


回答5:

I have been studying mongodb streams myself, while I do not have the entire answer you are looking for, I do have part of it. you can setup a socket.io stream

this is using javascript socket.io and socket.io-streaming available at NPM also mongodb for the database because using a 40 year old database that has issues is incorrect, time to modernize also the 40 year old db is SQL and SQL doesn't do streams to my knowledge

So although you only asked about data going from server to client, I also want to get client to server in my answer because I can NEVER find it anywhere when I search and I wanted to setup one place with both the send and receive elements via stream so everyone could get the hang of it quickly.

client side sending data to server via streaming

stream = ss.createStream();
blobstream=ss.createBlobReadStream(data);
blobstream.pipe(stream);
ss(socket).emit('data.stream',stream,{},function(err,successful_db_insert_id){
 //if you get back the id it went into the db and everything worked
});

server receiving stream from the client side and then replying when done

ss(socket).on('data.stream.out',function(stream,o,c){
 buffer=[];
 stream.on('data',function(chunk){buffer.push(chunk);});
 stream.on('end',function(){
  buffer=Buffer.concat(buffer);
  db.insert(buffer,function(err,res){
   res=insertedId[0];
   c(null,res);
  });
 });
});

//This is the other half of that the fetching of data and streaming to the client

client side requesting and receiving stream data from server

stream=ss.createStream();
binarystring='';
stream.on('data',function(chunk){ 
 for(var I=0;i<chunk.length;i++){
  binarystring+=String.fromCharCode(chunk[i]); 
 }
});
stream.on('end',function(){ data=window.btoa(binarystring); c(null,data); });
ss(socket).emit('data.stream.get,stream,o,c);

server side replying to request for streaming data

ss(socket).on('data.stream.get',function(stream,o,c){
 stream.on('end',function(){
  c(null,true);
 });
 db.find().stream().pipe(stream);
});

The very last one there is the only one where I am kind of just pulling it out of my butt because I have not yet tried it, but that should work. I actually do something similar but I write the file to the hard drive then use fs.createReadStream to stream it to the client. So not sure if 100% but from what I read it should be, I'll get back to you once I test it.

P.s. anyone want to bug me about my colloquial way of talking, I'm Canadian, and I love saying "eh" come at me with your hugs and hits bros/sis' :D