I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService
and then needs to wait until all N items have been processed.
Caveats
- N is not known in advance. If it were, I would simply create a CountDownLatch and then have the producer thread await() until all work was complete.
- Using a CompletionService is inappropriate because although my producer thread needs to block (i.e. by calling take()), there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.
My current favoured solution is to use an integer counter, incrementing it whenever an item of work is submitted and decrementing it when a work item is processed. Following the submission of all N tasks my producer thread will need to wait on a lock, checking whether counter == 0 whenever it is notified. The consumer thread(s) will need to notify the producer if they have decremented the counter and the new value is 0.
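For concreteness, here is a minimal sketch of the counter-and-lock scheme just described. The class name PendingWorkCounter, its method names, the pool size and the trivial task body are all placeholders of mine, not an established API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: counts outstanding work items and lets the producer
// block until the count has dropped back to zero.
class PendingWorkCounter {
    private final Object lock = new Object();
    private int pending = 0; // guarded by lock

    // Producer calls this just before submitting a work item.
    void increment() {
        synchronized (lock) {
            pending++;
        }
    }

    // Consumer calls this once a work item has been processed.
    void decrement() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll(); // wake the producer only when nothing is left
            }
        }
    }

    // Producer calls this after its last submission.
    void awaitZero() throws InterruptedException {
        synchronized (lock) {
            while (pending != 0) { // loop guards against spurious wake-ups
                lock.wait();
            }
        }
    }

    // Demo: submits n trivial tasks and returns how many ran before awaitZero() returned.
    static int demo(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        PendingWorkCounter counter = new PendingWorkCounter();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                counter.increment(); // increment before submitting so a fast task cannot decrement first
                pool.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for the real work
                    } finally {
                        counter.decrement();
                    }
                });
            }
            counter.awaitZero();
            return processed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling PendingWorkCounter.demo(25) should return only after all 25 tasks have run. The counter may touch zero while the producer is still submitting, which is harmless here because the producer only starts waiting after its last submission.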
Is there a better approach to this problem or is there a suitable construct in java.util.concurrent I should be using rather than "rolling my own"?
Thanks in advance.
I assume your producer does not need to know when the queue is empty but needs to know when the last task has been completed.
I would add a waitforWorkDone(producer) method to the consumer. The producer can add its N tasks and call the wait method. The wait method blocks the incoming thread iff the work queue is not empty or a task is still executing. The consumer threads call notifyAll() on the waitfor lock iff their task has finished, the queue is empty and no other task is being executed.
I've used an ExecutorCompletionService for something like this:
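A minimal sketch of that approach, assuming the producer simply counts its submissions and then calls take() that many times. The class name CompletionServiceDrain, the squaring task and the pool size are placeholders of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: N is discovered while iterating, then used to drain the results.
class CompletionServiceDrain {
    // Squares each input on a pool and blocks until every result has been collected.
    static List<Integer> squareAll(Iterable<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        try {
            int submitted = 0;
            for (Integer x : inputs) {
                completion.submit(() -> x * x); // stand-in for the real work
                submitted++;
            }
            // The completion service queues finished futures internally;
            // take() blocks until the next one is available.
            List<Integer> results = new ArrayList<>();
            for (int i = 0; i < submitted; i++) {
                results.add(completion.take().get());
            }
            return results; // in completion order, not submission order
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```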
This involves keeping a queue of tasks that have been submitted, so if your list is arbitrarily long, your method might be better.
But make sure to use an AtomicInteger for the coordination, so that you will be able to increment it in one thread, and decrement it in the worker threads.
What you described is quite similar to using a standard Semaphore but used 'backwards'.
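As a rough sketch of the 'backwards' usage, assuming a semaphore that starts with zero permits, workers that release one permit per finished task, and a producer that acquires as many permits as it submitted. The class name BackwardsSemaphore and the task body are placeholders of mine.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: workers release permits, the producer acquires them.
class BackwardsSemaphore {
    // Submits n trivial tasks and returns how many had run once all permits were acquired.
    static int runAndWait(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        Semaphore done = new Semaphore(0); // starts with no permits at all
        AtomicInteger processed = new AtomicInteger();
        try {
            int submitted = 0;
            for (int i = 0; i < n; i++) {
                pool.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for the real work
                    } finally {
                        done.release(); // one permit per finished task
                    }
                });
                submitted++;
            }
            // Blocks until 'submitted' permits have been released by the workers.
            done.acquireUninterruptibly(submitted);
            return processed.get();
        } finally {
            pool.shutdown();
        }
    }
}
```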
Plus you have the flexibility to acquire only M < N permits, which is useful if you want to check the intermediate state. For example, I'm testing an asynchronous bounded message queue, so I expect the queue to be full for some M < N. I can acquire M permits and check that the queue is indeed full, then acquire the remaining N - M permits after consuming messages from the queue.
java.util.concurrent.Phaser looks like it would work well for you. It is planned to be released in Java 7, but the most stable version can be found at the jsr166 interest group website.
The Phaser is a glorified CyclicBarrier. You can register N parties and, when you're ready, await their advance at a specific phase.
A quick example on how it would work:
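Something along these lines, as a sketch only: the producer registers itself, registers one extra party per task before submitting it, and each task deregisters on completion. The class name PhaserWait, the task body and the pool size are placeholders of mine.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical example: one Phaser party per task plus one for the producer.
class PhaserWait {
    // Submits n trivial tasks and returns how many had run once the phase advanced.
    static int runAndWait(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        Phaser phaser = new Phaser(1); // the single initial party is the producer
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++) {
                phaser.register(); // register before submitting, so the phase cannot advance early
                pool.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for the real work
                    } finally {
                        phaser.arriveAndDeregister(); // task is done and leaves the phaser
                    }
                });
            }
            // The producer arrives last and blocks until every registered task has arrived.
            phaser.arriveAndAwaitAdvance();
            return processed.get();
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that a single Phaser supports at most 65535 registered parties, so a very large N would need tiered phasers or one of the other approaches.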
Standalone Java 8+ method that does exactly this for a Stream using a single-pass Phaser. Iterator/Iterable variants are exactly the same.
FYI. This is fine for singular events but, if this is being called concurrently, a solution involving ForkJoinPool will stop the parent thread from blocking.
You could of course use a CountDownLatch protected by an AtomicReference so that your tasks get wrapped thus:
Notice that the tasks run their work and then spin until the count-down is set (i.e. the total number of tasks is known). As soon as the count-down is set, they count it down and exit. These get submitted as follows:
After the point of creation/submission of your work, when you know how many tasks you have created:
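The three steps (wrapping the task, submitting it, then publishing the latch once the total is known) might fit together roughly as below. This is a sketch under my own assumptions: the class name LatchInReference, the task body and the use of Thread.yield() in the spin loop are not from the original answer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical example: the latch is created only after the task count is known.
class LatchInReference {
    // Submits n trivial tasks and returns how many had run once the latch reached zero.
    static int runAndWait(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
        AtomicInteger processed = new AtomicInteger();
        try {
            int submitted = 0;
            for (int i = 0; i < n; i++) {
                // Wrapped task: do the work, then spin until the latch has been published.
                pool.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for the real work
                    } finally {
                        CountDownLatch published;
                        while ((published = latchRef.get()) == null) {
                            Thread.yield(); // total not known yet, keep spinning
                        }
                        published.countDown();
                    }
                });
                submitted++;
            }
            // All tasks are created, so the total is known: publish the latch and wait on it.
            CountDownLatch latch = new CountDownLatch(submitted);
            latchRef.set(latch);
            latch.await();
            return processed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

One caveat with this design: spinning tasks occupy worker threads until the latch is published, so it relies on submission never blocking (here the fixed pool has an unbounded queue).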