I have a program processing a lot of files, where two things need to be done for each file: first, some piece of the file is read and processed, and then the resulting `MyFileData` gets stored. The first part can be parallelized, the second cannot.
Doing everything sequentially is very slow, as the CPU has to wait for the disk, then it works a bit, then it issues another request and waits again...
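I did the following:

```java
import java.io.File;
import java.util.*;
import java.util.concurrent.*;

// The parallelizable part: read and process one file.
class MyCallable implements Callable<MyFileData> {
    private final File file;

    MyCallable(File file) {
        this.file = file;
    }

    public MyFileData call() {
        return someSlowOperation(file);
    }
}

ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Future<MyFileData>> futures = new ArrayList<>();
for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
// The sequential part: get() blocks, so results are consumed in submission order.
for (Future<MyFileData> f : futures) sequentialOperation(f.get());
```
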
and it helped a lot. However, I'd like to improve two things:
1. The `sequentialOperation` gets executed in a fixed order instead of processing whatever result is available first. How can I change that?
2. There are thousands of files to be processed, and starting thousands of disk requests could lead to disk thrashing. By using `Executors.newFixedThreadPool(10)` I've limited this number, but I'm looking for something better. Ideally it should be self-tuning, so that it works optimally on different computers (e.g., it issues more requests when RAID and/or NCQ is available). I don't think it could be based on detecting the hardware configuration, but measuring the processing speed and optimizing based on it should somehow be possible. Any ideas?
That's exactly what a CompletionService does: it processes the tasks in parallel and returns them as they get completed, regardless of the submission order.
Simplified (not tested) example:
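A minimal sketch along those lines, written generically so that it compiles on its own: `slow` stands in for `someSlowOperation`, `sequential` for `sequentialOperation`, and the class and method names (`CompletionOrderProcessor.process`) are made up for illustration. `ExecutorCompletionService` wraps an ordinary executor and queues each `Future` as its task completes, so `take()` hands results back in completion order.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

class CompletionOrderProcessor {

    // Runs `slow` on every input in a fixed-size pool, then feeds each result
    // to `sequential` on the calling thread in the order the tasks *finish*.
    static <I, R> void process(List<I> inputs, Function<I, R> slow,
                               Consumer<R> sequential, int threads)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            CompletionService<R> completion = new ExecutorCompletionService<>(executor);
            for (I input : inputs) {
                completion.submit(() -> slow.apply(input));
            }
            // take() blocks until some task is done and returns its Future,
            // so whichever result is ready first gets processed first.
            for (int i = 0; i < inputs.size(); i++) {
                sequential.accept(completion.take().get());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

With the question's types, a call would look roughly like `CompletionOrderProcessor.process(files, f -> someSlowOperation(f), d -> sequentialOperation(d), 10)`, where `files` is a `List<File>`. Since `sequentialOperation` still runs on a single thread, it never runs concurrently with itself.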
I'm not 100% sure on that one. I suppose it depends on how many disks you have, but I would have thought that the disk access part should not be split across too many threads (one thread per disk would probably be sensible): if many threads access one disk at the same time, it will spend more time seeking than reading.
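A rough sketch of the "one thread per disk" idea, assuming you can somehow tell which disk a file lives on: each disk key gets its own single-threaded executor, so reads against the same disk are issued one at a time while different disks work in parallel. `PerDiskExecutor` and the disk key are hypothetical; `Files.getFileStore(path)` could serve as an approximate key, but a file store is a volume, which is not necessarily one physical disk (a RAID array, or several partitions on one drive).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: one single-threaded executor per disk, so requests to
// the same disk are serialized while different disks work in parallel.
class PerDiskExecutor {
    private final Map<Object, ExecutorService> perDisk = new HashMap<>();

    // diskKey is any object identifying the physical disk; how to obtain it
    // is an assumption left to the caller.
    synchronized <R> Future<R> submit(Object diskKey, Callable<R> readTask) {
        ExecutorService executor =
                perDisk.computeIfAbsent(diskKey, k -> Executors.newSingleThreadExecutor());
        return executor.submit(readTask);
    }

    synchronized int diskCount() {
        return perDisk.size();
    }

    // Stops accepting new tasks; already submitted reads still complete.
    synchronized void shutdown() {
        perDisk.values().forEach(ExecutorService::shutdown);
    }
}
```

The trade-off is that this ignores NCQ or RAID controllers that could benefit from several outstanding requests per volume; in that case a small fixed pool per key would be the obvious variation.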
Assumptions: each `someSlowOperation(file)` call is going to take a variable amount of time, and thus you want to process the `MyFileData` as soon as you receive it, but not at the same time as another `sequentialOperation`. You can achieve this by setting up a producer/consumer queue.
Producers are the callables that you execute in your example, with the added step of putting each result onto a queue of work awaiting processing. The consumer is the `sequentialOperation()` call: it runs in its own thread, and there is only one. All this thread does is take the head of the queue, process it, and repeat until the program ends. This way, the disk reads and the sequential processing overlap, and you make full use of the resources on the machine.
A relevant post with some sample code: Producer/Consumer threads using a Queue
Edit: I figured you might want a quick sample, since it's pretty opaque to anyone who has never done it before.
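A minimal producer/consumer sketch using a `LinkedBlockingQueue`. To keep it self-contained it is generic: `slow` plays the role of `someSlowOperation`, `sequential` the role of `sequentialOperation`, and `ProducerConsumer.run` is a made-up name. It assumes `slow` never throws or returns `null`; otherwise a producer would fail without enqueuing anything and the consumer would wait forever.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

class ProducerConsumer {

    // Assumes `slow` never throws and never returns null: a failed producer
    // would leave the consumer waiting for a result that never arrives.
    static <I, R> void run(List<I> inputs, Function<I, R> slow,
                           Consumer<R> sequential, int producerThreads)
            throws InterruptedException {
        BlockingQueue<R> queue = new LinkedBlockingQueue<>();
        ExecutorService producers = Executors.newFixedThreadPool(producerThreads);

        // Producers: do the slow, parallelizable work and enqueue each result.
        for (I input : inputs) {
            producers.execute(() -> {
                try {
                    queue.put(slow.apply(input));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        producers.shutdown(); // no new tasks; already submitted ones still run

        // The single consumer: repeatedly take the head of the queue (whatever
        // finished first) and process it. It knows how many results to expect,
        // so it can stop without a "poison pill" marker.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < inputs.size(); i++) {
                    sequential.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        consumer.join(); // wait until every result has been processed
    }
}
```

If the number of inputs were not known up front, the usual alternative is to have the producer side put a sentinel ("poison pill") object on the queue once all work is submitted and finished, and let the consumer stop when it sees it.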