How to better use ExecutorService in multithreadin

2019-04-21 07:25发布

问题:

I need to make a library in which I will have synchronous and asynchronous methods in it.

  • executeSynchronous() - waits until I have a result, returns the result.
  • executeAsynchronous() - returns a Future immediately which can be processed after other things are done, if needed.

Core Logic of my Library

The customer will use our library and they will call it by passing DataKey builder object. We will then construct a URL by using that DataKey object and make a HTTP client call to that URL by executing it and after we get the response back as a JSON String, we will send that JSON String back to our customer as it is by creating DataResponse object. Some customer will call executeSynchronous() and some might call executeAsynchronous() so that's why I need to provide two method separately in my library.

Interface:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

And then I have my DataClient which implements the above Client interface:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // do I need to have all threads as non-daemon or I can have daemon thread for my use case?
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

Simple class which will perform the actual task:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) { // should I catch RuntimeException or just Exception here?
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

I have few questions on my above solution -

  • Should I use daemon or non daemon threads for my above use case?
  • Also, I am terminating the tasks that have timed out so that it doesn't occupy one of my limited 10 threads for a long time. Does that look right the way I am doing it?
  • In my call() method, I am catching Exception. Should I catch RuntimeException there? What is the difference if I catch Exception or RuntimeException?

When I started working on this solution, I was not terminating the tasks that have timed out. I was reporting the timeout to the client, but the task continues to run in the thread pool (potentially occupying one of my limited 10 threads for a long time). So I did some research online and I found that I can cancel my tasks those have timed out by using cancel on future as shown below -

future.cancel(true);

But I wanted to make sure, does it look right the way I am doing in my executeSynchronous method to cancel the tasks that have got timedout?

Since I am calling cancel() on the Future which will stop it from running if tasks is still in the queue so I am not sure what I am doing is right or not? What is the right approach to do this?

If there is any better way, then can anyone provide an example for that?

Should we always be terminating the tasks that have got timed out? If we don't do that then what might be the impact I will have?

回答1:

Should I use daemon or non daemon threads for my above use case?

It depends. But in this case I would prefer daemon threads because it is convenient to use clients which allows the process to exit.

Does that look right the way I am doing it?

No, it doesn't. Interrupting of an IO task is pretty hard. Try to set timeout in RestTemplate too. Canceling of the future in this case seems meaningless.

What is the difference if I catch Exception or RuntimeException?

If you don't have checked exceptions in try blocks there is no difference :) Just because only RuntimeExceptions possible in that case.

And one more important note: implementing sync call as async + waiting is bad idea. It is meaningless and consumes one thread from the thread pool per a call. Just create instance of the Task and call it in current thread!



回答2:

Should I use daemon or non daemon threads for my above use case?

It depends on whether or not you want these threads to stop the program from exiting. The JVM will exit when the last non-daemon thread finishes.

If these tasks can be killed at any time if the JVM exists then they should be daemon. If you want the JVM to wait for them then make them non-daemon.

See: java daemon thread and non-daemon thread

Also, I am terminating the tasks that have timed out so that it doesn't occupy one of my limited 10 threads for a long time. Does that look right the way I am doing it?

Yes and no. You are properly calling cancel() on the Future which will stop it from running if it is still in the queue. However, if the thread is already running the task then the cancel will just interrupt the thread. Chances are that the restTemplate calls are not interruptible so the interrupt will be ignored. Only certain methods (like Thread.sleep(...) are interruptible and throw InterruptException. So calling future.cancel(true) won't stop the operation and terminate the thread.

See: Thread not interrupting

One thing you could do is to put a cancel() method on your Task object which forcefully closes the restTemplate. You will need to experiment with this. Another idea is to set some sort of timeout on the restTemplate connection or IO so it doesn't wait forever.

If you are working with the Spring RestTemplate then there is not a direct close but you can close the underlying connection I believe which may be via the SimpleClientHttpRequestFactory so you will need to call disconnect() on the underlying HttpURLConnection.

In my call() method, I am catching Exception. Should I catch RuntimeException there?

RuntimeException extends Exception so you will already catch them.

What is the difference if I catch Exception or RuntimeException?

Catching Exception catches both the checked (non-runtime) exceptions and the runtime exceptions. Catching just the RuntimeException will mean that any defined exceptions will not be caught and will be thrown by the method.

RuntimeExceptions are special exceptions that do not need to be checked by the code. For example, any code can throw an IllegalArgumentException without defining the method as throws IllegalArgumentException. With checked exceptions, it is a compiler error if a checked exception is not caught or thrown by the caller method but this is not true with RuntimeExceptions.

Here's a good answer on the topic:

  • Java: checked vs unchecked exception explanation