RxJava - difficult case

Posted 2019-08-22 06:57

Suppose

Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);

I need a sequence in which each even number in obs is multiplied by the count of even numbers in obs, and each odd number in obs is multiplied by the count of odd numbers in obs.

That is, in the given case there are 2 evens and 3 odds, so the resulting sequence must be

3  (1 * 3)
4  (2 * 2)
9  (3 * 3)
8  (4 * 2)
15 (5 * 3)

How can I do it?

Tags: rx-java
3 answers
走好不送
Reply #2 · 2019-08-22 07:24

You can do this by splitting the Observable into odd and even streams, then multiplying each item of a stream by the item count of that stream:

// RxJava 1 style API: count() returns Observable<Integer> here.
Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);
// Use != 0 so that negative odd numbers are matched too (-3 % 2 is -1 in Java).
Observable<Integer> oddsObservable = obs.filter(integer -> integer % 2 != 0);
Observable<Integer> evenObservable = obs.filter(integer -> integer % 2 == 0);
Observable.merge(
       oddsObservable.flatMap(odd -> oddsObservable.count().map(count -> odd * count)),
       evenObservable.flatMap(even -> evenObservable.count().map(count -> even * count))
)
      // Note: sorted() orders the results by value, not by source position.
      .sorted()
      .subscribe(System.out::println);

If you are working with in-memory lists or arrays, you might consider the Java 8 Stream API instead; the code is very similar:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
// Use != 0 so that negative odd numbers are matched too.
List<Integer> oddsList = list.stream().filter(integer -> integer % 2 != 0).collect(Collectors.toList());
List<Integer> evenList = list.stream().filter(integer -> integer % 2 == 0).collect(Collectors.toList());

Stream.of(oddsList.stream().map(odd -> odd * (int) oddsList.stream().count()),
    evenList.stream().map(even -> even * (int) evenList.stream().count()))
    .flatMap(integerStream -> integerStream)
    // Note: sorted() orders the results by value, not by source position.
    .sorted()
    .forEach(System.out::println);

On my machine it runs about an order of magnitude faster (~12x).
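Both snippets above finish with sorted(), which orders the results by value rather than by position in the source, so they do not reproduce the order asked for in the question. A minimal sketch of an order-preserving variant for in-memory input, assuming Java 8 streams; the class name ParityScaleStreams and the method scale are hypothetical names introduced only for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper class; the names are illustrative only.
class ParityScaleStreams {

    // Multiplies each element by the number of elements sharing its parity,
    // keeping the elements in their original order.
    static List<Long> scale(List<Integer> source) {
        // First pass: key true holds the count of evens, key false the count of odds.
        Map<Boolean, Long> counts = source.stream()
                .collect(Collectors.partitioningBy(i -> i % 2 == 0, Collectors.counting()));

        // Second pass: map() keeps encounter order, so no sort is needed.
        return source.stream()
                .map(i -> i * counts.get(i % 2 == 0))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints 3, 4, 9, 8, 15 on separate lines.
        scale(Arrays.asList(1, 2, 3, 4, 5)).forEach(System.out::println);
    }
}
```

The result type is Long because Collectors.counting() produces Long counts; cast back to int if the values are known to fit.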

If the source produces items over time, consider optimizing the reactive approach so that the Observables are not resubscribed multiple times (replay/cache, etc.).

萌系小妹纸
Reply #3 · 2019-08-22 07:27

This worked for me.

    Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);

    // Despite their names, these hold the even and odd items; count() is applied below.
    Observable<Integer> evenCount = obs.filter(integer -> integer % 2 == 0);
    Observable<Integer> oddCount = obs.filter(integer -> integer % 2 != 0);


    obs.flatMap(
            integer -> {
                if (integer % 2 == 0) {
                    return evenCount.count().map(count -> count * integer).toObservable();
                } else {
                    return oddCount.count().map(count -> count * integer).toObservable();
                }
            }
    ).subscribe(
            s -> Log.v("", "answer -> " + s),
            error -> error.printStackTrace()
    );
走好不送
Reply #4 · 2019-08-22 07:39
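Observable<Integer> obs = Observable.just(1, 2, 3, 4, 5);
// First traversal: group by parity and count each group.
// When both parities are present, sorting by key puts the even count at index 0
// and the odd count at index 1.
List<Integer> count = obs.groupBy(i -> i % 2)
    .sorted((g1, g2) -> g1.getKey() - g2.getKey())
    .concatMap(go -> go.count())
    .toList()
    .toBlocking()
    .first();
// Second traversal: multiply each item by the count for its parity.
obs.map(i -> i * count.get(i % 2))
    .forEach(System.out::println);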

This traverses the source twice.
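The same two-pass idea can be sketched without Rx. This is only an illustration under stated assumptions: the input is an in-memory list, and the class name ParityScaleTwoPass and the method scale are hypothetical. It indexes the counts with Math.floorMod instead of i % 2, because in Java a negative odd number gives -1 for i % 2, which would not be a valid index:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class; the names are illustrative only.
class ParityScaleTwoPass {

    static List<Integer> scale(List<Integer> source) {
        // Pass 1: counts[0] = number of evens, counts[1] = number of odds.
        // Math.floorMod(i, 2) is always 0 or 1, even for negative i.
        int[] counts = new int[2];
        for (int i : source) {
            counts[Math.floorMod(i, 2)]++;
        }

        // Pass 2: multiply each item by the count for its parity, in source order.
        List<Integer> result = new ArrayList<>(source.size());
        for (int i : source) {
            result.add(i * counts[Math.floorMod(i, 2)]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [3, 4, 9, 8, 15]
        System.out.println(scale(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

Because the counts array always has both slots, this version also works when the input contains only evens or only odds.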
