Is RxJava a good fit for branching workflows?

2020-04-21 05:32发布

问题:

I am using RxJava to process some notifications that we pull from a queue.

RxJava seemed to work fine with a simple workflow, now with new requirements coming in, the flow is growing in complexity with more branches (please see below picture as a reference) I tried to exemplify the flow with a small unit test:

@Test
public void test() {
    Observable.range(1, 100)
        .groupBy(n -> n % 3)
        .toMap(GroupedObservable::getKey)
        .flatMap(m1 -> {
            Observable<Integer> ones1 = m1.get(0);
            Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);
            Observable<Integer> threes = m1.get(2).map(n -> n + 100);
            Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)
                .map(n -> n * 3)
                .groupBy(n -> n % 2)
                .toMap(GroupedObservable::getKey)
                .flatMap(m2 -> {
                    Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);
                    Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);
                    return Observable.merge(ones2, twos2);
                });
                return Observable.merge(onesAndTwos, threes).map(n -> n +1);
        })
        .subscribe(System.out::println);
}

Whilst it's still technically achievable to use RxJava, I am now wondering if it's a good choice, as to formalise the branching I had to do 2 level of nesting inside the main flatMap, which doesn't seem really neat.

Would this be the right way of describing a workflow like above? Or RxJava is not a good fit for branching workflows?

Thank you for your help!

回答1:

The grouping observable is the right way to go AFAIK. Personally, if anything in your picture between "split by a type" and "merge everything", is async, doing this in RX definitely has a lot of advantages like retry logic, buffering, error handling, backpressure, etc.. If it's regular non-async code it's a personal preference I guess. You could do it using RX, but you can also do everything between "split by a type" and "merge everything" using regular synchronous code.

Whatever way you chose, splitting up the code to make it more readable is always a good idea so you can "read the flow" as easy as we can read the image you attached.



回答2:

Just an idea for another approach which may work for you: Instead of grouping/toMap, you could multicast the source and handle the branches individually.

Example:

@Test
public void multicastingShare() {
    final Observable<Integer> sharedSource = Observable.range(1, 10)
            .doOnSubscribe(dummy -> System.out.println("subscribed"))
            .share();
    // split by some criteria
    final Observable<String> oddItems = sharedSource
            .filter(n -> n % 2 == 1)
            .map(odd -> "odd: " + odd)
            .doOnNext(System.out::println);
    final Observable<String> evenItems = sharedSource
            .filter(n -> n % 2 == 0)
            .map(even -> "even: " + even)
            .doOnNext(System.out::println);

    // recombine the individual streams at some point
    Observable.concat(oddItems, evenItems)
            .subscribe(result -> System.out.println("result: " + result));
}

This video may be be helpful (at least the first 15 min)