How to wrap a cold observable (ReactiveX / RxJava)

2019-09-20 21:44发布

I'm trying to rely on Reactive Programming to share the result of an http call with many subscribers. At the same time I want to be able to perform the call again (refresh).

I start with a cold Observable that perform the http call and then immediately complete.

I want to wrap it to obtain an hot observable that work like this: every subscriber should always receive the last event (if any) when subscribing and every other event until unsubscribed. I should have a way (external to that observable) to trigger a refresh and thus a new event on all the subscribers.

More in detail:

I have a cold Observable for the http request build by retrofit2. For the sake of completeness this is my service interface

@GET("/path")
Observable<MyData> httpCall();

I ask retrofit for a service:

Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl(REST_BASE_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                    .build();
MyService service = retrofit.create(MyServiceInterface.class);

From the service I get the observable:

Observable<MyData> coldObservable = service.httpCall();

This is a cold observable, it will perform the http call every time subscribe() is called on it and then immediately complete.

I want to expose an hot observable, I've read I can do this:

Observable<MyData>hotObservable = coldObservable.publish()
    .autoConnect();

This way the http call is performed at the first subscribe() on it and if I subscribe multiple times all will "connect" to the same coldObservable.

After the call is completed if I call subscribe() again nothing will happen, not even a callback to completed.

Instead I want it to receive the last event.

If the user request it I want to force a refresh (repeat http call). All subscribers should receive the new result / error.

I imagine something like this:

Observable<MyData> theSmartObservable = helperClass.getObservable();

// at some point later
helperClass.triggerRefresh();

The refresh triggered should produce a new event in theSmartObservable.

How do I build such an observable?

I hope I explained myself, if not please tell in comments.

1条回答
趁早两清
2楼-- · 2019-09-20 22:38

Subclassing Observable is tricky. Using composition with a subject as a "bridge" should be easier. Something like

public class HelperClass {

    private Subscription bridgeSubscription;
    private ReplaySubject<MyData> bridgeSubject;
    private MyService service;
    private Subscriber<MyData> mSubscriber = new Subscriber<MyData>() {
                @Override
                public void onCompleted() {
                  // To DO
                }

                @Override
                public void onError(Throwable e) {
                  // TO DO
                }

                @Override
                public void onNext(MyData d) {
                    bridgeSubject.onNext(d);
                }
            };
    public HelperClass(MyService service) {
        this.service = service;
        bridgeSubject = ReplaySubject.create(1);
        bridgeSubscription = this.service.httpCall().publish().autoConnect().subscribe(mSubscriber);
    }

    public Observable<MyData> getObservable() {
        // Might not work too well to hide the observable identity  
        // return bridgeSubject.asObservable();
        return bridgeSubject;
    }

    public void triggerRefresh() {
        if (bridgeSubscription != null) {
            bridgeSubscription.unsubscribe();
            bridgeSubscription = null;
        }
        bridgeSubscription = service.httpCall().publish().autoConnect().subscribe(mSubscriber);
    }
}
查看更多
登录 后发表回答