I need to filter entries emitted by an observable by checking the entry against some web service. The normal observable.filter operator is not suitable here, as it expects the predicate function to return the verdict synchronously, but in this situation, the verdict can only be retrieved asynchronously.
I can make shift by the following code, but I was wondering whether there is some better operator I can use for this case.
someObservable.flatmap(function(entry) {
return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
return {
verdict: verdict,
entry: entry
};
});
}).filter(function(obj) {
return obj.verdict === true;
}).map(function(obj) {
return obj.entry;
});
Update for RxJS 6+
Since RxJS version 6.0 we have pipe operators instead of observable prototype method chaining.
So I updated the original code of this request to RxJS 6 pipeline style improved by the information in the accepted answer.
Update 1 to this Post
I now refactored this code into a npm package.
https://www.npmjs.com/package/filter-async-rxjs-pipe
The serial variant with concatMap already works correctly, the parallel variant with flatMap seems not to run in parallel currently. But since I need the concatMap version, I currently have all I need. If somebody has an idea on how to write the parallel version correctly, please add an issue at the connected Git repository. :)
Note
Since I only need to pass a predicate function which returns a Promise, I wrote the conversion of the Promise to an Observable directly into the filterAsync method. If you need to have an Observable as filter input, feel free to adjust the code.
Here is a gist with the full ts file code, including imports:
https://gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts
Here's how you'd implement such an operator using existing operators. There is one snag you need to think about. Because your filter operation is async, it is possible for new items to arrive faster than your filter operation can process them. What should happen in this case? Do you want to run the filters sequentially and guarantee that the order of your items is maintained? Do you want to run the filters in parallel and accept that your items may come out in different order?
Here are the 2 versions of the operator
Usage:
Not that I'm aware of. You could roll your own. I haven't tested this, but here's an idea:
And you could use it like so: