Is there an “async” version of filter operator in

Posted 2020-08-12 18:17

I need to filter entries emitted by an observable by checking the entry against some web service. The normal observable.filter operator is not suitable here, as it expects the predicate function to return the verdict synchronously, but in this situation, the verdict can only be retrieved asynchronously.

I can make do with the following code, but I was wondering whether there is a better operator for this case.

someObservable.flatMap(function(entry) {
  return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
    return {
      verdict: verdict,
      entry: entry
    };
  });
}).filter(function(obj) {
  return obj.verdict === true;
}).map(function(obj) {
  return obj.entry;
});

3 answers
Explosion°爆炸
#2 · 2020-08-12 18:42

Update for RxJS 6+

Since RxJS 6.0, pipeable operators replace method chaining on the Observable prototype.

So I updated the original code from this question to the RxJS 6 pipe style, improved with the information from the accepted answer.


Update 1 to this Post

I have now refactored this code into an npm package.
https://www.npmjs.com/package/filter-async-rxjs-pipe

The serial variant with concatMap already works correctly. The parallel variant with flatMap does not currently seem to run in parallel, but since I only need the concatMap version, I have all I need for now. If you have an idea for how to write the parallel version correctly, please open an issue at the linked Git repository. :)


Note
Since I only need to pass a predicate function which returns a Promise, I wrote the conversion of the Promise to an Observable directly into the filterAsync method. If you need to have an Observable as filter input, feel free to adjust the code.

import { from, MonoTypeOperatorFunction, pipe } from 'rxjs';
import { concatMap, filter, map } from 'rxjs/operators';

export function filterAsync<T>(predicate: (value: T, index: number) => Promise<boolean>): MonoTypeOperatorFunction<T> {
    return pipe(
        // Convert the predicate's Promise<boolean> to an observable (which resolves the promise),
        // then combine the boolean result with the input data in a container object.
        // concatMap supplies a per-subscription index, so no shared counter is needed.
        concatMap((data: T, index: number) => {
            return from(predicate(data, index))
                .pipe(map((isValid) => ({filterResult: isValid, entry: data})));
        }),
        // Filter the container objects synchronously on the stored predicate result
        filter(data => data.filterResult === true),
        // Unwrap the original entry from the container object
        map(data => data.entry)
    );
}

Here is a gist with the full ts file code, including imports:
https://gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts
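
For the parallel variant discussed above, one possible sketch uses mergeMap in place of concatMap. The name filterAsyncParallel and the concurrent parameter are illustrative and not part of the filter-async-rxjs-pipe package. The predicate is typed as ObservableInput<boolean> here (an assumption) so that it can return either a Promise or an Observable, and it is expected to emit a single verdict. Results are emitted as each check finishes, so the source order is not guaranteed.

```typescript
import { from, MonoTypeOperatorFunction, ObservableInput, of } from 'rxjs';
import { filter, map, mergeMap } from 'rxjs/operators';

// Hypothetical operator name; a sketch, not the package's implementation.
export function filterAsyncParallel<T>(
    predicate: (value: T, index: number) => ObservableInput<boolean>,
    concurrent: number = Infinity // maximum number of predicate checks in flight
): MonoTypeOperatorFunction<T> {
    return mergeMap(
        (value: T, index: number) =>
            from(predicate(value, index)).pipe(
                // drop the value when its verdict is false
                filter((keep) => keep),
                // re-emit the original value instead of the verdict
                map(() => value)
            ),
        concurrent
    );
}

// Usage with a synchronous stand-in for the real asynchronous check
const kept: number[] = [];
of(1, 2, 3, 4)
    .pipe(filterAsyncParallel((n) => of(n % 2 === 0)))
    .subscribe((n) => kept.push(n));
```

Passing a number as the second argument caps how many checks run at once; with a value of 1 the operator behaves like the concatMap version.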

老娘就宠你
#3 · 2020-08-12 18:49

Here's how you'd implement such an operator using existing operators. There is one snag to think about: because your filter operation is async, new items can arrive faster than the filter can process them. What should happen in that case? Do you want to run the filters sequentially and guarantee that the order of your items is maintained? Or do you want to run the filters in parallel and accept that your items may come out in a different order?

Here are the two versions of the operator:

// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
    return this.flatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
    return this.concatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

Usage:

var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
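
To make the ordering trade-off concrete, here is a small sketch in RxJS 6+ pipeable style. It is an illustration under assumptions, not part of the answer's code: the check function, its latencies, and the helper keepIfValid are made up, and a VirtualTimeScheduler stands in for real network delays so the run is deterministic.

```typescript
import { Observable, of, VirtualTimeScheduler } from 'rxjs';
import { concatMap, delay, filter, map, mergeMap } from 'rxjs/operators';

// Virtual clock: simulated latencies run instantly and in a fixed order.
const scheduler = new VirtualTimeScheduler();

// Hypothetical latencies: value 1 is slow, value 2 is fast, value 3 is in between.
const latencyMs = (n: number): number => (n === 1 ? 30 : n === 2 ? 10 : 20);

// Hypothetical async check: rejects 3, accepts everything else.
const check = (n: number): Observable<boolean> =>
    of(n !== 3).pipe(delay(latencyMs(n), scheduler));

// Emit `value` only if its check emits true.
const keepIfValid = (value: number): Observable<number> =>
    check(value).pipe(filter((ok) => ok), map(() => value));

const parallel: number[] = [];
const sequential: number[] = [];

// mergeMap starts every check immediately (the flatFilter behaviour).
of(1, 2, 3).pipe(mergeMap(keepIfValid)).subscribe((n) => parallel.push(n));
// concatMap waits for each check before starting the next (the concatFilter behaviour).
of(1, 2, 3).pipe(concatMap(keepIfValid)).subscribe((n) => sequential.push(n));

scheduler.flush(); // run all scheduled virtual-time work

console.log(parallel);   // [2, 1]: the fast check for 2 overtakes the slow check for 1
console.log(sequential); // [1, 2]: source order is preserved
```

The sequential version pays for its ordering guarantee with total latency: each check starts only after the previous one has finished.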
欢心
#4 · 2020-08-12 18:51

Not that I'm aware of. You could roll your own. I haven't tested this, but here's an idea:

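// Assumes: var Observable = Rx.Observable, CompositeDisposable = Rx.CompositeDisposable;
// predicate should return an Observable that emits a truthy or falsy verdict
Observable.prototype.flatFilter = function(predicate) {
  var self = this;
  return Observable.create(function(obs) {
    // collects the source subscription and every pending predicate subscription
    var disposable = new CompositeDisposable();
    disposable.add(self.subscribe(function(x) {
       disposable.add(predicate(x).subscribe(function(result) {
         if(result) {
            obs.onNext(x);
         }
       }, obs.onError.bind(obs)));
    // caveat: this completes as soon as the source completes, even if predicate checks are still pending
    }, obs.onError.bind(obs), obs.onCompleted.bind(obs)));
    return disposable;
  });
};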

And you could use it like so:

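someStream.flatFilter(function(x) {
  // Rx.DOM.get emits a response object, which is always truthy; map it to a
  // boolean verdict that fits your service's response format before relying on this
  return Rx.DOM.get('/isValid?check=' + x);
}).subscribe(function(x) {
  console.log(x);
});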