I have a flow of units of work, let's call them "Work Items", that are processed sequentially (for now). I'd like to speed up processing by doing the work multithreaded.
Constraint: The work items arrive in a specific order. During processing the order is not relevant, but once processing is finished the order must be restored.
Something like this:
|.|
|.|
|4|
|3|
|2| <- incoming queue
|1|
/ | \
2 1 3 <- worker threads
\ | /
|3|
|2| <- outgoing queue
|1|
I would like to solve this problem in Java, preferably without Executor Services, Futures, etc., but with basic concurrency methods like wait(), notify(), etc.
The reason is: my Work Items are very small and fine-grained; each finishes processing in about 0.2 milliseconds. So I fear that using classes from java.util.concurrent.* might introduce way too much overhead and slow my code down.
The examples I have found so far all preserve the order during processing (which is irrelevant in my case) and don't care about the order after processing (which is crucial in my case).
Pump all your Futures through a BlockingQueue. Here's all the code you need:
Then to use, create a SequentialProcessor (once):
and pump tasks to it:
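A minimal sketch of how such a class might look, not the answer's original listing. It assumes a fixed-size thread pool and a single submitting thread; the constructor argument, submit(), next() and the demo class are hypothetical names chosen for illustration. Futures are enqueued in submission order, so taking them from the queue and calling get() yields results in input order no matter which worker finishes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: results come out in submission order. */
class SequentialProcessor<R> implements AutoCloseable {
    private final ExecutorService pool;
    // Futures are queued in submission order, so take() yields them in that order.
    private final BlockingQueue<Future<R>> futures = new LinkedBlockingQueue<>();

    SequentialProcessor(int threads) {
        pool = Executors.newFixedThreadPool(threads);
    }

    /** Assumption: called from one producer thread, so queue order equals input order. */
    void submit(Callable<R> task) {
        futures.add(pool.submit(task));
    }

    /** Blocks until the oldest submitted task has finished, then returns its result. */
    R next() throws InterruptedException, ExecutionException {
        return futures.take().get();
    }

    @Override
    public void close() {
        pool.shutdown();
    }
}

class SequentialProcessorDemo {
    /** Squares 1..count on three threads and collects the results in input order. */
    static List<Integer> squares(int count) throws InterruptedException, ExecutionException {
        List<Integer> out = new ArrayList<>();
        try (SequentialProcessor<Integer> p = new SequentialProcessor<>(3)) {
            for (int i = 1; i <= count; i++) {
                final int n = i;
                p.submit(() -> n * n);   // placeholder for the real work
            }
            for (int i = 0; i < count; i++) {
                out.add(p.next());
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(squares(5)); // prints [1, 4, 9, 16, 25]
    }
}
```

Because next() blocks on the oldest outstanding Future, a slow item delays delivery of later results but never reorders them.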
I created the callableFromTask() method for illustration, but you can dispense with it if getting a Result from a Task is simple, by using a lambda or a method reference instead.
For example, if Task had a getResult() method, do this:
or if you need an expression (lambda):
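As a rough illustration of the two forms, assuming Task exposes a no-argument getResult() that returns a Result. The Task and Result classes below are hypothetical stand-ins, and CallableAdapters exists only to hold the two one-liners.

```java
import java.util.concurrent.Callable;

/** Hypothetical result type. */
class Result {
    final String value;
    Result(String value) { this.value = value; }
}

/** Hypothetical stand-in for the asker's work item. */
class Task {
    private final int id;
    Task(int id) { this.id = id; }
    Result getResult() { return new Result("done-" + id); }
}

class CallableAdapters {
    /** Method reference: works when a no-arg method already returns the result. */
    static Callable<Result> viaMethodRef(Task task) {
        return task::getResult;
    }

    /** Lambda: use this form when an arbitrary expression is needed. */
    static Callable<Result> viaLambda(Task task) {
        return () -> task.getResult();
    }
}
```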
If you allow BlockingQueue, why would you ignore the rest of the concurrency utilities in Java? You could use, e.g., Stream (if you have Java 1.8) for the above:
Because you started from an ordered Collection (List) and also collect to a List, you will have results in the same order as the input.
Just give each object to be processed an ID, and create a proxy that accepts finished work and hands it back only when the next ID in sequence has been pushed. Sample code below. Note how simple it is, using an unsynchronized auto-sorting collection and just 2 simple methods as API.
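One way the ID-based proxy could look, as a sketch under stated assumptions rather than the answer's own code: IDs are assumed to start at 0 and increase by 1, the sorted collection is assumed to be a TreeMap, and the names OrderedResults, put() and take() are hypothetical. The TreeMap itself is not thread-safe, so the two API methods are synchronized and coordinate with wait() and notifyAll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical reorder buffer: accepts results in any order, releases them by ID. */
class OrderedResults<T> {
    // Sorted by ID; guarded by this object's monitor.
    private final TreeMap<Long, T> pending = new TreeMap<>();
    private long nextId = 0;   // ID of the next result to release

    /** Called by a worker when the item with the given ID is finished. */
    synchronized void put(long id, T result) {
        pending.put(id, result);
        if (id == nextId) {
            notifyAll();       // wake the consumer only when it can make progress
        }
    }

    /** Blocks until the next result in sequence is available. */
    synchronized T take() throws InterruptedException {
        while (pending.isEmpty() || pending.firstKey() != nextId) {
            wait();
        }
        nextId++;
        return pending.pollFirstEntry().getValue();
    }
}

class OrderedResultsDemo {
    /** Worker w handles IDs w, w + threads, ... so results arrive out of order. */
    static List<String> run(int items, int threads) throws InterruptedException {
        OrderedResults<String> out = new OrderedResults<>();
        for (int w = 0; w < threads; w++) {
            final int first = w;
            new Thread(() -> {
                for (long id = first; id < items; id += threads) {
                    out.put(id, "item-" + id);   // placeholder for the real work
                }
            }).start();
        }
        List<String> ordered = new ArrayList<>();
        for (int i = 0; i < items; i++) {
            ordered.add(out.take());
        }
        return ordered;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(6, 3));
    }
}
```

The buffer in this sketch is unbounded: if one item is much slower than the rest, later results pile up in the map until it arrives.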
This code could be easily improved to
Preprocess: add an order value to each item, and allocate an array if one is not already allocated.
Input: queue (concurrent sampling with order values 1, 2, 3, 4; it doesn't matter which thread gets which sample)
Output: array (each thread writes to indexed elements; a sync point at the end waits for all threads; no collision checks are needed since every thread writes to different positions)
Postprocess: convert the array to a queue.
Needs an n-element array for n threads, or some multiple of n to do postprocessing only once.
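The steps above can be sketched roughly as follows. This is an illustration under assumptions, not a definitive implementation: the input is taken to be an int array, a shared AtomicInteger cursor stands in for the input queue, squaring is a placeholder for the real work, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class IndexedArrayDemo {
    /** Squares every input; slot i of the output always belongs to input i. */
    static int[] processAll(int[] input, int threads) throws InterruptedException {
        int[] output = new int[input.length];
        AtomicInteger cursor = new AtomicInteger();   // next unclaimed index
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                int i;
                // Each index is claimed by exactly one thread, so writes never collide.
                while ((i = cursor.getAndIncrement()) < input.length) {
                    output[i] = input[i] * input[i];  // placeholder for the real work
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();   // sync point; join() also makes the workers' writes visible
        }
        return output;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = processAll(new int[] {1, 2, 3, 4, 5}, 3);
        System.out.println(java.util.Arrays.toString(result)); // prints [1, 4, 9, 16, 25]
    }
}
```

Converting the array back to a queue afterwards is a single sequential pass, which is why doing it once per batch keeps the overhead low.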
You could have 3 input and 3 output queues - one of each type for each worker thread.
Now when you want to insert something into the input queue, you put it into only one of the 3 input queues, switching between the input queues in a round-robin fashion. The same applies to the output: when you want to take something from the output, you start with the first of the output queues, and once you get your element you switch to the next queue.
All the queues need to be blocking.
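A small sketch of the round-robin scheme, assuming bounded ArrayBlockingQueue instances (capacity 16 is arbitrary), Integer work items, and doubling as placeholder work; RoundRobinDemo and run() are hypothetical names. Item i goes to input queue i % workers and its result is read from output queue i % workers, so the reader sees results in input order. A separate feeder thread is used so the bounded queues cannot stall the reader.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class RoundRobinDemo {
    static List<Integer> run(List<Integer> input, int workers) throws InterruptedException {
        List<BlockingQueue<Integer>> in = new ArrayList<>();
        List<BlockingQueue<Integer>> out = new ArrayList<>();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            BlockingQueue<Integer> inQ = new ArrayBlockingQueue<>(16);
            BlockingQueue<Integer> outQ = new ArrayBlockingQueue<>(16);
            in.add(inQ);
            out.add(outQ);
            // Each worker owns one input queue and one output queue.
            threads.add(new Thread(() -> {
                try {
                    while (true) {
                        outQ.put(inQ.take() * 2);   // placeholder for the real work
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();   // shutdown signal
                }
            }));
        }
        // Feeder: distributes items over the input queues in round-robin order.
        threads.add(new Thread(() -> {
            try {
                for (int i = 0; i < input.size(); i++) {
                    in.get(i % workers).put(input.get(i));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        for (Thread t : threads) {
            t.start();
        }
        // Reader: visits the output queues in the same round-robin order.
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            result.add(out.get(i % workers).take());
        }
        for (Thread t : threads) {
            t.interrupt();   // stop the workers once all results are collected
        }
        return result;
    }
}
```

One consequence of this design is that a single slow item blocks the reader on that worker's output queue even if the other workers have results ready.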
Reactive programming could help. During my brief experience with RxJava I found it to be intuitive and easier to work with than core language features like Future etc. Your mileage may vary. Here are some helpful starting points: https://www.youtube.com/watch?v=_t06LRX0DV0
The attached example also shows how this could be done. In the example below we have Packets which need to be processed. They are taken through a simple transformation and finally merged into one list. The output appended to this message shows that the Packets are received and transformed at different points in time, but in the end they are output in the order in which they were received.
Deconstruction
On one particular run Packets were received in the order 2,6,0,1,8,7,5,9,4,3 and processed in order 2,6,0,1,3,4,5,7,8,9 on different threads