RxJava + Retrofit -> BaseObservable for API calls

Posted 2019-03-18 02:30

I am new to RxJava so please forgive me if this sounds too newbie :-).

As of now I have an abstract CallbackClass that implements the Retrofit Callback. There I catch the Callback's "onResponse" and "onFailure" methods and handle various error types before finally forwarding to the custom implemented methods. I also use this centralized class for request/response app logging and other stuff.

For example: for specific error codes from my server, I receive a new auth token in the response body, refresh the token, and then re-enqueue the call with call.clone().enqueue(...). There are of course several other global behaviors for the responses from my server.

Current solution (Without Rx):

    public abstract void onResponse(Call<T> call, Response<T> response, boolean isSuccess);

    public abstract void onFailure(Call<T> call, Response<T> response, Throwable t, boolean isTimeout);

    @Override
    public void onResponse(Call<T> call, Response<T> response) {
        if (_isCanceled) return;

        if (response != null && !response.isSuccessful()) {
            // SOME_CODE is a placeholder int constant for the status code that carries a new token
            if (response.code() == SOME_CODE && retryCount < RETRY_LIMIT) {
                TokenResponseModel newToken = null;
                try {
                    newToken = new Gson().fromJson(new String(response.errorBody().bytes(), "UTF-8"), TokenResponseModel.class);
                } catch (Exception e) {
                    e.printStackTrace();
                }

                if (newToken != null) {
                    SomeClass.token = newToken.token;
                    retryCount++;
                    call.clone().enqueue(this);
                    return;
                }
            }
        } else {
            onResponse(call, response, true);
            removeFinishedRequest();
            return;
        }

        onFailure(call, response, null, false);
        removeFinishedRequest();
    }

    @Override
    public void onFailure(Call<T> call, Throwable t) {
        if (_isCanceled) return;

        if (t instanceof UnknownHostException)
            if (eventBus != null)
                eventBus.post(new NoConnectionErrorEvent());

        onFailure(call, null, t, false);
        removeFinishedRequest();
    }

My question is: Is there any way to have this sort of centralized response handling behavior before finally chaining (or retrying) back to the subscriber methods?

I found these 2 links which both have a nice starting point but not a concrete solution. Any help will be really appreciated.

Forcing request retry after custom API exceptions in RxJava

Retrofit 2 and RxJava error handling operators

3 answers
Bombasti
#2 · 2019-03-18 02:53

The two links you provided are a really good starting point. I used them to build a solution that reacts to occasional

  • network errors, which happen sometimes due to a temporary lack of network connection or a switch to a low-throughput network standard such as EDGE, which causes SocketTimeoutException
  • server errors, which happen sometimes due to server overload

I have extended CallAdapter.Factory to handle these errors and react to them appropriately.

  1. Import RetryWithDelayIf from the solution you found

  2. Override CallAdapter.Factory to handle errors:

    public class RxCallAdapterFactoryWithErrorHandling extends CallAdapter.Factory {
        private final RxJavaCallAdapterFactory original;
    
        public RxCallAdapterFactoryWithErrorHandling() {
            original = RxJavaCallAdapterFactory.create();
        }
    
        @Override
        public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
            return new RxCallAdapterWrapper(retrofit, original.get(returnType, annotations, retrofit));
        }
    
        public class RxCallAdapterWrapper implements CallAdapter<Observable<?>> {
            private final Retrofit retrofit;
            private final CallAdapter<?> wrapped;
    
            public RxCallAdapterWrapper(Retrofit retrofit, CallAdapter<?> wrapped) {
                this.retrofit = retrofit;
                this.wrapped = wrapped;
            }
    
            @Override
            public Type responseType() {
                return wrapped.responseType();
            }
    
            @SuppressWarnings("unchecked")
            @Override
            public <R> Observable<?> adapt(final Call<R> call) {
                return ((Observable) wrapped.adapt(call)).onErrorResumeNext(new Func1<Throwable, Observable>() {
                    @Override
                    public Observable call(Throwable throwable) {
                        Throwable returnThrowable = throwable;
                        if (throwable instanceof HttpException) {
                            HttpException httpException = (HttpException) throwable;
                            returnThrowable = httpException;
                            int responseCode = httpException.response().code();
                            if (NetworkUtils.isClientError(responseCode)) {
                                returnThrowable = new HttpClientException(throwable);
                            }
                            if (NetworkUtils.isServerError(responseCode)) {
                                returnThrowable = new HttpServerException(throwable);
                            }
                        }
    
                        if (throwable instanceof UnknownHostException) {
                            returnThrowable = throwable;
                        }
    
                        return Observable.error(returnThrowable);
                    }
                }).retryWhen(new RetryWithDelayIf(3, DateUtils.SECOND_IN_MILLIS, new Func1<Throwable, Boolean>() {
                    @Override
                    public Boolean call(Throwable throwable) {
                        return throwable instanceof HttpServerException
                                || throwable instanceof SocketTimeoutException
                                || throwable instanceof UnknownHostException;
                    }
                }));
            }
        }
    }
    

    HttpServerException is just a custom exception.

  3. Use it in Retrofit.Builder

    Retrofit retrofit = new Retrofit.Builder()
            // baseUrl(...) and any converter factories are omitted here for brevity
            .addCallAdapterFactory(new RxCallAdapterFactoryWithErrorHandling())
            .build();
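
The classification and retry decision from step 2 can be sketched in plain Java, without Retrofit or RxJava on the classpath. This is only an illustration under assumptions: the answer does not show `NetworkUtils`, `HttpClientException` or `HttpServerException`, so the versions below (status ranges 400-499 and 500-599) are hypothetical stand-ins. `RetrySketch` and its blocking `retry` helper are also made up here; the loop only imitates what `RetryWithDelayIf` does inside `retryWhen`.

```java
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.concurrent.Callable;

// Hypothetical custom exceptions; the answer only says they are "just custom exceptions".
class HttpClientException extends RuntimeException {
    HttpClientException(Throwable cause) { super(cause); }
}

class HttpServerException extends RuntimeException {
    HttpServerException(Throwable cause) { super(cause); }
}

// Assumed shape of the NetworkUtils helper used in the adapter above.
final class NetworkUtils {
    private NetworkUtils() {}

    static boolean isClientError(int code) { return code >= 400 && code < 500; }

    static boolean isServerError(int code) { return code >= 500 && code < 600; }
}

final class RetrySketch {
    private RetrySketch() {}

    // Same condition as the Func1 passed to RetryWithDelayIf in step 2.
    static boolean shouldRetry(Throwable t) {
        return t instanceof HttpServerException
                || t instanceof SocketTimeoutException
                || t instanceof UnknownHostException;
    }

    // Blocking imitation of "retry up to maxRetries times, waiting delayMillis in between".
    static <T> T retry(Callable<T> task, int maxRetries, long delayMillis) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                if (attempt >= maxRetries || !shouldRetry(e)) {
                    throw e; // not retryable, or retries exhausted
                }
                attempt++;
                Thread.sleep(delayMillis);
            }
        }
    }
}
```

With these stand-ins, a 4xx response is surfaced immediately, while a 5xx response, a timeout or an unknown host is retried a bounded number of times before the error reaches the subscriber.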
    

Extra: If you wish to parse errors that come from the API itself (errors that don't raise UnknownHostException, HttpException, MalformedJsonException, etc.), you need to override the Factory and use your custom one when building the Retrofit instance. Parse the response and check whether it contains errors. If it does, throw an error, and it will be handled inside the method above.
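
As a rough sketch of the "parse the response and check whether it contains errors" step, here is the check on its own in plain Java. Everything named here is hypothetical: the `ApiEnvelope` class, its `errorMessage` field, `ApiException` and `ApiErrorCheck.unwrap` are assumptions for illustration, and the real field names depend on your API. How the check is wired into a custom factory is not shown in this answer, so that part is left out.

```java
// Hypothetical envelope for a body that can report an error inside a successful HTTP response.
final class ApiEnvelope<T> {
    final T data;
    final String errorMessage; // null when the call succeeded (assumed convention)

    ApiEnvelope(T data, String errorMessage) {
        this.data = data;
        this.errorMessage = errorMessage;
    }
}

// Hypothetical exception for errors reported in the response body.
class ApiException extends RuntimeException {
    ApiException(String message) { super(message); }
}

final class ApiErrorCheck {
    private ApiErrorCheck() {}

    // Returns the payload, or throws so that downstream error handling sees the failure.
    static <T> T unwrap(ApiEnvelope<T> envelope) {
        if (envelope.errorMessage != null) {
            throw new ApiException(envelope.errorMessage);
        }
        return envelope.data;
    }
}
```

Throwing a dedicated exception type keeps body-level errors distinguishable from transport errors, so the retry condition can include or exclude them explicitly.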

Melony?
#3 · 2019-03-18 02:58

Here is how you can do it. This is the API interface; the call takes the request model and returns the response model.

    public interface RestService {
        // SEARCH_USER
        @POST(SEARCH_USER_API_LINK)
        Observable<SearchUserResponse> getSearchUser(@Body SearchUserRequest getSearchUserRequest);
    }

This is the Retrofit setup; I used Retrofit 2.

    public RestService getRestService() {

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl(ApiConstants.BASE_URL)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .client(getOkHttpClient())
                .build();

        return retrofit.create(RestService.class);
    }

    // get OkHttp instance
    @Singleton
    @Provides
    public OkHttpClient getOkHttpClient() {

        HttpLoggingInterceptor httpLoggingInterceptor = new HttpLoggingInterceptor();
        httpLoggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);

        OkHttpClient.Builder builder = new OkHttpClient.Builder();
        builder.interceptors().add(httpLoggingInterceptor);
        builder.readTimeout(60, TimeUnit.SECONDS);
        builder.connectTimeout(60, TimeUnit.SECONDS);
        return builder.build();
    }

This is the API call; invoke it from your activity.

    @Inject
    Scheduler mMainThread;
    @Inject
    Scheduler mNewThread;

    // getSearchUser API method
    public void getSearchUser(String user_id, String username) {

        SearchUserRequest searchUserRequest = new SearchUserRequest(user_id, username);

        mObjectRestService.getSearchUser(searchUserRequest)
                .subscribeOn(mNewThread)
                .observeOn(mMainThread)
                .subscribe(searchUserResponse -> {
                    Timber.e("searchUserResponse :" + searchUserResponse.getResponse().getResult());
                    if (isViewAttached()) {
                        getMvpView().hideProgress();
                        if (searchUserResponse.getResponse().getResult() == ApiConstants.STATUS_SUCCESS) {

                        } else {

                        }
                    }
                }, throwable -> {
                    if (isViewAttached()) {

                    }
                });
    }

Hope this will help you.

beautiful°
#4 · 2019-03-18 03:11

Have you considered using the RxJava adapter for Retrofit? https://mvnrepository.com/artifact/com.squareup.retrofit2/adapter-rxjava/2.1.0 In your Gradle file, add:

    compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'

Here's an interface for Retrofit:

    public interface Service {
        @GET("userauth/login?")
        Observable<LoginResponse> getLogin(
                @Query("v") String version,
                @Query("username") String username,
                @Query("password") String password);
    }

And here's my implementation:

    // service is the instance returned by retrofit.create(Service.class)
    service.getLogin(
            VERSION,
            "username",
            "password")
            .subscribe(new Subscriber<LoginResponse>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(LoginResponse loginResponse) {

                }
            });

Please note that I'm using the Gson converter factory to parse my response, so I get a POJO (Plain Old Java Object) back.
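
For illustration, such a POJO might look roughly like the class below. The fields `token` and `success` are assumptions, since the real `LoginResponse` is not shown. With Gson's default settings the field names have to match the JSON keys your server returns.

```java
// Hypothetical shape of LoginResponse; adjust the fields to the JSON your server sends.
class LoginResponse {
    private String token;    // assumed JSON key "token"
    private boolean success; // assumed JSON key "success"

    LoginResponse() {
        // no-argument constructor, used when the converter creates the object
    }

    LoginResponse(String token, boolean success) {
        this.token = token;
        this.success = success;
    }

    String getToken() { return token; }

    boolean isSuccess() { return success; }
}
```

This is the type that would arrive in `onNext(LoginResponse loginResponse)` once the converter has parsed the body.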
