I am using RxJava to process some notifications that we pull from a queue.
RxJava seemed to work fine with a simple workflow, now with new requirements coming in, the flow is growing in complexity with more branches (please see below picture as a reference) I tried to exemplify the flow with a small unit test:
@Test
public void test() {
Observable.range(1, 100)
.groupBy(n -> n % 3)
.toMap(GroupedObservable::getKey)
.flatMap(m1 -> {
Observable<Integer> ones1 = m1.get(0);
Observable<Integer> twos1 = m1.get(1).map(n -> n - 10);
Observable<Integer> threes = m1.get(2).map(n -> n + 100);
Observable<Integer> onesAndTwos = Observable.merge(ones1, twos1)
.map(n -> n * 3)
.groupBy(n -> n % 2)
.toMap(GroupedObservable::getKey)
.flatMap(m2 -> {
Observable<Integer> ones2 = m2.get(0).map(n -> n * 10);
Observable<Integer> twos2 = m2.get(1).map(n -> n * 100);
return Observable.merge(ones2, twos2);
});
return Observable.merge(onesAndTwos, threes).map(n -> n +1);
})
.subscribe(System.out::println);
}
Whilst it's still technically achievable to use RxJava, I am now wondering if it's a good choice, as to formalise the branching I had to do 2 level of nesting inside the main flatMap
, which doesn't seem really neat.
Would this be the right way of describing a workflow like above? Or RxJava is not a good fit for branching workflows?
Thank you for your help!
The grouping observable is the right way to go AFAIK. Personally, if anything in your picture between "split by a type" and "merge everything", is async, doing this in RX definitely has a lot of advantages like retry logic, buffering, error handling, backpressure, etc.. If it's regular non-async code it's a personal preference I guess. You could do it using RX, but you can also do everything between "split by a type" and "merge everything" using regular synchronous code.
Whatever way you chose, splitting up the code to make it more readable is always a good idea so you can "read the flow" as easy as we can read the image you attached.
Just an idea for another approach which may work for you: Instead of grouping/toMap, you could multicast the source and handle the branches individually.
Example:
This video may be be helpful (at least the first 15 min)