Rx java - difficult case

2019-08-22 07:05发布

问题:

Suppose

Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);

I need a sequence, where each even number of obs multiplied by count of even numbers in obs, and each odd number of obs multiplied by count of odd numbers in obs.

I.e. in given case there are 2 evens and 3 odds, so result sequence must be

3  (1 * 3)
4  (2 * 2)
9  (3 * 3)
8  (4 * 2)
15 (5 * 3)

How can I do it?

回答1:

You can achieve it with splitting the Observable to odd and even streams, then iterate over each stream and multiply by the count of each split stream:

Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> oddsObservable = obs.filter(integer -> integer % 2 == 1);
Observable<Integer> evenObservable = obs.filter(integer -> integer % 2 == 0);
Observable.merge(
       oddsObservable.flatMap(odd -> oddsObservable.count().map(count -> odd * count)),
       evenObservable.flatMap(even -> evenObservable.count().map(count -> even * count))
)
      .sorted()
      .subscribe(System.out::println);

if you're doing it over in-memory lists/arrays you might consider using Java 8 Stream API, and doing the same, the code is very similar:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> oddsList = list.stream().filter(integer -> integer % 2 == 1).collect(Collectors.toList());
List<Integer> evenList = list.stream().filter(integer -> integer % 2 == 0).collect(Collectors.toList());

Stream.of(oddsList.stream().map(odd -> odd * (int) oddsList.stream().count()),
    evenList.stream().map(even -> even * (int) evenList.stream().count()))
    .flatMap(integerStream -> integerStream)
    .sorted()
    .forEach(System.out::println);

in my machine it runs faster with order of magnitude (~ x12 ).

If the source of items/numbers id over time producer consider optimizing the Reactive approach to avoid resubscribe multiple times to Observables (replay/cache etc.)



回答2:

This worked for me.

    Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);

    Observable<Integer> evenCount = obs.filter(integer -> integer % 2 == 0);
    Observable<Integer> oddCount = obs.filter(integer -> integer % 2 != 0);


    obs.flatMap(
            integer -> {
                if (integer % 2 == 0) {
                    return evenCount.count().map(count -> count * integer).toObservable();
                } else {
                    return oddCount.count().map(count -> count * integer).toObservable();
                }
            }
    ).subscribe(
            s -> Log.v("", "answer -> " + s),
            error -> error.printStackTrace()
    );


回答3:

Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);
List<Integer> count = obs.groupBy(i -> i % 2)
    .sorted((g1, g2) -> g1.getKey() - g2.getKey())
    .concatMap(go -> go.count())
    .toList()
    .toBlocking()
    .first();
obs.map(i -> i * count.get(i % 2))
    .forEach(System.out::println);

Traverse twice.



标签: rx-java