I am trying to use a library that uses RxJava 1.1.5 with Spring WebFlux (i.e. Reactor Core 3.1.0.M3) but I am having trouble adapting Observable
to Flux
.
I thought this would be relatively straightforward, but my adapter isn't working:
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
public static <T> Flux<T> toFlux(Observable<T> observable) {
return Flux.create(emitter -> {
final Subscription subscription = observable.subscribe(new Subscriber<T>() {
@Override
public void onNext(T value) {
emitter.next(value);
}
@Override
public void onCompleted() {
emitter.complete();
}
@Override
public void onError(Throwable throwable) {
emitter.error(throwable);
}
});
emitter.onDispose(subscription::unsubscribe);
});
}
I have verified that onNext
and onCompleted
are both getting called in the correct order but my Flux
is always empty. Does anyone see what I am doing wrong?
On a related note, why isn't there an adapter for RxJava 1 in reactor-addons?
Use the RxJavaReactiveStreams adapter to turn your
Observable
into aPublisher
, then haveFlux.fromPublisher()
consume it.They don't want to support or encourage using that old technology and I completely agree.