subscribeWith vs subscribe in RxJava2 (Android)?

Posted 2019-03-08 18:59

Question:

When to call the subscribeWith method rather than plain subscribe? And what is the use case?

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    .subscribe(this::handleResponse, this::handleError));

VS

compositeDisposable.add(get()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.io())
    // .subscribe(this::handleResponse, this::handleError);
    .subscribeWith(new DisposableObserver<News>() {
        @Override public void onNext(News value) {
            handleResponse(value);
        }

        @Override public void onError(Throwable e) {
            handleError(e);
        }

        @Override public void onComplete() {
            // dispose here? why? when the whole thing will get disposed later
            // via compositeDisposable.dispose(); in onDestroy();
        }
    }));

Thank you


Added Later

According to the documentation, both return disposable SingleObserver instances:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(final Consumer<? super T> onSuccess, final Consumer<? super Throwable> onError) {
    ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ConsumerSingleObserver<T> s = new ConsumerSingleObserver<T>(onSuccess, onError);
    subscribe(s);
    return s;
}

Where ConsumerSingleObserver class implements SingleObserver and Disposable.

Answer 1:

Observable#subscribe explanation:

In your first code snippet:

.subscribe(this::handleResponse, this::handleError));

You are actually using one of the several overloaded Observable#subscribe methods:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

There is another one that also takes in an Action to perform onComplete:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete)

And another option allows you to simply pass in an Observer (NOTE: void method) (Edit 2 - this method is defined in ObservableSource, which is the interface that Observable extends.)

public final void subscribe(Observer<? super T> observer)

In the second code snippet in your question, you used the subscribeWith method which simply returns the Observer you passed in (for convenience/caching etc):

public final <E extends Observer<? super T>> E subscribeWith(E observer)

Observer#onComplete explanation:

Observer#onComplete gets called after the Observable has emitted all the items in the stream. From the java doc:

/**
 * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
 * <p>
 * The {@link Observable} will not call this method if it calls {@link #onError}.
 */
void onComplete();

So for example, if the get() in your code snippets returned an Observable that emitted multiple News objects, each one will be handled in the Observer#onNext. Here you can process each item.

After they have all been processed (and assuming no error occurred), onComplete will get called. Here you can perform any extra actions that you need to do (e.g. update the UI), knowing that you've processed all the News objects.

This is not to be confused with Disposable#dispose, which gets invoked when the observable stream ends (complete/error), or manually by you to terminate the observation. This is where the CompositeDisposable comes in: it lets you dispose of all the Disposables it contains at once.
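The difference between completing and disposing can be sketched with a small plain-Java model. Nothing below is RxJava API: Sink and emit are hypothetical names, and the model only imitates the contract described above (onComplete after the last item, no further callbacks once disposed).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical observer: records callbacks and can be disposed.
class Sink {
    final List<String> events = new ArrayList<>();
    private final int disposeAfter; // dispose itself after this many items; -1 = never
    private boolean disposed;

    Sink(int disposeAfter) { this.disposeAfter = disposeAfter; }

    void onNext(String item) {
        events.add("next:" + item);
        if (events.size() == disposeAfter) dispose();
    }
    void onComplete() { events.add("complete"); }
    void dispose() { disposed = true; }
    boolean isDisposed() { return disposed; }
}

class CompleteVsDispose {
    // Hypothetical source: pushes each item, then signals completion,
    // unless the observer was disposed in the meantime.
    static void emit(List<String> items, Sink sink) {
        for (String item : items) {
            if (sink.isDisposed()) return; // disposed: stop silently, no onComplete
            sink.onNext(item);
        }
        if (!sink.isDisposed()) sink.onComplete();
    }

    public static void main(String[] args) {
        List<String> news = Arrays.asList("a", "b", "c");

        Sink full = new Sink(-1);  // never disposes: sees every item, then onComplete
        emit(news, full);
        System.out.println(full.events);

        Sink cut = new Sink(2);    // disposes after two items: no onComplete
        emit(news, cut);
        System.out.println(cut.events);
    }
}
```

In this model, onComplete is a signal from the source, while dispose is a request from the consumer to stop listening.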

If in your scenario get() returns an Observable that only emits a single item, then instead of using an Observable, consider using an io.reactivex.Single, where you only process the one item (in onSuccess) and won't need to specify an Action for onComplete :)

Edit: response to your comment:

However I still do not get use of subscribeWith, you said it passes the observer for caching etc , where does it pass to? on complete? and from what I understood subscribeWith is not actually consuming the observable( or Single) right?

To further clarify the subscribeWith explanation, what I meant was that it will consume the Observer object that you passed into the subscribeWith (exactly like the subscribe method) however it will additionally return that same Observer right back to you. At time of writing, the implementation of subscribeWith is:

public final <E extends Observer<? super T>> E subscribeWith(E observer) {
    subscribe(observer);
    return observer;
}

Therefore, subscribeWith can be used interchangeably with subscribe(Observer); the only difference is that it hands the same observer back to you.

Can you give a use case of subscribeWith with example? I guess that will answer the question completely

The subscribeWith javadoc gives the following usage example:

Observable<Integer> source = Observable.range(1, 10);
CompositeDisposable composite = new CompositeDisposable();

ResourceObserver<Integer> rs = new ResourceObserver<>() {
     // ...
};

composite.add(source.subscribeWith(rs));

Here, subscribeWith returns the same ResourceObserver object that was instantiated. This gives the convenience of performing the subscription and adding the ResourceObserver to the CompositeDisposable in one line (note that ResourceObserver implements Disposable).

Edit 2 Replying to second comment.

source.subscribeWith(rs); source.subscribe(rs); both return SingleObserver instance,

ObservableSource#subscribe(Observer <? super T> observer) does NOT return an Observer. It is a void method (See NOTE under the Observable#subscribe explanation above.) Whereas the Observable#subscribeWith DOES return the Observer. If we were to rewrite the example usage code using ObservableSource#subscribe instead, we'd have to do it in two lines like so:

source.subscribe(rs); //ObservableSource#subscribe is a void method so nothing will be returned
composite.add(rs);

Whereas the Observable#subscribeWith method made it convenient for us to do the above in just one line composite.add(source.subscribeWith(rs));
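To make the difference in return type concrete, here is a stripped-down model in plain Java. It is not RxJava's code: MiniObserver, MiniSource, and Bag are hypothetical stand-ins for a disposable observer, Observable, and CompositeDisposable, kept only as detailed as needed to show the two call shapes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an observer that is also disposable
// (the role DisposableObserver/ResourceObserver play in RxJava).
class MiniObserver {
    final List<Integer> received = new ArrayList<>();
    private boolean disposed;

    void onNext(int value) { received.add(value); }
    void dispose() { disposed = true; }
    boolean isDisposed() { return disposed; }
}

// Hypothetical stand-in for a source that emits 1, 2, 3.
class MiniSource {
    // Same shape as subscribe(Observer): returns nothing.
    void subscribe(MiniObserver observer) {
        for (int i = 1; i <= 3 && !observer.isDisposed(); i++) {
            observer.onNext(i);
        }
    }

    // Same shape as subscribeWith: subscribe, then hand the same object back.
    <E extends MiniObserver> E subscribeWith(E observer) {
        subscribe(observer);
        return observer;
    }
}

// Hypothetical stand-in for CompositeDisposable.
class Bag {
    private final List<MiniObserver> items = new ArrayList<>();
    void add(MiniObserver o) { items.add(o); }
    void dispose() { for (MiniObserver o : items) o.dispose(); }
}

class SubscribeWithDemo {
    public static void main(String[] args) {
        MiniSource source = new MiniSource();
        Bag bag = new Bag();

        // Two statements with the void subscribe.
        MiniObserver first = new MiniObserver();
        source.subscribe(first);
        bag.add(first);

        // One statement with subscribeWith.
        MiniObserver second = new MiniObserver();
        bag.add(source.subscribeWith(second));

        bag.dispose();
        System.out.println(first.received + " " + second.received);
        System.out.println(first.isDisposed() && second.isDisposed());
    }
}
```

Both observers end up in the same state; subscribeWith only saves the extra statement and the extra local variable.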

It can get confusing with all the overloaded subscribe methods that look somewhat similar, however there are differences (some of which are subtle). Looking at the code and documentation helps to provide the distinction between them.


Edit 3 Another sample use case for subscribeWith

The subscribeWith method is useful when you have a specific implementation of an Observer that you may want to reuse. For example, the sample code above provided a specific implementation of ResourceObserver in the subscription, thereby inheriting its functionality while still allowing you to handle onNext, onError and onComplete.

Another example use: for the sample code in your question, what if you wanted to perform the same subscription for the get() response in multiple places?

Instead of copying the Consumer implementations for onNext and onError across different classes, you can define a new class, for example:

//sample code..
public class GetNewsObserver extends DisposableObserver<News> {
    //implement your onNext, onError, onComplete.
    ....
}

Now, whenever you do that get() request, you can simply subscribe by doing:

compositeDisposable.add(get()
    ...
    .subscribeWith(new GetNewsObserver()));

See the code is simple now, you maintain separation of responsibility for handling the response, and can now reuse that GetNewsObserver wherever you want.
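The reuse idea can be tried out without RxJava using a small plain-Java model. All names here are hypothetical: NewsSink stands in for the Observer interface, HeadlineCollector for GetNewsObserver, and NewsFeed for whatever get() returns. The one detail taken from the real signature is the generic return type E, which lets the caller keep the concrete observer type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical minimal observer contract (stand-in for RxJava's Observer).
interface NewsSink {
    void onNext(String headline);
    void onError(Throwable e);
}

// Reusable, named handler: the model counterpart of GetNewsObserver.
class HeadlineCollector implements NewsSink {
    final List<String> headlines = new ArrayList<>();
    Throwable failure;

    @Override public void onNext(String headline) { headlines.add(headline); }
    @Override public void onError(Throwable e) { failure = e; }
}

// Hypothetical source standing in for get().
class NewsFeed {
    private final List<String> items;
    NewsFeed(List<String> items) { this.items = items; }

    void subscribe(NewsSink sink) {
        try {
            for (String item : items) sink.onNext(item);
        } catch (RuntimeException e) {
            sink.onError(e);
        }
    }

    // Same generic shape as subscribeWith: the caller keeps the concrete type E.
    <E extends NewsSink> E subscribeWith(E sink) {
        subscribe(sink);
        return sink;
    }
}

class ReuseDemo {
    public static void main(String[] args) {
        // Two call sites share one handler class instead of duplicating lambdas.
        HeadlineCollector home =
            new NewsFeed(Arrays.asList("x", "y")).subscribeWith(new HeadlineCollector());
        HeadlineCollector sports =
            new NewsFeed(Arrays.asList("z")).subscribeWith(new HeadlineCollector());

        // No cast needed: subscribeWith returned HeadlineCollector, not NewsSink.
        System.out.println(home.headlines + " " + sports.headlines);
    }
}
```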