
How does RxJava Observable “Iteration” work?

Posted 2020-08-17 17:58

Question:

I started to play around with RxJava and ReactFX, and I became pretty fascinated with them. But as I experiment I have dozens of questions, and I'm constantly searching for answers.

One thing I'm observing (no pun intended) is, of course, lazy execution. With my exploratory code below, I noticed that nothing gets executed until merge.subscribe(pet -> System.out.println(pet)) is called. But what fascinated me is that when I added a second subscriber, merge.subscribe(pet -> System.out.println("Feed " + pet)), it fired the "iteration" again.

What I'm trying to understand is the behavior of the iteration. It does not seem to behave like a Java 8 stream that can only be used once. Is it literally going through each String one at a time and posting it as the value for that moment? And do any new subscribers following any previously fired subscribers receive those items as if they were new?

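import com.google.common.collect.ImmutableList;
import java.util.concurrent.CompletableFuture;
import rx.Observable;

public class RxTest {

    public static void main(String[] args) {

        // Only dogs whose name starts with "D" pass the filter
        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher", "Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby", "Grumpy Cat", "Meowmers", "Peanut"));

        // Emits the future's result once it completes
        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        // First subscriber: nothing runs until this call
        merge.subscribe(pet -> System.out.println(pet));

        // Second subscriber: the "iteration" fires again
        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}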

Answer 1:

Observable<T> represents a monad: a chained description of an operation, not the execution of the operation itself. It is declarative, rather than the imperative style you're used to. To execute an operation, you .subscribe() to it. Every time you subscribe, a new execution stream is created from scratch. Do not confuse streams with threads: subscriptions are executed synchronously unless you specify a thread change with .subscribeOn() or .observeOn(). You chain new elements onto any existing operation/monad/Observable to add new behaviour, such as changing threads, filtering, accumulation, transformation, etc. If your observable is an expensive operation that you don't want to repeat on every subscription, you can prevent re-execution by using .cache().
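
To make the "recipe, not execution" idea concrete, here is a minimal plain-Java sketch. ColdSource and cached() are hypothetical names invented for illustration; this is not RxJava's implementation, only a model of the behaviour described above. It assumes a single thread and ignores errors, completion and unsubscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ColdSourceSketch {

    /** Hypothetical stand-in for a cold Observable: it holds a recipe, not results. */
    static class ColdSource<T> {
        private final Supplier<List<T>> recipe;

        ColdSource(Supplier<List<T>> recipe) {
            this.recipe = recipe;
        }

        /** Runs the recipe from scratch, synchronously, on the caller's thread. */
        void subscribe(Consumer<? super T> onNext) {
            for (T item : recipe.get()) {
                onNext.accept(item);
            }
        }

        /** Rough analogue of .cache(): run the recipe once, then replay the stored items. */
        ColdSource<T> cached() {
            List<T> stored = new ArrayList<>();
            boolean[] filled = {false}; // not thread-safe; fine for a single-threaded sketch
            return new ColdSource<>(() -> {
                if (!filled[0]) {
                    stored.addAll(recipe.get());
                    filled[0] = true;
                }
                return stored;
            });
        }
    }

    public static void main(String[] args) {
        int[] runs = {0}; // counts how often the "expensive" recipe executes
        ColdSource<String> pets = new ColdSource<>(() -> {
            runs[0]++;
            return Arrays.asList("Dasher", "Tabby");
        });

        // Two subscriptions, two full passes over the items
        pets.subscribe(pet -> System.out.println(pet));
        pets.subscribe(pet -> System.out.println("Feed " + pet));
        System.out.println("runs without cache: " + runs[0]); // 2

        // With the cached wrapper the recipe runs once and is replayed
        runs[0] = 0;
        ColdSource<String> once = pets.cached();
        once.subscribe(pet -> System.out.println(pet));
        once.subscribe(pet -> System.out.println("Feed " + pet));
        System.out.println("runs with cache: " + runs[0]); // 1
    }
}
```

The second subscriber sees every item again because subscribing simply re-runs the recipe; nothing was "used up" by the first subscriber, which is the difference from a Java 8 stream.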

To turn any Observable<T> operation, asynchronous or synchronous, into a synchronous inlined one, use .toBlocking() to change its type to BlockingObservable<T>. Instead of .subscribe(), it offers methods that execute an operation on each result with .forEach(), or extract a single value with .first().

Observables are a good tool because they're mostly* deterministic (the same inputs always yield the same outputs unless you're doing something wrong), reusable (you can pass them around as part of a command/policy pattern) and, for the most part, free of concurrency concerns because they should not rely on shared state (a.k.a. doing something wrong). BlockingObservables are good if you're trying to bring an observable-based library into imperative code, or just executing an operation on an Observable that you are 100% confident is well managed.

Architecting your application around these principles is a change of paradigm that I can't really cover in this answer.

*There are breaches like Subject and Observable.create() that are needed to integrate with imperative frameworks.



Tags: java rx-java