Java executors: how to be notified, without blocki

2019-01-01 06:35发布

Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  1. Take a task from the queue
  2. Submit it to the executor
  3. Call .get on the returned Future and block until a result is available
  4. Take another task from the queue...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

11条回答
宁负流年不负卿
2楼-- · 2019-01-01 06:52

Use a CountDownLatch.

It's from java.util.concurrent and it's exactly the way to wait for several threads to complete execution before continuing.

In order to achieve the callback effect you're looking after, that does require a little additional extra work. Namely, handling this by yourself in a separate thread which uses the CountDownLatch and does wait on it, then goes on about notifying whatever it is you need to notify. There is no native support for callback, or anything similar to that effect.


EDIT: now that I further understand your question, I think you are reaching too far, unnecessarily. If you take a regular SingleThreadExecutor, give it all the tasks, and it will do the queueing natively.

查看更多
临风纵饮
3楼-- · 2019-01-01 06:56

Define a callback interface to receive whatever parameters you want to pass along in the completion notification. Then invoke it at the end of the task.

You could even write a general wrapper for Runnable tasks, and submit these to ExecutorService. Or, see below for a mechanism built into Java 8.

class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}

With CompletableFuture, Java 8 included a more elaborate means to compose pipelines where processes can be completed asynchronously and conditionally. Here's a contrived but complete example of notification.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}
查看更多
不再属于我。
4楼-- · 2019-01-01 06:58

Simple code to implement Callback mechanism using ExecutorService

import java.util.concurrent.*;
import java.util.*;

public class CallBackDemo{
    public CallBackDemo(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(5);

        try{
            for ( int i=0; i<5; i++){
                Callback callback = new Callback(i+1);
                MyCallable myCallable = new MyCallable((long)i+1,callback);
                Future<Long> future = service.submit(myCallable);
                //System.out.println("future status:"+future.get()+":"+future.isDone());
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        service.shutdown();
    }
    public static void main(String args[]){
        CallBackDemo demo = new CallBackDemo();
    }
}
class MyCallable implements Callable<Long>{
    Long id = 0L;
    Callback callback;
    public MyCallable(Long val,Callback obj){
        this.id = val;
        this.callback = obj;
    }
    public Long call(){
        //Add your business logic
        System.out.println("Callable:"+id+":"+Thread.currentThread().getName());
        callback.callbackMethod();
        return id;
    }
}
class Callback {
    private int i;
    public Callback(int i){
        this.i = i;
    }
    public void callbackMethod(){
        System.out.println("Call back:"+i);
        // Add your business logic
    }
}

output:

creating service
Callable:1:pool-1-thread-1
Call back:1
Callable:3:pool-1-thread-3
Callable:2:pool-1-thread-2
Call back:2
Callable:5:pool-1-thread-5
Call back:5
Call back:3
Callable:4:pool-1-thread-4
Call back:4

Key notes:

  1. If you want process tasks in sequence in FIFO order, replace newFixedThreadPool(5) with newFixedThreadPool(1)
  2. If you want to process next task after analysing the result from callback of previous task,just un-comment below line

    //System.out.println("future status:"+future.get()+":"+future.isDone());
    
  3. You can replace newFixedThreadPool() with one of

    Executors.newCachedThreadPool()
    Executors.newWorkStealingPool()
    ThreadPoolExecutor
    

    depending on your use case.

  4. If you want to handle callback method asynchronously

    a. Pass a shared ExecutorService or ThreadPoolExecutor to Callable task

    b. Convert your Callable method to Callable/Runnable task

    c. Push callback task to ExecutorService or ThreadPoolExecutor

查看更多
姐姐魅力值爆表
5楼-- · 2019-01-01 06:59

If you want to make sure that no tasks will run at the same time then use a SingleThreadedExecutor. The tasks will be processed in the order the are submitted. You don't even need to hold the tasks, just submit them to the exec.

查看更多
刘海飞了
6楼-- · 2019-01-01 07:04

In Java 8 you can use CompletableFuture. Here's an example I had in my code where I'm using it to fetch users from my user service, map them to my view objects and then update my view or show an error dialog (this is a GUI application):

    CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

It executes asynchronously. I'm using two private methods: mapUsersToUserViews and updateView.

查看更多
登录 后发表回答