In my Android App I have a presenter which handles user interactions, contains kind of request manager and if needed sends user input over request manager to request manager.
Request manager itself contains server API and handles server request using this RxJava.
I have a code, which sends a request to server everytime a user enters a message and show the response from server:
private Observable<List<Answer>> sendRequest(String request) {
MyRequest request = new MyRequest();
request.setInput(request);
return Observable.fromCallable(() -> serverApi.process(request))
.doOnNext(myResponse -> {
// store some data
})
.map(MyResponse::getAnswers)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
However now I need to have kind of queue. The user may send a new message before the server has responded. Each message from the queue should be processed sequentially. I.e. the second message will be sent after we've got a response to the first message and so on.
In case an error occurs no further requests should be handled.
I also need to display the answers within a RecyclerView.
I have no idea how to change the code above to achieve the handling described above
I see kind of problem. On one hand, this queue can be anytime updated by the user, on the other hand anytime server sent a response the message should be removed from the queue.
Maybe there is a rxjava operator or special way I just missed.
I saw a similar answer here, however, the "queue" there is constant. Making N sequential api calls using RxJava and Retrofit
I'll be very thankful for any solution or link
My solutions would be as follows (I did something similar in Swift before):
i suggest to create asynchronous observable methods , here a sample :
the first observable will send request when he get response will proceed the second one and you can chain , you can customize each method to handle errors or success, this sample like queue.
here the result for execution :
I don't fnd any elegant native-RxJava solution. So I will custom a
Subscriber
to do your work.For your 3 points:
For sequential execution, we create a single thread scheduler
Scheduler sequential = Schedulers.from(Executors.newFixedThreadPool(1));
For stop all requests when error occur, we should subscribe all request together instead of create a
Flowable
every time. So we define following functions (here I request isInteger
and responseString
):void sendRequest(Integer request)
Flowable<String> reciveResponse()
and define a field to make association of request and response flow:
FlowableProcessor<Integer> requestQueue = UnicastProcessor.create();
For re-run the not-sent request, we define the rerun function:
void rerun()
Then we can use it:
Now let us implement them.
When send request, we simply push it into
requestQueue
First, to do the request sequentialy, we should schedule work to
sequential
:Second, to stop request when error occur. It's a default behavior. If we do nothing, an error will broken the subscription and any futher items will not be emitted.
Third, to re-run the not-sent requests. First because that the native operator will cancel the stream, like
MapSubscriber
do (RxJava-2.1.0-FlowableMap#63):We should wrap the error. Here I use my
Try
class to wrap the possible exception, you can use any other implementation that can wrap the exception instead of throw it:And then it's the custom
OnErrorStopSubscriber implements Subscriber<Try<T>>, Subscription
.It request and emits items normally. When error occur(in fact is a failed
Try
emitted) it stopped there and won't request or emit even downstream request it. After callrerun
method, it will back to the running statu and emit normally. The class is about 80 lines. You can see the code on my github.Now we can test our code:
and output:
You can see it runs sequentialy. And stopped when error occur. After call
rerun
method, it continue handle the left not-sent request.For complete code, see my github.
For this kind of behaviour I'm using Flowable backpressure implementation. Create outer stream that is parent for your api request stream, flatMap the api request with maxConcurrency = 1 and implement some sort of buffer strategy, so your Flowable doesn't throw exception.
It will buffer user input up to given threshold, and then drop it(if you don't do this it will throw exception, but it is highly unlikely that user will exceed such buffer), it will execute sequentially 1 by 1 like a queue. Don't try to implement this behaviour yourself if there are operators for thing kind of behaviour in libary itself.
Oh I forgot to mention, your
sendRequest()
method must return Flowable or you can convert it to Flowable.Hope this helps!