delay and interval operators do not work properly

Posted 2019-03-05 04:34

Question:

As shown below, I am creating an Observable. I would like to wait a specific amount of time, in seconds, as shown in the code, so I used either the delay or the interval operator. I expected the code to wait, e.g. 5 seconds, and then print the System.out.println output from the observer.

What happens instead is that doOnNext is executed and the code never goes any further. I mean, execution stops at doOnNext even after the 5 seconds have elapsed.

code:

public static void main(String[] args) {
    Observable<List<Person>> observables = Observable.create(e-> {
        for(List<Person> p : Main.getPersons()) {
            e.onNext(p);
        }
        e.onComplete();
    });
    observables
    //.subscribeOn(Schedulers.newThread())//newThread
    .flatMap(p->Main.toObservable(p.get(0).getName()))
    .doOnNext(p-> System.out.println(p.length()) )
    .map(p->p+"..STRING")
    //.delay(5, TimeUnit.SECONDS)
    //.interval(0, 5, TimeUnit.SECONDS)
    .observeOn(Schedulers.io())
    .subscribe(new Observer() {
        @Override
        public void onComplete() {
            // TODO Auto-generated method stub
            System.out.println("onCompleted");
        }

        @Override
        public void onError(Throwable arg0) {
            // TODO Auto-generated method stub

        }

        @Override
        public void onNext(Object arg0) {
            // TODO Auto-generated method stub
            System.out.println("onNextFromObserver: " + arg0);
        }

        @Override
        public void onSubscribe(Disposable arg0) {
            // TODO Auto-generated method stub
        }
    });
}

private static <T> Observable<T> toObservable(T s) {
    return Observable.just(s);
}
private static List<List<Person>> getPersons() {
    return Arrays.asList(
            Arrays.asList(new Person("Sanna1", 59, "EGY"), new Person("Sanna2", 59, "EGY"), new Person("Sanna3", 59, "EGY")),
            Arrays.asList(new Person("Mohamed1", 59, "EGY"), new Person("Mohamed2", 59, "EGY")),
            Arrays.asList(new Person("Ahmed1", 44, "QTR"), new Person("Ahmed2", 44, "QTR"), new Person("Ahmed3", 44, "QTR")),
            Arrays.asList(new Person("Fatma", 29, "KSA")),
            Arrays.asList(new Person("Lobna", 24, "EGY"))
    );
}
}

Answer 1:

You have to wait in the main method. Put Thread.sleep(10000) at the very end of the main() method so the Observable has a chance to run. RxJava's scheduler threads are daemon threads, so they stop when the application thread falls out of the main() method and the JVM exits.

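// Thread.sleep throws a checked exception, so main has to declare it
public static void main(String[] args) throws InterruptedException {

    Observable.just("Hello World!", "Keep printing values!")
    // Pair each item with a tick from interval (0 s initial delay, 5 s period)
    .zipWith(Observable.interval(0, 5, TimeUnit.SECONDS), (a, b) -> a)
    .subscribe(v ->
        System.out.println(Thread.currentThread() + ": " + v)
    );

    // Keep the main thread alive so the delayed values can be emitted
    Thread.sleep(10000);  // <-----------------------------------

}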
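
The daemon-thread behavior described in this answer can be reproduced without RxJava. The sketch below is only a JDK analogy, not RxJava's actual scheduler code: the class name DaemonDelayDemo, the method emitAfterDelay and the thread name "demo-scheduler" are made up for illustration. It schedules one delayed value on a daemon thread and shows that the caller only sees the value if it waits for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a JDK-only stand-in for a delayed emission
// that runs on a daemon scheduler thread.
class DaemonDelayDemo {

    // Schedules one value after delayMillis on a daemon thread and returns
    // whatever the caller has received by the time this method returns.
    static List<String> emitAfterDelay(long delayMillis, boolean waitForEmission) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch emitted = new CountDownLatch(1);

        // Daemon threads do not keep the JVM alive once main() has returned.
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(runnable -> {
                    Thread t = new Thread(runnable, "demo-scheduler");
                    t.setDaemon(true);
                    return t;
                });

        scheduler.schedule(() -> {
            received.add("value after " + delayMillis + " ms");
            emitted.countDown();
        }, delayMillis, TimeUnit.MILLISECONDS);

        if (waitForEmission) {
            try {
                // Plays the role of Thread.sleep(10000) in the answer, but
                // returns as soon as the value has arrived (or on timeout).
                emitted.await(delayMillis + 2000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }

        // Copy what has arrived so far, then cancel anything still pending.
        List<String> snapshot = new ArrayList<>(received);
        scheduler.shutdownNow();
        return snapshot;
    }

    public static void main(String[] args) {
        System.out.println("not waiting: " + emitAfterDelay(500, false));
        System.out.println("waiting:     " + emitAfterDelay(500, true));
    }
}
```

The call that does not wait returns before the 500 ms delay has elapsed, so its list is empty. That mirrors what happens in the question when main() returns before the delayed items are delivered. Waiting on a latch instead of sleeping for a fixed time is a design choice of this sketch: it lets the caller continue as soon as the value arrives.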