Flexible CountDownLatch?

Posted 2019-01-21 20:13

Question:

I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService and then needs to wait until all N items have been processed.

Caveats

  • N is not known in advance. If it were, I would simply create a CountDownLatch and then have the producer thread await() until all work was complete.
  • Using a CompletionService is inappropriate because although my producer thread needs to block (i.e. by calling take()) there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.

My current favoured solution is to use an integer counter, incrementing it whenever an item of work is submitted and decrementing it when a work item is processed. Following the submission of all N tasks, my producer thread will need to wait on a lock, checking whether counter == 0 whenever it is notified. The consumer thread(s) will need to notify the producer whenever a decrement brings the counter to 0.
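
For reference, the counter-plus-lock idea described above could be sketched roughly as follows. PendingCounter and its method names are made up for illustration, and the pool size and task count in main are arbitrary; this is a minimal sketch rather than a hardened implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper: counts outstanding work items and lets a thread wait for zero. */
final class PendingCounter {
    private final Object lock = new Object();
    private int pending = 0;

    /** Called by the producer just before it submits a work item. */
    void increment() {
        synchronized (lock) {
            pending++;
        }
    }

    /** Called by a worker when an item is finished; wakes waiters once nothing is left. */
    void decrement() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll();
            }
        }
    }

    /** Blocks until the counter is zero; the loop guards against spurious wakeups. */
    void awaitZero() throws InterruptedException {
        synchronized (lock) {
            while (pending != 0) {
                lock.wait();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        PendingCounter counter = new PendingCounter();
        for (int i = 0; i < 10; i++) {
            counter.increment(); // increment before submitting, never inside the task
            executor.submit(() -> {
                try {
                    // process the work item here
                } finally {
                    counter.decrement(); // always decrement, even if the work throws
                }
            });
        }
        counter.awaitZero(); // returns once all ten items have been processed
        executor.shutdown();
    }
}
```

The increment has to happen on the producer thread before the submit. Otherwise a fast worker could bring the counter to zero while more items are still on their way.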

Is there a better approach to this problem or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?

Thanks in advance.

Answer 1:

java.util.concurrent.Phaser looks like it would work well for you. It is planned to be released in Java 7, but the most stable version can be found at the jsr166 interest group website.

The Phaser is a glorified CyclicBarrier. You can register N parties and, when you're ready, await their advance at a specific phase.

A quick example on how it would work:

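import java.util.concurrent.Phaser;

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            // ... do stuff ...
            phaser.arriveAndDeregister(); // this task is done and leaves the phaser
        }
    };
}
// N and executor are assumed to be defined elsewhere
public void doWork(){
    phaser.register(); // register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance(); // block until every registered task has arrived
}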


Answer 2:

You could of course use a CountDownLatch protected by an AtomicReference so that your tasks get wrapped thus:

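import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class MyTask implements Runnable {
    private final Runnable r;
    private final AtomicReference<CountDownLatch> l;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; this.l = l; }

    public void run() {
        r.run();
        try {
            while (l.get() == null) Thread.sleep(1000L); // poll until the latch is published
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop waiting
            return;
        }
        l.get().countDown();
    }
}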

Notice that the tasks run their work and then poll, sleeping between checks, until the count-down latch is set (i.e. until the total number of tasks is known). As soon as the latch is set, they count it down and exit. These get submitted as follows:

AtomicReference<CountDownLatch> latch = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, latch));

After the point of creation/submission of your work, when you know how many tasks you have created:

latch.set(new CountDownLatch(nTasks));
latch.get().await();


Answer 3:

I've used an ExecutorCompletionService for something like this:

ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

This involves keeping a queue of tasks that have been submitted, so if your list is arbitrarily long, your method might be better.

But make sure to use an AtomicInteger for the coordination, so that you will be able to increment it in one thread, and decrement it in the worker threads.



Answer 4:

I assume your producer does not need to know when the queue is empty but needs to know when the last task has been completed.

I would add a waitforWorkDone(producer) method to the consumer. The producer can add its N tasks and then call the wait method. The wait method blocks the calling thread for as long as the work queue is not empty or a task is still executing.

Each consumer thread calls notifyAll() on the waitfor lock iff its task has finished, the queue is empty and no other task is being executed.
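
A rough sketch of how such a consumer might look is below. Only waitforWorkDone comes from the description above; the class name WorkConsumer, the add method, the executing counter and the use of plain daemon threads are assumptions made for illustration, and the producer parameter is left out because the sketch does not need it.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical consumer that owns its work queue and tracks running tasks. */
final class WorkConsumer {
    private final Object waitfor = new Object();         // the "waitfor lock"
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private int executing = 0;                           // tasks currently running

    WorkConsumer(int threads) {
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(this::workerLoop);
            t.setDaemon(true); // sketch: let the JVM exit without an explicit shutdown
            t.start();
        }
    }

    /** Called by the producer to hand over one task. */
    void add(Runnable task) {
        synchronized (waitfor) {
            queue.add(task);
            waitfor.notifyAll(); // wake an idle worker
        }
    }

    /** Blocks while the queue is non-empty or any task is still executing. */
    void waitforWorkDone() throws InterruptedException {
        synchronized (waitfor) {
            while (!queue.isEmpty() || executing > 0) {
                waitfor.wait();
            }
        }
    }

    private void workerLoop() {
        while (true) {
            Runnable task;
            synchronized (waitfor) {
                while (queue.isEmpty()) {
                    try {
                        waitfor.wait();
                    } catch (InterruptedException e) {
                        return; // treat interruption as a request to stop
                    }
                }
                task = queue.remove();
                executing++;
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // sketch: ignore the failure so one bad task does not kill the worker
            } finally {
                synchronized (waitfor) {
                    executing--;
                    if (queue.isEmpty() && executing == 0) {
                        waitfor.notifyAll(); // last task finished: release the producer
                    }
                }
            }
        }
    }
}
```

Taking a task off the queue and bumping executing happen under the same lock, so the producer can never observe an empty queue while a task is in flight but not yet counted.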



Answer 5:

What you described is quite similar to using a standard Semaphore but used 'backwards'.

  • Your semaphore starts with 0 permits
  • Each work unit releases one permit when it completes
  • You block by waiting to acquire N permits

Plus, you have the flexibility to acquire only M < N permits, which is useful if you want to check an intermediate state. For example, I'm testing an asynchronous bounded message queue, and I expect the queue to be full after some M < N messages. So I can acquire M permits, check that the queue is indeed full, and then acquire the remaining N - M permits after consuming messages from the queue.
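
The three bullet points above might translate into something like the following. The class and method names are placeholders I made up, and the executor is assumed to be supplied by the caller.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;

/** Hypothetical helper showing a Semaphore used "backwards" as a completion counter. */
final class BackwardsSemaphoreDemo {

    /** Submits every task, then blocks until each one has released a permit. */
    static int runAll(Iterable<Runnable> tasks, ExecutorService executor)
            throws InterruptedException {
        Semaphore done = new Semaphore(0); // starts with 0 permits
        int submitted = 0;
        for (Runnable task : tasks) {
            executor.execute(() -> {
                try {
                    task.run();
                } finally {
                    done.release(); // one permit per completed work unit
                }
            });
            submitted++;
        }
        done.acquire(submitted); // block until all N permits are available
        return submitted;
    }
}
```

The count only has to be known at the point of the acquire, not up front, which is what makes this fit the original problem. To check an intermediate state, the single acquire could be split into done.acquire(m) followed later by done.acquire(submitted - m).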



Answer 6:

Standalone Java 8+ method that does exactly this for a Stream using a single-pass Phaser. Iterator/Iterable variants are exactly the same.

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
    Phaser phaser = new Phaser(1); // 1 = register ourselves
    items.forEach(item -> {
        phaser.register(); // new task
        executor.execute(() -> {
            try {
                handleItem.accept(item);
            } finally {
                phaser.arrive(); // completed task, even if handleItem throws
            }
        });
    });
    phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
    return phaser.getRegisteredParties()-1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor);

FYI: this is fine for one-off calls, but if it is being called concurrently, a solution involving ForkJoinPool will stop the parent thread from blocking.
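
One possible shape for the ForkJoinPool variant is sketched below. It is my own guess at what such a solution could look like, not code from the answer: the names ForkJoinSubmit and forkAndJoin are hypothetical, and it takes an Iterable rather than a Stream to keep the sketch short. The idea is that each item is forked as its own task and then joined. As far as I understand it, when the caller is already a worker of the same pool, join() can help run other queued tasks instead of simply parking the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.function.Consumer;

/** Hypothetical ForkJoinPool-based counterpart to submitAndWait. */
final class ForkJoinSubmit {

    /** Forks one task per item inside the pool, joins them all, and returns the item count. */
    static <X> int forkAndJoin(Iterable<X> items, Consumer<X> handleItem, ForkJoinPool pool) {
        Callable<Integer> root = () -> {
            List<ForkJoinTask<?>> forked = new ArrayList<>();
            for (X item : items) {
                Runnable work = () -> handleItem.accept(item);
                forked.add(ForkJoinTask.adapt(work).fork()); // schedule in the current pool
            }
            for (ForkJoinTask<?> task : forked) {
                task.join(); // wait for (or help with) each forked task
            }
            return forked.size();
        };
        // invoke() runs the root task in the pool and returns its result
        return pool.invoke(ForkJoinTask.adapt(root));
    }
}
```

Unlike the Phaser version, this keeps a reference to every forked task until it has been joined, so memory use grows with the number of items.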