As I understand RxJava2 values.take(1)
creates another Observable that contains only one element from the original Observable. Which MUST NOT throw an exception as it is filtered out by the effect of take(1)
as it's happened second.
as in the following code snippet
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
Output
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
My questions :
- Am I understanding it correct ?
- What's really happening to cause the exception.
- How to solve this from the consumer ?