We are in the process of prototyping a REST API using reactive programming. As shown in the diagram, we keep 3 layers same as we used in our previouse sync API designs ;
http://oi59.tinypic.com/339hhki.jpg
- API Layer implemented using Jersey2 which will process request/deserialize JSON and handover to Service Layer.
- Service Layer which implements the business-logic.Implemented using reactive programming (RxJava)
- Dao Layer which is used for persistence operations by Service Layer.Since we use CouchBase , this will use CouchBase RxClient.
To my understanding the flow is as follows :
a) HTTP request comes,Jersery will process the request/parse JSON/deserialize request model inside a RequestThread from the "Container Thread pool".
b) With Jersey2 Async support , RequestThread will be return back to Container Thread Pool, and the Service Layer will be executed in Schedulers.computation() scheduler.
@Path("/resource")
public class AsyncUserResource {
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation
user.subscribe(new Observer<User>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
//handle error using ExceptionMappers
}
@Override
public void onNext(User user) {
asyncResponse.resume(user);
}});
}
}
c) Any IO operations inside DAOs will use Schedulers.io() to run these long-processing operations in a separate thread.
My questions are :
- When implementing DAOs/Services , should I hide the schedulars in use (Threading) inside the implementation.
eg Dao :
public interface UserDao {
public Observable<User> getUser();
}
In the implementation, is it the good practise to specify the Schedular as below;
public Observable<User> getUser() {
Observable<User> ret = Observable.create((subscriber)->{
try {
//Do DB call
User u = null;
subscriber.onNext(u);
subscriber.onCompleted();
}catch (Exception e) {
subscriber.onError(e);
}
});
return ret.subscribeOn(Schedulers.io());
}
Or is it better to simply return the Observable ,and the upper-layer will use a particualr Schedular accordingly ?
Since DAOs mostly involes io/network calls I assume Schedulars.io() should be used. How about for the business-logic in side the Service Layer ? Should them be executed inside Schedulers.computation() (Event Loop) ?
There are two thread pools inside the JVM.One is "Container Thread Pool" and the other is "RxThread Pool" used by Schedulers.io() . How to configure pool settings/size of RxJava ?