I have an RxJS sequence being consumed in the normal manner...
However, in the observable 'onNext' handler, some of the operations will complete synchronously, but others require async callbacks, that need to be waited on before processing the next item in the input sequence.
...little bit confused how to do this. Any ideas? thanks!
someObservable.subscribe(
function onNext(item)
{
if (item == 'do-something-async-and-wait-for-completion')
{
setTimeout(
function()
{
console.log('okay, we can continue');
}
, 5000
);
}
else
{
// do something synchronously and keep on going immediately
console.log('ready to go!!!');
}
},
function onError(error)
{
console.log('error');
},
function onComplete()
{
console.log('complete');
}
);
Each operation you want to perform can be modeled as an observable. Even the synchronous operation can be modeled this way. Then you can use
map
to convert your sequence into a sequence of sequences, then useconcatAll
to flatten the sequence.To reply to some of your comments...at some point you need to force some expectations on the stream of functions. In most languages, when dealing with functions that are possibly async, the function signatures are async and the actual async vs sync nature of the function is hidden as an implementation detail of the function. This is true whether you are using javaScript promises, Rx observables, c# Tasks, c++ Futures, etc. The functions end up returning a promise/observable/task/future/etc and if the function is actually synchronous, then the object it returns is just already completed.
Having said that, since this is JavaScript, you can cheat:
Another simple example to do manual async operations.
Be aware that it is not a good reactive practice ! If you only want to wait 1000ms, use Rx.Observable.timer or delay operator.
Now, replace setTimeout by your async function, like Image.onload or fileReader.onload ...
First of all, move your async operations out of
subscribe
, it's not made for async operations.What you can use is
mergeMap
(aliasflatMap
) orconcatMap
. (I am mentioning both of them, butconcatMap
is actuallymergeMap
with theconcurrent
parameter set to 1.) Settting a different concurrent parameter is useful, as sometimes you would want to limit the number of concurrent queries, but still run a couple concurrent.I will also show how you can rate limit your calls. Word of advice: Only rate limit at the point where you actually need it, like when calling an external API that allows only a certain number of requests per second or minutes. Otherwise it is better to just limit the number of concurrent operations and let the system move at maximal velocity.
We start with the following snippet:
Next, we need to pick values for
concurrent
,delay
and implementselector
.concurrent
anddelay
are closely related. For example, if we want to run 10 items per second, we can useconcurrent = 10
anddelay = 1000
(millisecond), but alsoconcurrent = 5
anddelay = 500
orconcurrent = 4
anddelay = 400
. The number of items per second will always beconcurrent / (delay / 1000)
.Now lets implement
selector
. We have a couple of options. We can set an minimal execution time forselector
, we can add a constant delay to it, we can emit the results as soon as they are available, we can can emit the result only after the minimal delay has passed etc. It is even possible to add an timeout by using thetimeout
operators. Convenience.Set minimal time, send result early:
Set minimal time, send result late:
Add time, send result early:
Add time, send result late: