
RxJava 2: flatMapCompletable is not waiting for th

Posted 2019-08-16 23:05

Question:

The use case: There is a dataset of a POJO class, in this case an ArrayList. I have created a class which extends ArrayList and adds some helper methods that divide the list into small pages for uploading to a REST API. I have created an Iterable from an Iterator, which checks whether a page is available for uploading and then fetches that page. The page needs to be uploaded, and on a successful upload it is removed from the list; then the next page needs to be uploaded. This process must continue serially, uploading one page at a time, until the list is empty.
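For reference, the intended flow can be sketched in plain Java, without RxJava or a real network call. This is only a minimal model of the requirement: `SerialUploadSketch`, `peekPage`, `upload`, and `uploadAll` are hypothetical names used for illustration, and `upload` is a synchronous stand-in for the REST request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the desired behavior (no RxJava, no network).
class SerialUploadSketch {

    static final int PAGE_SIZE = 3;

    // Returns a copy of the first page without removing it from the data.
    static List<String> peekPage(List<String> data) {
        int end = Math.min(data.size(), PAGE_SIZE);
        return new ArrayList<>(data.subList(0, end));
    }

    // Stand-in for the REST call; assumed to report success for any non-empty page.
    static boolean upload(List<String> page) {
        return !page.isEmpty();
    }

    // Uploads page by page; a page is removed only after its upload succeeds.
    static int uploadAll(List<String> data) {
        int uploads = 0;
        while (!data.isEmpty()) {
            List<String> page = peekPage(data);
            if (!upload(page)) {
                break; // keep the page in the list so it can be retried later
            }
            data.subList(0, page.size()).clear();
            uploads++;
        }
        return uploads;
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            data.add("Sample value " + i);
        }
        System.out.println(uploadAll(data) + " uploads, " + data.size() + " items left");
    }
}
```

With 20 items and a page size of 3, this model performs 7 uploads (six full pages and one page of two items) and leaves the list empty. The reactive version has to preserve the same ordering: the next page must not be read until the previous one has been removed.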

The code: (The API call is simulated here)

import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Sample {

    public static void main(String[] args) {
        Store store = new Store();

        for (int i = 0; i < 20; i++)
            store.add("Sample value");

        store.getPageStream()
                .flatMapCompletable(in -> getApi().doOnComplete(store::removePage))
                .subscribe(() -> System.out.println("Job done"), Throwable::printStackTrace);

    }

    private static Completable getApi() {
        return Completable.create(emitter -> {

            System.out.println("API Requesting");
            Thread.sleep(2000);
            System.out.println("API Done\n\n");
            emitter.onComplete();

        })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

}

class Store extends ArrayList<String> {

    private static final int PAGE_SIZE = 3;

    private final Object pageLock = new Object();
    private final Iterator<List<String>> iterator = new Iterator<List<String>>() {

        @Override
        public boolean hasNext() {
            return isAvailable();
        }

        @Override
        public List<String> next() {
            return getPage();
        }

    };

    private int lastPageSize = 0;

    Flowable<List<String>> getPageStream() {
        return Flowable.fromIterable(() -> iterator);
    }

    private List<String> getPage() {
        synchronized (pageLock) {
            List<String> temp = new ArrayList<>();

            lastPageSize = Math.min(size(), PAGE_SIZE);

            for (int i = 0; i < lastPageSize; i++)
                temp.add(get(i));
            return temp;
        }
    }

    void removePage() {
        synchronized (pageLock) {
            if (lastPageSize <= 0)
                return;

            System.out.println(toString());
            removeRange(0, lastPageSize);
            lastPageSize = 0;
            System.out.println(toString());
        }
    }

    private boolean isAvailable() {
        synchronized (pageLock) {
            return size() > 0;
        }
    }

}

The problem: The process does not run serially. flatMapCompletable does not wait for the API call to finish before requesting the next page. Moreover, since it keeps requesting pages without removing them, it never finishes.
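One possible reading of this symptom, sketched in plain Java rather than RxJava: as far as I understand, `flatMapCompletable` called without a `maxConcurrency` argument places no limit on how many inner Completables run at once, so it keeps pulling from the iterator while the first upload is still in flight. Because the Store only removes a page after an upload completes, every pull in that window returns the same first page. `EagerPullSketch`, `peekPage`, and `pullWithoutWaiting` are hypothetical names, and the fixed pull count is an assumption that keeps the demonstration finite.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: pages are read from the head of the list and are only
// removed when an upload completes, as in the Store class above.
class EagerPullSketch {

    static final int PAGE_SIZE = 3;

    // Returns a copy of the first page without removing it.
    static List<String> peekPage(List<String> data) {
        return new ArrayList<>(data.subList(0, Math.min(data.size(), PAGE_SIZE)));
    }

    // Pulls up to `pulls` pages back to back, before any upload has completed.
    // This mimics a consumer that does not wait between requests.
    static List<List<String>> pullWithoutWaiting(List<String> data, int pulls) {
        List<List<String>> pages = new ArrayList<>();
        for (int i = 0; i < pulls && !data.isEmpty(); i++) {
            pages.add(peekPage(data)); // nothing has been removed yet
        }
        return pages;
    }

    public static void main(String[] args) {
        List<String> data = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            data.add("item-" + i);
        }
        List<List<String>> pages = pullWithoutWaiting(data, 3);
        System.out.println(pages);       // three copies of the same first page
        System.out.println(data.size()); // still 20, nothing was removed
    }
}
```

In this model the list never shrinks while pages are being pulled, so a `hasNext()` based on `size() > 0` stays true for as long as the consumer keeps asking, which would match the observation that the stream never finishes.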

Extra info: If I remove the line .subscribeOn(Schedulers.io()) from the getApi() method, the calls happen serially and everything works as expected, but it causes a NetworkOnMainThreadException.

Thanks in advance if anyone can solve this problem.