Bluebird Promises with Event Emitter

2019-08-01 06:05发布

问题:

I am fairly new to Bluebird promises. I was trying to use them with an event emitter, but I am stuck on how to handle errors.

I have a stream object which is the emitter. Code is as below -

// Adapts the stream's events to a promise: it resolves with {count} once the
// 'end' event has been handled and rejects with the error of an 'error' event.
return new Promise((resolve, reject) => {

    // Per-document handler: queues an upsert on `bulk` and, after every
    // `bulkSize` documents, pauses the stream while the bulk is executed.
    var onDocFunc = doc => {
        //JSON.parse('*');
        // some logic goes in here to construct setStmt
        bulk.find(query).upsert().update({$set: setStmt});
        count++;
        if (count % bulkSize == 0) {
            stream.pause();
            // NOTE(review): bulk.execute is handed to promisify detached from
            // `bulk`; if it relies on `this`, the call would fail and the empty
            // catch below would hide that - confirm.
            var execute = Promise.promisify(bulk.execute);
            // Any execute() failure is swallowed so the stream is always resumed.
            execute().catch(() => {}).then(() => {
                stream.resume();
            });
        }
    };

    stream.on('data', onDocFunc);

    stream.on('end', () => {
        // This statement throws inside an event listener; as described in the
        // text below, the error surfaces as an uncaughtException rather than
        // as a rejection of the promise.
        JSON.parse('*'); // how to catch errors that happen here??
        // resolve with {count} pre-bound as its argument, so whatever value
        // the caller passes later is ignored.
        var boundResolve = resolve.bind(this, {count: count});
        if (count % bulkSize != 0) {
            // Flush the trailing partial batch; resolve with {count} whether
            // the execute call succeeds or fails.
            Promise.promisify(bulk.execute)().then(boundResolve).catch(boundResolve);
        }
        else {
            boundResolve();
        }
    });

    stream.on('error', err => {
        reject(err);
    });

})

What is the recommended way to catch an error that occurs inside the callback of the `end` event handler? Right now, if any error occurs there, the Node.js application crashes with `uncaughtException: Unexpected token *`.

回答1:

Don't mix application logic into the promisification of the event emitter. Such code (anything that can throw, etc.) should always go in `then` callbacks. In your case:

var execute = Promise.promisify(bulk.execute);
return new Promise((resolve, reject) => {
    stream.on('data', onDocFunc); // not sure what this does
    stream.on('end', resolve);
    stream.on('error', reject);
}).then(() => {
    JSON.parse('*'); // exceptions that happen here are caught implicitly!
    var result = {count: count};
    if (count % bulkSize != 0) {
        return execute().catch(()=>{}).return(result);
    } else {
        return result;
    }
});

Regarding your real code, I'd probably try to factor out the batching into a helper function:

/**
 * Consumes a stream in fixed-size batches, running a (possibly asynchronous)
 * callback for each batch.
 *
 * Every 'data' item is buffered. As soon as `size` items are buffered the
 * stream is paused, the full batch is handed to `callback`, and the stream is
 * resumed once the callback's result has settled successfully. If the callback
 * throws or rejects, the error is re-emitted as an 'error' event on the
 * stream, which rejects the returned promise.
 *
 * The buffer is swapped for a fresh array *before* the callback runs, so items
 * that still arrive while a batch is being processed (e.g. from an emitter
 * that does not stop immediately on `pause()`) are kept for the next batch
 * instead of being appended to the in-flight batch and then discarded.
 *
 * @param {object} stream - Emitter providing `on`, `emit`, `pause` and
 *   `resume`, and emitting 'data', 'end' and 'error' events.
 * @param {number} size - Number of items per batch.
 * @param {function(Array): *} callback - Invoked with each batch; may return a
 *   promise.
 * @returns {Promise<number>} Fulfils with the total number of items received,
 *   after 'end' and after the trailing partial batch (if any) was processed.
 */
function asyncBatch(stream, size, callback) {
    let batch = [];
    let count = 0;
    stream.on('data', data => {
        batch.push(data);
        count++;
        if (batch.length === size) {
            // Detach the full batch right away; later items go to a new array.
            const full = batch;
            batch = [];
            stream.pause();
            Promise.resolve(full).then(callback).then(() => {
                stream.resume();
            }, e => {
                // Route callback failures through the stream's 'error' event
                // so they reject the promise returned below.
                stream.emit('error', e);
            });
        }
    });
    return new Promise((resolve, reject) => {
        stream.on('end', resolve);
        stream.on('error', reject);
    }).then(() => batch.length ? callback(batch) : null).then(() => count);
}

Promise.promisifyAll(Bulk);
return asyncBatch(stream, bulkSize, docs => {
    const bulk = new Bulk()
    for (const doc of docs) {
        // JSON.parse('*');
        // some logic goes in here to construct setStmt
        bulk.find(query).upsert().update({$set: setStmt});
    }
    return bulk.executeAsync().catch(err => {/* ignore */});
})


回答2:

You'll have to use a try/catch block:

// Guard the body of the 'end' listener: anything thrown synchronously in it is
// passed to the surrounding promise's reject instead of escaping the listener.
stream.on('end', () => {
  try {
    JSON.parse('*');
    // ...the rest of your code
  } catch (err) {
    reject(err);
  }
});