I have a program that processes a lot of files. For each file, two things need to be done: first, some part of the file is read and processed, and then the resulting MyFileData gets stored. The first part can be parallelized, the second cannot.
Doing everything sequentially is very slow, as the CPU has to wait for the disk, then it works a bit, and then it issues another request, and waits again...
I did the following:
    class MyCallable implements Callable<MyFileData> {
        MyCallable(File file) {
            this.file = file;
        }
        public MyFileData call() {
            return someSlowOperation(file);
        }
        private final File file;
    }

    List<Future<MyFileData>> futures = new ArrayList<>();
    for (File f : files) futures.add(executorService.submit(new MyCallable(f)));
    // get() blocks until that particular task is done, so results are consumed in submission order
    for (Future<MyFileData> f : futures) sequentialOperation(f.get());
and it helped a lot. However, I'd like to improve two things:
The sequentialOperation gets executed in a fixed order instead of processing whatever result is available first. How can I change it?
There are thousands of files to be processed and starting thousands of disk requests could lead to disk thrashing. By using Executors.newFixedThreadPool(10) I've limited this number, however I'm looking for something better. Ideally it should be self-tuning, so that it works optimally on different computers (e.g., issues more requests when RAID and/or NCQ is available, etc.). I don't think it could be based on finding out the HW configuration, but measuring the processing speed and optimizing based on it should somehow be possible. Any idea?
The sequentialOperation gets executed in a fixed order instead of processing whatever result is available first. How can I change it?
That's exactly what a CompletionService is for: the tasks run in parallel on the underlying executor, and take() hands back their futures as they complete, regardless of the submission order.
Simplified (not tested) example:
    int NUM_THREADS = Runtime.getRuntime().availableProcessors();
    ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    CompletionService<MyFileData> completionService = new ExecutorCompletionService<MyFileData>(executor);
    List<Future<MyFileData>> futures = new ArrayList<>();
    for (File f : files) futures.add(completionService.submit(new MyCallable(f)));
    for (int i = 0; i < futures.size(); i++) {
        // take() blocks until the next task has completed, whichever one that is
        Future<MyFileData> next = completionService.take();
        sequentialOperation(next.get());
    }
    executor.shutdown();
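To see the completion-order behaviour in isolation, here is a small self-contained sketch. The class name CompletionOrderDemo and the slowTask helper are made up for illustration; slowTask just sleeps to stand in for someSlowOperation(file). The tasks are submitted as slow, fast, medium, and with sleeps this far apart they should come back as fast, medium, slow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Hypothetical stand-in for someSlowOperation(file): sleeps, then returns its label.
    static String slowTask(String label, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return label;
    }

    // Submits three tasks and returns their labels in the order they completed.
    static List<String> runDemo() {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
        try {
            // Submission order: slow, fast, medium.
            completionService.submit(() -> slowTask("slow", 600));
            completionService.submit(() -> slowTask("fast", 50));
            completionService.submit(() -> slowTask("medium", 300));

            List<String> completed = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // take() blocks until some task has finished, so get() returns immediately.
                completed.add(completionService.take().get());
            }
            return completed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        } catch (ExecutionException e) {
            // get() wraps any exception thrown inside a task in an ExecutionException.
            throw new IllegalStateException("a task failed", e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Note that get() can throw ExecutionException when a task fails, so in the real code a single bad file should probably be handled inside the loop rather than aborting the whole run.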
There are thousands of files to be processed and starting thousands of disk requests could lead to disk thrashing. By using Executors.newFixedThreadPool(10) I've limited this number, however I'm looking for something better.
I'm not 100% sure about that one. I suppose it depends on how many disks you have, but I would have thought that the disk access part should not be split across too many threads (one thread per disk would probably be sensible): if many threads access one disk at the same time, it will spend more time seeking than reading.
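One possible way to act on the one-thread-per-disk idea, as a rough sketch and not a tuned solution: do the reads on a single-threaded executor and hand the bytes to a separate CPU pool. Everything named here (SplitIoAndCpu, readFile, process, processAll) is hypothetical, and the sketch assumes a single disk and files small enough to be read into memory whole.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SplitIoAndCpu {

    // Reads the whole file; only ever called on the single I/O thread.
    static byte[] readFile(Path path) {
        try {
            return Files.readAllBytes(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Hypothetical CPU-bound step standing in for the processing half of someSlowOperation.
    static int process(byte[] content) {
        int sum = 0;
        for (byte b : content) {
            sum += b;
        }
        return sum;
    }

    // Returns one result per input file, in input order.
    static List<Integer> processAll(List<Path> paths) {
        // Assumption: one disk, hence one reader thread.
        ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
        ExecutorService cpuExecutor =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (Path p : paths) {
                futures.add(CompletableFuture
                        // Disk reads are serialized on the single I/O thread.
                        .supplyAsync(() -> readFile(p), ioExecutor)
                        // Processing of already-read files runs in parallel.
                        .thenApplyAsync(SplitIoAndCpu::process, cpuExecutor));
            }
            List<Integer> results = new ArrayList<>();
            for (CompletableFuture<Integer> f : futures) {
                // join() rethrows task failures as an unchecked CompletionException.
                results.add(f.join());
            }
            return results;
        } finally {
            ioExecutor.shutdown();
            cpuExecutor.shutdown();
        }
    }
}
```

The reader never waits for the CPU pool here, so with thousands of large files the unprocessed contents could pile up in memory; some form of bounding would be needed in practice. This also does nothing towards self-tuning, it only separates the two kinds of work so that each pool can be sized on its own.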
The sequentialOperation gets executed in a fixed order instead of processing whatever result is available first. How can I change it?
Assumptions: each someSlowOperation(file) call is going to take a variable amount of time, and thus you want to process the MyFileData as soon as you receive it, but not at the same time as another sequentialOperation.
You can achieve this by setting up a producer/consumer queue.
Producers are the callables that you execute in your example, with the added bit where you add the result to a queue of work awaiting processing.
The consumer is the sequentialOperation() call: it runs in its own thread, and there is only one. All this thread does is take the head of the queue and process it, repeating until the program ends.
This way, you maximize the use of all resources on the machine.
A relevant post with some sample code: Producer/Consumer threads using a Queue
Edit: I figured you might want a quick sample, since it's pretty opaque to anyone who has never done it before.
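    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    // MyData is the result type of the slow operation; it is not defined in this sample.
    public class Main {
        private final ExecutorService producerExecutor = Executors.newFixedThreadPool(10);
        private final ExecutorService consumerExecutor = Executors.newFixedThreadPool(1);
        private final BlockingQueue<MyData> queue = new LinkedBlockingQueue<>(); // or some other impl
        private final List<File> files = new ArrayList<>(); // fill with the files to process

        abstract class Producer implements Runnable {
            private final File file;

            Producer(File file) {
                this.file = file;
            }

            public void run() {
                MyData result = someLongAssOperation(file);
                queue.offer(result);
            }

            public abstract MyData someLongAssOperation(File file);
        }

        abstract class Consumer implements Runnable {
            public void run() {
                try {
                    while (true) {
                        // take() blocks until a producer has put a result on the queue
                        sequentialOperation(queue.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // leave the loop when interrupted
                }
            }

            public abstract void sequentialOperation(MyData data);
        }

        private void start() {
            consumerExecutor.submit(new Consumer() {
                // implement sequentialOperation here
            });
            for (File f : files) {
                producerExecutor.submit(new Producer(f) {
                    // implement someLongAssOperation() here
                });
            }
        }

        public static void main(String[] args) {
            new Main().start();
        }
    }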
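The sample above never tells the consumer that the work is finished. One common way to do that is a "poison pill": a sentinel object put on the queue once all producers are done. The sketch below is only an illustration of that idea; the class name PoisonPillDemo is invented, Strings stand in for MyData, and upper-casing stands in for the slow operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PoisonPillDemo {

    // Sentinel marking the end of the work; compared by identity, never processed.
    private static final String POISON_PILL = new String("POISON");

    // Returns the items in the order the single consumer handled them.
    static List<String> run(List<String> inputs) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();
        ExecutorService producers = Executors.newFixedThreadPool(4);

        // Single consumer: handles results one at a time until it sees the sentinel.
        Thread consumer = new Thread(() -> {
            try {
                for (String item = queue.take(); item != POISON_PILL; item = queue.take()) {
                    consumed.add(item); // stands in for sequentialOperation(item)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            for (String input : inputs) {
                // Stands in for someLongAssOperation(file) followed by queue.offer(result).
                producers.submit(() -> {
                    queue.offer(input.toUpperCase());
                });
            }
            producers.shutdown();
            // Wait for every producer to finish before signalling the end.
            if (!producers.awaitTermination(1, TimeUnit.MINUTES)) {
                consumer.interrupt();
                throw new IllegalStateException("producers did not finish in time");
            }
            queue.put(POISON_PILL);
            consumer.join(); // after join(), reading 'consumed' from this thread is safe
        } catch (InterruptedException e) {
            consumer.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

The order of the returned items depends on which producer finishes first, so it may differ between runs.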