rxjs subscription calls a function that returns a

Posted 2019-09-12 06:18

Question:

I'm trying to read JSON from a file, filter out the entries I don't want, possibly map and flatten the JSON elements, and then, for each filtered JSON line in the file, call a function that returns a promise (basically tail -f on a bunyan-produced file). Additional requirement: I want the pacing to be driven by how fast the functions resolve their promises (output pacing, one element at a time, like Promise.each).

Sounds like a great use for RxJS and promises, right? Well, except that all the examples out there are for promises that produce observable streams, not for streams whose subscribers call functions that return promises.

The available schedulers for RxJS don't seem to offer output-based, single-concurrency pacing equivalent to Promise.each (I tried flatMapWithMaxConcurrent).

In this code, update returns a promise. The promises do resolve now that I've fixed a library bug, but the console output is out of order, and the promises appear to be resolved in a highly concurrent manner. Each promise actually takes 5 seconds to resolve, so I would expect to see a console message about every 5 seconds. Instead there's a single delay of 5 seconds followed by a bunch of messages, which I think means that many calls were run concurrently.
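
To make the pacing I'm after concrete, here is a minimal sketch using plain promises on a fixed array rather than a stream. The names fakeUpdate and eachInSeries are hypothetical and only for illustration; fakeUpdate stands in for MyPromisifiedLib.update and uses a short delay instead of 5 seconds.

```javascript
// Hypothetical stand-in for MyPromisifiedLib.update:
// returns a promise that resolves with `value` after `ms` milliseconds.
function fakeUpdate(value, ms) {
    return new Promise(function (resolve) {
        setTimeout(function () { resolve(value); }, ms);
    });
}

// Run `worker` on each item strictly one at a time: the next call
// starts only after the promise from the previous call has resolved.
function eachInSeries(items, worker) {
    return items.reduce(function (chain, item) {
        return chain.then(function () { return worker(item); });
    }, Promise.resolve());
}

// Record the order of events so the pacing is visible.
var log = [];
var done = eachInSeries([1, 2, 3], function (value) {
    log.push("start " + value);
    return fakeUpdate(value, 20).then(function () {
        log.push("end " + value);
    });
});

done.then(function () {
    console.log(log.join(", "));
});
```

With this kind of pacing, every "start" is followed by its own "end" before the next item begins, which is the behaviour I'd like from the stream subscriber.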

The library I created uses Bluebird promises from the ground up, so I can't rewrite it without removing promises from all my code. I'm getting kind of tempted, though: this is my second all-nighter hunting down unresolved promises. I know how I'd write this in C...

function tailMyFile(filename, ocu, oce) {
    var handle = new Tail(filename);

    handle.bunyan().filter(function (x) {
        // Keep only records that carry all four expected fields.
        return "descr" in x && "units" in x && "value" in x && "time" in x;
    })
    .subscribe (
        function(x) {
            if (x.descr === "enter") {
                MyPromisifiedLib.update(oce, x.value, "").done(function() {
                    console.log("updated enter value");
                });
            } else if ("push" in x) {
                MyPromisifiedLib.update(ocu, x.value, "push").done(function() {
                    console.log("updated with push");
                });
            } else {
                MyPromisifiedLib.update(ocu, x.value, "").done(function() {
                    console.log("updated value");
                });
            }
        },
        function(err) {
            // Include the error itself so the failure can be diagnosed.
            console.error("err leaked through to subscriber", err);
            log.error("err leaked through to subscriber");
        },
        function() { log.info("Completed") }
    );
}