RxJava order of execution confusion

Posted 2019-09-17 07:16

Question:

I have this very simple RxJava example:

List<Integer> arrayIntegers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));

Observable.fromIterable(arrayIntegers)
    .map(i -> {
        Log.d("RxJava", "map i = " + i);
        return i;
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new DisposableObserver<Integer>() {
        @Override
        public void onNext(Integer i) {
            Log.d("RxJava", "next i = " + i);
        }

        @Override
        public void onError(Throwable e) {}

        @Override
        public void onComplete() {
            Log.d("RxJava", "Completed");
        }
    });

This gives the following result:

D/RxJava: map i = 1
D/RxJava: map i = 2
D/RxJava: map i = 3
D/RxJava: map i = 4
D/RxJava: map i = 5
D/RxJava: next i = 1
D/RxJava: next i = 2
D/RxJava: next i = 3
D/RxJava: next i = 4
D/RxJava: next i = 5

What I was expecting, though, is something more like this:

D/RxJava: map i = 1
D/RxJava: next i = 1
D/RxJava: map i = 2
D/RxJava: next i = 2
D/RxJava: map i = 3
D/RxJava: next i = 3
D/RxJava: map i = 4
D/RxJava: next i = 4
D/RxJava: map i = 5
D/RxJava: next i = 5

Could someone explain what I am doing wrong that causes the order to differ from what I expect?

Answer 1:

Your Rx pipeline runs on two different threads. The first thread (from Schedulers.newThread()) runs fromIterable and map. The second, the Android main thread (AndroidSchedulers.mainThread()), calls your DisposableObserver.

Between the two threads there is an (invisible) buffer inside the observeOn() operator. The first thread gets some CPU time, quickly does all of its work, and puts the results into the observeOn buffer. With only five items, this probably all happens within a single time slice of that thread.

Then the second thread gets its turn: it takes the items from the buffer and continues the processing by calling your DisposableObserver.
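To make the hand-off concrete, here is a minimal plain-Java sketch of two threads with a queue between them. It is only a model of the idea and not how RxJava implements observeOn; the class name HandoffModel, the DONE sentinel, and the thread names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Plain-Java model of a two-thread pipeline with a buffer in between.
// NOT RxJava's implementation; all names here are hypothetical.
class HandoffModel {
    // Sentinel marking the end of the stream (an assumption of this model).
    private static final int DONE = -1;

    // Runs the model once and returns the log lines in the order they were recorded.
    static List<String> run(List<Integer> items) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        BlockingQueue<Integer> buffer = new LinkedBlockingQueue<>();

        // Upstream thread: plays the role of fromIterable + map.
        Thread producer = new Thread(() -> {
            for (int i : items) {
                log.add("map i = " + i);
                buffer.add(i); // hand the item over without waiting for the consumer
            }
            buffer.add(DONE);
        }, "producer");

        // Downstream thread: plays the role of the observer behind observeOn.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = buffer.take(); i != DONE; i = buffer.take()) {
                    log.add("next i = " + i);
                }
                log.add("Completed");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "consumer");

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the model to finish", e);
        }
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList(1, 2, 3, 4, 5)).forEach(System.out::println);
    }
}
```

Each "map" line is always recorded before its matching "next" line, but how the two kinds of lines interleave is up to the thread scheduler and can differ between runs. Because the producer never waits for the consumer, a short list can easily be mapped in full before the consumer handles its first item.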

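I think what you're looking for here is a single thread. Remove the observeOn operator and see whether it does what you want. Without it, each item passes through map and straight into onNext on the same thread before the next item is emitted.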

Edit: I just tried removing the observeOn operator and ran it on my machine. Here's the result:

RxJava map i = 1
RxJava next i = 1
RxJava map i = 2
RxJava next i = 2
RxJava map i = 3
RxJava next i = 3
RxJava map i = 4
RxJava next i = 4
RxJava map i = 5
RxJava next i = 5

For future reference, it might be useful for debugging purposes to print the name of the thread that reaches the various Rx operators. You can get the name using Thread.currentThread().getName().
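As a small sketch of that tip, a helper along these lines could prefix every log message with the calling thread's name. ThreadTag and tag are hypothetical names chosen for this example and are not part of RxJava or Android.

```java
// Hypothetical helper: prefixes a log message with the current thread's name.
class ThreadTag {
    static String tag(String message) {
        return "[" + Thread.currentThread().getName() + "] " + message;
    }

    public static void main(String[] args) throws InterruptedException {
        // Called directly from the thread that runs main.
        System.out.println(tag("called from main"));

        // Called from a second thread that was given an explicit name.
        Thread worker = new Thread(() -> System.out.println(tag("called from worker")), "worker-1");
        worker.start();
        worker.join();
    }
}
```

In the pipeline from the question, this would be used as Log.d("RxJava", ThreadTag.tag("map i = " + i)) inside map and in the same way inside onNext, so each log line shows which thread produced it.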