Asynchronous RestAPIs with RxJava/Jersey2. Threadi

2019-03-22 08:32发布

问题:

We are in the process of prototyping a REST API using reactive programming. As shown in the diagram, we keep 3 layers same as we used in our previouse sync API designs ;

http://oi59.tinypic.com/339hhki.jpg

  1. API Layer implemented using Jersey2 which will process request/deserialize JSON and handover to Service Layer.
  2. Service Layer which implements the business-logic.Implemented using reactive programming (RxJava)
  3. Dao Layer which is used for persistence operations by Service Layer.Since we use CouchBase , this will use CouchBase RxClient.

To my understanding the flow is as follows :

a) HTTP request comes,Jersery will process the request/parse JSON/deserialize request model inside a RequestThread from the "Container Thread pool".

b) With Jersey2 Async support , RequestThread will be return back to Container Thread Pool, and the Service Layer will be executed in Schedulers.computation() scheduler.

@Path("/resource")
public class AsyncUserResource {
    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
 
       Observable<User> user = userService.getUser(...); //this is executed using Schedulers.computation() inside Service implementation


       user.subscribe(new Observer<User>() {

            @Override
            public void onCompleted() { 

            }

            @Override
            public void onError(Throwable e) {
                //handle error using ExceptionMappers

            }

            @Override
            public void onNext(User user) {
               asyncResponse.resume(user); 

            }});
    }        


}

c) Any IO operations inside DAOs will use Schedulers.io() to run these long-processing operations in a separate thread.

My questions are :

  1. When implementing DAOs/Services , should I hide the schedulars in use (Threading) inside the implementation.

eg Dao :

public interface UserDao {
  public Observable<User> getUser();
}

In the implementation, is it the good practise to specify the Schedular as below;

public Observable<User> getUser() {

        Observable<User> ret = Observable.create((subscriber)->{
            try {

                 //Do DB call
                 User u = null;
                 subscriber.onNext(u);
                 subscriber.onCompleted();

            }catch (Exception e) {
                subscriber.onError(e);  
            }

        });
        return ret.subscribeOn(Schedulers.io());
}

Or is it better to simply return the Observable ,and the upper-layer will use a particualr Schedular accordingly ?

  1. Since DAOs mostly involes io/network calls I assume Schedulars.io() should be used. How about for the business-logic in side the Service Layer ? Should them be executed inside Schedulers.computation() (Event Loop) ?

  2. There are two thread pools inside the JVM.One is "Container Thread Pool" and the other is "RxThread Pool" used by Schedulers.io() . How to configure pool settings/size of RxJava ?

回答1:

1) In RxJava itself, if an method requires a scheduler, we create two overloads: one without a Scheduler parameter and one with it. The former then delegates to the latter with a reasonable default scheduler. This way, API consumers may chose to accept the default or go with their own.

2) It depends on your computation. If the computation takes similar time as to wait for the IO, you could move the computation into the computation scheduler, thus freeing up the cached worker threads inside IO to do more blocking. Otherwise, you could just do the business logic on the same scheduler.

3) You can't configure the pool sizes in RxJava at the moment. Computation will always use Runtime.availableProcessors() and IO will be always act as an unbounded cached threadpool. If you can live with events thread-hopping (meaning: they are guaranteed to be serial but one event may execute on thread 1 and the subsequent on thread 2), you can use your own ExecutorServices via Schedulers.from().