rxjs - combining inner observables after filtering

Posted 2019-07-24 22:34

Question:

I call a backend that responds with:

[
  "https://some-url.com/someData1.json",
  "https://some-url.com/someData2.json"
]

Each JSON file has the following schema:

{
  "isValid": boolean,
  "data": string
}

I want to get an array with the data of every response that has isValid set to true:

backend.get(url)
    .pipe(
        mergeMap((urls: string[]) =>
            urls.map((url: string) =>
                backend.get(url)
                    .pipe(
                        filter(response => response.isValid),
                        map(response => response.data)
                    )
            )
        ),
        combineAll()
    )

When both .json files have "isValid" set to true, I get an array with both data values. But when one of them has "isValid" set to false, the observable never completes.

I could use mergeAll instead of combineAll, but then I receive a stream of single data values, not a collection of all of them.

Is there a better way to filter out an observable?

Answer 1:

回答1:

As you said, the inner observable never emits, because filter does not forward the only value that the backend.get observable ever emits. The operator subscribed to that observable, combineAll in your case, therefore never has a value from every inner observable and can never emit itself.
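
The effect can be reproduced without a backend. The following is a minimal sketch, assuming RxJS 6 or later; the Res interface and the two of(...) sources are hypothetical stand-ins for the backend.get responses, and combineLatest is used directly because combineAll applies it to the collected inner observables.

```typescript
import { combineLatest, Observable, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

interface Res {
  isValid: boolean;
  data: string;
}

// Hypothetical stand-ins for the two backend.get(url) responses.
const valid$: Observable<Res> = of({ isValid: true, data: 'first' });
const invalid$: Observable<Res> = of({ isValid: false, data: 'second' });

// Same per-request pipeline as in the question: drop invalid responses, keep the data.
const onlyValidData = (source$: Observable<Res>): Observable<string> =>
  source$.pipe(
    filter(response => response.isValid),
    map(response => response.data)
  );

const emitted: string[][] = [];
combineLatest([onlyValidData(valid$), onlyValidData(invalid$)])
  .subscribe(values => emitted.push(values));

// The second source was filtered down to nothing, so nothing is ever combined.
console.log(emitted.length); // 0
```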

What I would do is move the filtering and mapping into combineAll by providing a project function, like this:

backend.get(url)
    .pipe(
        mergeMap((urls: string[]) =>
            urls.map((url: string) => backend.get(url))
        ),
        // project receives the latest value of each inner observable
        // as separate arguments, so collect them with a rest parameter
        combineAll((...responses) =>
            responses
                .filter(response => response.isValid)
                .map(response => response.data)
        )
    )

See if that works for you ;)
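
To try this approach in isolation, here is a small sketch under the same assumptions (RxJS 6; the Res interface and the of(...) sources are made up for illustration). It uses from to turn the array of requests into a stream of observables, which is what the mergeMap step above does, and a rest parameter because combineAll passes the latest values to project as separate arguments.

```typescript
import { from, Observable, of } from 'rxjs';
import { combineAll } from 'rxjs/operators';

interface Res {
  isValid: boolean;
  data: string;
}

// Hypothetical responses standing in for the backend.get(url) calls.
const requests: Observable<Res>[] = [
  of({ isValid: true, data: 'first' }),
  of({ isValid: false, data: 'second' }),
];

const collected: string[][] = [];
from(requests)
  .pipe(
    // Filter after combining, so an invalid response cannot block the result.
    combineAll((...responses: Res[]) =>
      responses
        .filter(response => response.isValid)
        .map(response => response.data)
    )
  )
  .subscribe(data => collected.push(data));

console.log(collected); // [ [ 'first' ] ]
```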



Answer 2:

import { forkJoin, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

interface IRes {
  isValid: boolean;
  data: string;
}

interface IResValid {
  isValid: true;
  data: string;
}

function isValid(data: IRes): data is IResValid {
  return data.isValid;
}

const res1$: Observable<IRes> = backend.get(url1);
const res2$: Observable<IRes> = backend.get(url2);

// When all observables complete, emit the last emitted value from each.
forkJoin([res1$, res2$])
  .pipe(map((results: IRes[]) => results.filter(isValid)))
  .subscribe((results: IResValid[]) => console.log(results));
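
The same idea may extend to a list of URLs that is only known after the first request. The sketch below is one possible way to do that, not a definitive implementation: the in-memory backend, its URLs and its payloads are hypothetical, and switchMap is an assumption chosen to chain the second round of requests. It also maps the valid responses to their data, as the question asks.

```typescript
import { forkJoin, Observable, of } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

interface IRes {
  isValid: boolean;
  data: string;
}

// Hypothetical in-memory backend; URLs and payloads are invented for the example.
const fakeResponses: Record<string, unknown> = {
  '/index.json': ['/someData1.json', '/someData2.json'],
  '/someData1.json': { isValid: true, data: 'first' },
  '/someData2.json': { isValid: false, data: 'second' },
};

const backend = {
  get<T>(url: string): Observable<T> {
    return of(fakeResponses[url] as T);
  },
};

function validData(indexUrl: string): Observable<string[]> {
  return backend.get<string[]>(indexUrl).pipe(
    // forkJoin waits for every request to complete, then emits a single array.
    switchMap(urls => forkJoin(urls.map(url => backend.get<IRes>(url)))),
    // Filtering the array afterwards never blocks the combined emission.
    map(responses => responses.filter(res => res.isValid).map(res => res.data))
  );
}

let result: string[] = [];
validData('/index.json').subscribe(data => {
  result = data;
});

console.log(result); // [ 'first' ]
```

One caveat: forkJoin completes without emitting when it is given an empty array, so an empty URL list would need separate handling.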


Tags: rxjs rxjs6