Multithreaded execution where order of finished Wo

2020-05-19 04:37发布

I have a flow of units of work, lets call them "Work Items" that are processed sequentially (for now). I'd like to speed up processing by doing the work multithreaded.

Constraint: Those work items come in a specific order, during processing the order is not relevant - but once processing is finished the order must be restored.

Something like this:

   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|

I would like to solve this problem in Java, preferably without Executor Services, Futures, etc., but with basic concurrency methods like wait(), notify(), etc.

Reason is: My Work Items are very small and fine grained, they finish processing in about 0.2 milliseconds each. So I fear using stuff from java.util.concurrent.* might introduce way to much overhead and slow my code down.

The examples I found so far all preserve the order during processing (which is irrelevant in my case) and didn't care about order after processing (which is crucial in my case).

9条回答
啃猪蹄的小仙女
2楼-- · 2020-05-19 05:17

This is how I solved your problem in a previous project (but with java.util.concurrent):

(1) WorkItem class does the actual work/processing:

public class WorkItem implements Callable<WorkItem> {
    Object content;
    public WorkItem(Object content) {
        super();
        this.content = content;
    }

    public WorkItem call() throws Exception {
        // getContent() + do your processing
        return this;
    }
}

(2) This class puts Work Items in a queue and initiates processing:

public class Producer {
    ...
    public Producer() {
        super();
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start();
    }

    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem);
        workerQueue.put(future);
    }
}

(3) Once processing is finished the Work Items are dequeued here:

public class Worker implements Runnable {
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        super();
        this.workerQueue = workerQueue;
    }

    public void run() {
        while (true) {
            Future<WorkItem> fwi = workerQueue.take(); // deqeueue it
            fwi.get(); // wait for it till it has finished processing
        }
    }
}

(4) This is how you would use the stuff in your code and submit new work:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
            p.send(i);
    }
}
查看更多
我命由我不由天
3楼-- · 2020-05-19 05:18

You could launch a DoTask thread for every WorkItem. This thread processes the work. When the work is done, you try to post the item, synchronized on the controlling object, in which you check if it's the right ID and wait if not.

The post implementation can be something like:

synchronized(controllingObject) {
try {
while(workItem.id != nextId) controllingObject.wait();
} catch (Exception e) {}
//Post the workItem
nextId++;
object.notifyAll();
}
查看更多
放我归山
4楼-- · 2020-05-19 05:20

I think that you need an extra queue to hold the incoming order. IncomingOrderQueue.

When you consume the objects you put them in some storage, for example Map and then from another thread which consumes from the IncomingOrderQueue you pick the ids(hashes) of the objects and then you collect them from this HashMap.

This solution can easily be implemented without execution service.

查看更多
登录 后发表回答