Executing rx.Observables sequentially

Posted 2019-09-18 20:04

I'm developing an Android app using Fernando Cejas' Clean Architecture. One of my Interactors (Use Cases) is in charge of getting the User's feed data. To get the data, I first have to retrieve the User's Teams from a database table, and then get the Feed list from the server side.

This is how I get the Teams from the database layer:

    mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
        @Override
        public void onNext(List<SimpleTeam> simpleTeams) {
            super.onNext(simpleTeams);
            mTeams = simpleTeams;
        }
    });

TeamCache is basically just another Interactor that takes care of getting all the teams that I have in the database.

Here's how I get the Feed data from the server-side:

    mFeedRepository.getFeed(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
        @Override
        protected void onServerSideError(Throwable errorResponse) {
            callback.onFeedFetchFailed(...);
        }

        @Override
        protected void onSuccess(List<ApiFeedResponse> responseBody) {
            //Do stuff with mTeams
            callback.onFeedFetched(...);
        }
    });

My GetFeedInteractor class has a method called execute, to which I pass the Callback that I later use in the UI to handle the response. The issue is that I'm currently chaining the calls like this:

    @Override
    public void execute(final Callback callback, String userSipId) {
        mTeamCache.getAllTeams().subscribe(new DefaultSubscriber<List<SimpleTeam>>() {
            @Override
            public void onNext(List<SimpleTeam> simpleTeams) {
                super.onNext(simpleTeams);
                mTeams = simpleTeams;
                // The second request is started from inside the first Subscriber
                getFeedFromRepository(callback);
            }
        });
    }

    public void getFeedFromRepository(final Callback callback) {
        mFeedRepository.getFeedRx(0, 50).subscribe(new ServerSubscriber<List<ApiFeedResponse>>() {
            @Override
            protected void onServerSideError(Throwable errorResponse) {
                callback.onFeedFetchFailed("failed");
            }

            @Override
            protected void onSuccess(List<ApiFeedResponse> responseBody) {
                //Do stuff with mTeams
                List<BaseFeedItem> responseList = new ArrayList<>();

                for (ApiFeedResponse apiFeedResponse : responseBody) {
                    responseList.add(FeedDataMapper.transform(apiFeedResponse));
                }

                callback.onFeedFetched(responseList);
            }
        });
    }

As you can see, once I get the Team collection from the Cache Interactor, I call the method that gets the feed from inside that very same Subscriber. I don't like this: chaining a call to another rx.Observable inside a Subscriber is not a nice thing to do. I would like to do something cleaner, such as Observable.concat(getTeamsFromCache(), getFeedFromRepository()). I guess my question is: how can I chain two rx.Observables that use different Subscribers?

Update:

ServerSubscriber is a subscriber that I implemented to subscribe to Retrofit services. It simply checks the error codes and does some related handling. Here it is:

https://gist.github.com/4gus71n/65dc94de4ca01fb221a079b68c0570b5

DefaultSubscriber is an empty default subscriber. Here it is:

https://gist.github.com/4gus71n/df501928fc5d24c2c6ed7740a6520330

TeamCache#getAllTeams() returns rx.Observable<List<SimpleTeam>>.

FeedRepository#getFeed(int page, int offset) returns rx.Observable<List<ApiFeedResponse>>.

Update 2:

This is what the Interactor that gets the User's feed looks like now:

    @Override
    public void execute(final Callback callback, int offset, int pageSize) {
        User user = mGetLoggedUser.get();
        String userSipid = mUserSipid.get();

        mFeedRepository.getFeed(offset, pageSize) //Get items from the server-side
                .onErrorResumeNext(mFeedCache.getFeed(userSipid)) //If something goes wrong take it from cache
                .mergeWith(mPendingPostCache.getAllPendingPostsAsFeedItems(user)) //Merge the response with the pending posts
                .subscribe(new DefaultSubscriber<List<BaseFeedItem>>() {
                    @Override
                    public void onNext(List<BaseFeedItem> baseFeedItems) {
                        callback.onFeedFetched(baseFeedItems);
                    }

                    @Override
                    public void onError(Throwable e) {
                        if (e instanceof ServerSideException) {
                            //Handle the http error
                        } else if (e instanceof DBException) {
                            //Handle the database cache error
                        } else {
                            //Handle generic error
                        }
                    }
                });
    }

1 answer
Emotional °昔
Reply #2 · 2019-09-18 20:21

I think you're missing the point of RxJava and the reactive approach: you should not have different subscribers arranged in an OO hierarchy, plus callbacks.
Instead, construct separate Observables, each emitting the specific data it handles, without any Subscriber. You can then chain your Observables as needed, and only at the end attach a single subscriber that reacts to the final result of the chained Observable stream.

Something like this (using lambdas to keep the code concise):

    TeamCache mTeamCache = new TeamCache();
    FeedRepository mFeedRepository = new FeedRepository();

    // teamsObservable and feedObservable are built from these two objects
    // (see the second snippet of this answer)
    Observable.zip(teamsObservable, feedObservable, Pair::new)
        .subscribe(resultPair -> {
                //Do stuff with mTeams (resultPair.first)
                List<BaseFeedItem> responseList = new ArrayList<>();
                for (ApiFeedResponse apiFeedResponse : resultPair.second) {
                    responseList.add(FeedDataMapper.transform(apiFeedResponse));
                }
            }, throwable -> {
                //handle errors
            }
        );

I've used zip rather than concat because it seems you have two independent calls here, and you want to wait for both to finish ('zip' them together) and then act on the results. Of course, since you now have separate Observable streams, you can chain them together differently according to your needs.
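To make the difference between the two composition shapes concrete, here is a minimal sketch that runs on the plain JDK. It is only an analogy: `CompletableFuture` stands in for `rx.Observable`, with `thenCompose` playing the role that `flatMap` plays in RxJava (the second call is created only after the first result arrives) and `thenCombine` playing the role of `Observable.zip` (both calls are independent and paired at the end). The class name, `loadTeams`, `loadFeed`, and the sample data are all hypothetical placeholders, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-JDK analogy only: CompletableFuture stands in for rx.Observable.
// All names and sample values below are hypothetical placeholders.
final class ChainingSketch {

    // Stand-in for the local cache lookup (mTeamCache.getAllTeams()).
    static CompletableFuture<List<String>> loadTeams() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("team-a", "team-b"));
    }

    // Stand-in for the server call (mFeedRepository.getFeed(0, 50)).
    static CompletableFuture<List<String>> loadFeed() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("post-1", "post-2", "post-3"));
    }

    // Sequential: the feed request is only created once the teams have arrived
    // (thenCompose here; flatMap in RxJava).
    static CompletableFuture<String> sequential() {
        return loadTeams().thenCompose(teams ->
                loadFeed().thenApply(feed -> summary(teams, feed)));
    }

    // Independent: both requests start right away and are paired when both finish
    // (thenCombine here; Observable.zip in RxJava).
    static CompletableFuture<String> combined() {
        return loadTeams().thenCombine(loadFeed(), ChainingSketch::summary);
    }

    static String summary(List<String> teams, List<String> feed) {
        return teams.size() + " teams, " + feed.size() + " feed items";
    }
}
```

Both variants produce the same final value; they differ only in when the second request starts. If the feed request really needs the teams as input, the sequential shape is the one to use. If it does not, the combined shape avoids waiting for the cache before contacting the server.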

As for your ServerSubscriber with all its response validation logic, it should be 'rxified' too, so that you can compose it into your server Observable stream.

Something like this (some logic omitted to simplify, and because I'm not familiar with it...):

    Observable<List<SimpleTeam>> teamsObservable = mTeamCache.getAllTeams();
    // Assumption: getFeed() emits a response wrapper exposing code() and body()
    Observable<List<ApiFeedResponse>> feedObservable = mFeedRepository.getFeed(0, 50)
            .flatMap(apiFeedsResponse -> {
                if (apiFeedsResponse.code() != 200) {
                    if (apiFeedsResponse.code() == 304) {
                        // Not modified: was onNotModified(o.body()) in ServerSubscriber
                        List<ApiFeedResponse> body = apiFeedsResponse.body();
                        return Observable.just(body);
                    } else {
                        // Any other status becomes an onError event for the final subscriber
                        return Observable.error(new ServerSideErrorException(apiFeedsResponse));
                    }
                } else {
                    // Success: was onServerSideResponse(o.body()) in ServerSubscriber
                    return Observable.just(apiFeedsResponse.body());
                }
            });
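One possible way to keep that validation easy to test is to pull the status check out into a plain function and hand it to the stream afterwards. The sketch below is an assumption-laden illustration rather than the original ServerSubscriber logic: `HttpResult` is a hypothetical stand-in for the HTTP response wrapper (only `code()` and `body()` are modeled), and this `ServerSideErrorException` is a simplified hypothetical version that carries just the status code.

```java
// Hypothetical stand-in for the HTTP response wrapper used in the answer;
// only the two accessors that the validation needs are modeled.
final class HttpResult<T> {
    private final int code;
    private final T body;

    HttpResult(int code, T body) {
        this.code = code;
        this.body = body;
    }

    int code() { return code; }

    T body() { return body; }
}

// Hypothetical unchecked exception carrying the failing status code.
final class ServerSideErrorException extends RuntimeException {
    final int code;

    ServerSideErrorException(int code) {
        super("Server responded with HTTP " + code);
        this.code = code;
    }
}

final class ResponseValidation {
    // Returns the body for 200 (OK) and 304 (Not Modified), mirroring the
    // branches of the snippet above; any other status becomes an exception.
    static <T> T unwrap(HttpResult<T> response) {
        int code = response.code();
        if (code == 200 || code == 304) {
            return response.body();
        }
        throw new ServerSideErrorException(code);
    }
}
```

With RxJava 1, a function like this could be passed to `map` instead of spelling out the branches inside `flatMap`, since an exception thrown inside `map` is delivered to the subscriber's `onError`. The real response type would need to be adapted to whatever the repository actually emits.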