Java executors: how to be notified, without blocki

2019-01-01 06:35发布

Say I have a queue full of tasks which I need to submit to an executor service. I want them processed one at a time. The simplest way I can think of is to:

  1. Take a task from the queue
  2. Submit it to the executor
  3. Call .get on the returned Future and block until a result is available
  4. Take another task from the queue...

However, I am trying to avoid blocking completely. If I have 10,000 such queues, which need their tasks processed one at a time, I'll run out of stack space because most of them will be holding on to blocked threads.

What I would like is to submit a task and provide a call-back which is called when the task is complete. I'll use that call-back notification as a flag to send the next task. (functionaljava and jetlang apparently use such non-blocking algorithms, but I can't understand their code)

How can I do that using JDK's java.util.concurrent, short of writing my own executor service?

(the queue which feeds me these tasks may itself block, but that is an issue to be tackled later)

11条回答
几人难应
2楼-- · 2019-01-01 06:40

Just to add to Matt's answer, which helped, here is a more fleshed-out example to show the use of a callback.

private static Primes primes = new Primes();

public static void main(String[] args) throws InterruptedException {
    getPrimeAsync((p) ->
        System.out.println("onPrimeListener; p=" + p));

    System.out.println("Adios mi amigito");
}
public interface OnPrimeListener {
    void onPrime(int prime);
}
public static void getPrimeAsync(OnPrimeListener listener) {
    CompletableFuture.supplyAsync(primes::getNextPrime)
        .thenApply((prime) -> {
            System.out.println("getPrimeAsync(); prime=" + prime);
            if (listener != null) {
                listener.onPrime(prime);
            }
            return prime;
        });
}

The output is:

    getPrimeAsync(); prime=241
    onPrimeListener; p=241
    Adios mi amigito
查看更多
大哥的爱人
3楼-- · 2019-01-01 06:41

Use Guava's listenable future API and add a callback. Cf. from the website :

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
  public Explosion call() {
    return pushBigRedButton();
  }
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
  // we want this handler to run immediately after we push the big red button!
  public void onSuccess(Explosion explosion) {
    walkAwayFrom(explosion);
  }
  public void onFailure(Throwable thrown) {
    battleArchNemesis(); // escaped the explosion!
  }
});
查看更多
人间绝色
4楼-- · 2019-01-01 06:43

You could extend FutureTask class, and override the done() method, then add the FutureTask object to the ExecutorService, so the done() method will invoke when the FutureTask completed immediately.

查看更多
孤独寂梦人
5楼-- · 2019-01-01 06:43

You may use a implementation of Callable such that

public class MyAsyncCallable<V> implements Callable<V> {

    CallbackInterface ci;

    public MyAsyncCallable(CallbackInterface ci) {
        this.ci = ci;
    }

    public V call() throws Exception {

        System.out.println("Call of MyCallable invoked");
        System.out.println("Result = " + this.ci.doSomething(10, 20));
        return (V) "Good job";
    }
}

where CallbackInterface is something very basic like

public interface CallbackInterface {
    public int doSomething(int a, int b);
}

and now the main class will look like this

ExecutorService ex = Executors.newFixedThreadPool(2);

MyAsyncCallable<String> mac = new MyAsyncCallable<String>((a, b) -> a + b);
ex.submit(mac);
查看更多
看风景的人
6楼-- · 2019-01-01 06:47

ThreadPoolExecutor also has beforeExecute and afterExecute hook methods that you can override and make use of. Here is the description from ThreadPoolExecutor's Javadocs.

Hook methods

This class provides protected overridable beforeExecute(java.lang.Thread, java.lang.Runnable) and afterExecute(java.lang.Runnable, java.lang.Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated() can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.

查看更多
ら面具成の殇う
7楼-- · 2019-01-01 06:51

This is an extension to Pache's answer using Guava's ListenableFuture.

In particular, Futures.transform() returns ListenableFuture so can be used to chain async calls. Futures.addCallback() returns void, so cannot be used for chaining, but is good for handling success/failure on an async completion.

// ListenableFuture1: Open Database
ListenableFuture<Database> database = service.submit(() -> openDatabase());

// ListenableFuture2: Query Database for Cursor rows
ListenableFuture<Cursor> cursor =
    Futures.transform(database, database -> database.query(table, ...));

// ListenableFuture3: Convert Cursor rows to List<Foo>
ListenableFuture<List<Foo>> fooList =
    Futures.transform(cursor, cursor -> cursorToFooList(cursor));

// Final Callback: Handle the success/errors when final future completes
Futures.addCallback(fooList, new FutureCallback<List<Foo>>() {
  public void onSuccess(List<Foo> foos) {
    doSomethingWith(foos);
  }
  public void onFailure(Throwable thrown) {
    log.error(thrown);
  }
});

NOTE: In addition to chaining async tasks, Futures.transform() also allows you to schedule each task on a separate executor (Not shown in this example).

查看更多
登录 后发表回答