Wait for an async operation in onNext of RxJS Obse

2020-05-20 02:01发布

I have an RxJS sequence being consumed in the normal manner...

However, in the observable 'onNext' handler, some of the operations will complete synchronously, but others require async callbacks, that need to be waited on before processing the next item in the input sequence.

...little bit confused how to do this. Any ideas? thanks!

someObservable.subscribe(
    function onNext(item)
    {
        if (item == 'do-something-async-and-wait-for-completion')
        {
            setTimeout(
                function()
                {
                    console.log('okay, we can continue');
                }
                , 5000
            );
        }
        else
        {
            // do something synchronously and keep on going immediately
            console.log('ready to go!!!');
        }
    },
    function onError(error)
    {
        console.log('error');
    },
    function onComplete()
    {
        console.log('complete');
    }
);

3条回答
欢心
2楼-- · 2020-05-20 02:50

Each operation you want to perform can be modeled as an observable. Even the synchronous operation can be modeled this way. Then you can use map to convert your sequence into a sequence of sequences, then use concatAll to flatten the sequence.

someObservable
    .map(function (item) {
        if (item === "do-something-async") {
            // create an Observable that will do the async action when it is subscribed
            // return Rx.Observable.timer(5000);

            // or maybe an ajax call?  Use `defer` so that the call does not
            // start until concatAll() actually subscribes.
            return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
        }
        else {
            // do something synchronous but model it as an async operation (using Observable.return)
            // Use defer so that the sync operation is not carried out until
            // concatAll() reaches this item.
            return Rx.Observable.defer(function () {
                return Rx.Observable.return(someSyncAction(item));
            });
        }
    })
    .concatAll() // consume each inner observable in sequence
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });

To reply to some of your comments...at some point you need to force some expectations on the stream of functions. In most languages, when dealing with functions that are possibly async, the function signatures are async and the actual async vs sync nature of the function is hidden as an implementation detail of the function. This is true whether you are using javaScript promises, Rx observables, c# Tasks, c++ Futures, etc. The functions end up returning a promise/observable/task/future/etc and if the function is actually synchronous, then the object it returns is just already completed.

Having said that, since this is JavaScript, you can cheat:

var makeObservable = function (func) {
    return Rx.Observable.defer(function () {
        // execute the function and then examine the returned value.
        // if the returned value is *not* an Rx.Observable, then
        // wrap it using Observable.return
        var result = func();
        return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
    });
}

someObservable
    .map(makeObservable)
    .concatAll()
    .subscribe(function (result) {
    }, function (error) {
        console.log("error", error);
    }, function () {
        console.log("complete");
    });
查看更多
我命由我不由天
3楼-- · 2020-05-20 02:51

Another simple example to do manual async operations.

Be aware that it is not a good reactive practice ! If you only want to wait 1000ms, use Rx.Observable.timer or delay operator.

someObservable.flatMap(response => {
  return Rx.Observable.create(observer => {
    setTimeout(() => {
      observer.next('the returned value')
      observer.complete()
    }, 1000)
  })
}).subscribe()

Now, replace setTimeout by your async function, like Image.onload or fileReader.onload ...

查看更多
神经病院院长
4楼-- · 2020-05-20 02:54

First of all, move your async operations out of subscribe, it's not made for async operations.

What you can use is mergeMap (alias flatMap) or concatMap. (I am mentioning both of them, but concatMap is actually mergeMap with the concurrent parameter set to 1.) Settting a different concurrent parameter is useful, as sometimes you would want to limit the number of concurrent queries, but still run a couple concurrent.

source.concatMap(item => {
  if (item == 'do-something-async-and-wait-for-completion') {
    return Rx.Observable.timer(5000)
      .mapTo(item)
      .do(e => console.log('okay, we can continue'));
    } else {
      // do something synchronously and keep on going immediately
      return Rx.Observable.of(item)
        .do(e => console.log('ready to go!!!'));
    }
}).subscribe();

I will also show how you can rate limit your calls. Word of advice: Only rate limit at the point where you actually need it, like when calling an external API that allows only a certain number of requests per second or minutes. Otherwise it is better to just limit the number of concurrent operations and let the system move at maximal velocity.

We start with the following snippet:

const concurrent;
const delay;
source.mergeMap(item =>
  selector(item, delay)
, concurrent)

Next, we need to pick values for concurrent, delay and implement selector. concurrent and delay are closely related. For example, if we want to run 10 items per second, we can use concurrent = 10 and delay = 1000 (millisecond), but also concurrent = 5 and delay = 500 or concurrent = 4 and delay = 400. The number of items per second will always be concurrent / (delay / 1000).

Now lets implement selector. We have a couple of options. We can set an minimal execution time for selector, we can add a constant delay to it, we can emit the results as soon as they are available, we can can emit the result only after the minimal delay has passed etc. It is even possible to add an timeout by using the timeout operators. Convenience.

Set minimal time, send result early:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .merge(Rx.Observable.timer(delay).ignoreElements())
}

Set minimal time, send result late:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .zip(Rx.Observable.timer(delay), (item, _))
}

Add time, send result early:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .concat(Rx.Observable.timer(delay).ignoreElements())
}

Add time, send result late:

function selector(item, delay) {
   return Rx.Observable.of(item)
     .delay(1000) // replace this with your actual call.
     .delay(delay)
}
查看更多
登录 后发表回答