I have a collection of elements to process, and at most four elements can be processed at the same time. At run time all the processes are started together and all of them go into a waiting state; only four elements are actually processed at any one time.
The problem is that the elements to process are chosen randomly, because all the threads are waiting for resources to become free. This means the first element processed can be the last one in the collection.
However, I need to process the elements in the order in which they appear in the collection.
How can I achieve this?
I am using the TPL and C# 4.0.
With parallelism there is always a problem of defining what "in order" means. Let's say you have a collection of 100 items. Processing them "in order 4 at a time" (as you requested) could mean:
Loose ordering: use 4 threads and issue tasks in the order of the original collection.
In this case you can use:
    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
        (item) =>
        {
            // code
        });
In case of unbalanced tasks this will quickly lose the original ordering, as some threads may lag behind on heavy tasks, but the tasks will be allocated in order.
Strict ordering: process them in order in groups of 4 like below:
    0 1 2 3       (4 tasks)
    _____________________________ barrier
    4 5 6 7       (4 tasks)
    _____________________________ barrier
    etc.
In this case you can employ a barrier:
    Barrier b = new Barrier(4);
    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
        (item) =>
        {
            // code
            b.SignalAndWait();
        });
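However, you have to make sure that the number of items is a multiple of 4; otherwise the barrier will never be released on the last iterations and the loop will hang. Note also that MaxDegreeOfParallelism is only an upper limit, so this relies on four iterations actually running concurrently.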
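If the multiple-of-4 requirement is a problem, the same strict grouping can be sketched without a Barrier by starting one batch of tasks at a time and waiting for the whole batch. This is only a sketch of an alternative, not the approach above; the StrictBatches class and its ForEach method are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not part of the TPL): processes items in order,
// batchSize at a time, and waits for each batch before starting the next.
public static class StrictBatches
{
    public static void ForEach<T>(IList<T> list, Action<T> action, int batchSize)
    {
        for (int start = 0; start < list.Count; start += batchSize)
        {
            // The last batch may be smaller than batchSize.
            int count = Math.Min(batchSize, list.Count - start);
            var tasks = new Task[count];
            for (int i = 0; i < count; i++)
            {
                // Local copy so each lambda captures its own item.
                T item = list[start + i];
                tasks[i] = Task.Factory.StartNew(() => action(item));
            }
            // Plays the role of the barrier between batches.
            Task.WaitAll(tasks);
        }
    }
}
```

Because the last batch is simply shorter, the item count does not have to be a multiple of the batch size. Within a batch the items still run concurrently, so their relative order is not defined.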
Process 4 items in a single task: you can create a task object that encapsulates 4 items of the original list and then do a simple Parallel.ForEach like in the first case (i.e. each thread will process 4 items sequentially as a part of a single task). This would issue tasks in groups of 4 in order, but again may cause some threads to lag behind if a task takes too long.
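One way to read the "4 items in a single task" option is sketched below: the list is cut into consecutive groups, and each group is handled sequentially by one loop iteration. GroupedProcessing, Chunk and the parameter names are assumptions made for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: each Parallel.ForEach iteration handles one group
// of consecutive items, processing them sequentially and in order.
public static class GroupedProcessing
{
    // Splits the list into consecutive groups of at most 'size' items.
    public static List<List<T>> Chunk<T>(IList<T> list, int size)
    {
        var groups = new List<List<T>>();
        for (int start = 0; start < list.Count; start += size)
            groups.Add(list.Skip(start).Take(size).ToList());
        return groups;
    }

    public static void ForEach<T>(IList<T> list, Action<T> action,
                                  int groupSize, int degreeOfParallelism)
    {
        var po = new ParallelOptions { MaxDegreeOfParallelism = degreeOfParallelism };
        Parallel.ForEach(Chunk(list, groupSize), po, group =>
        {
            // Items inside one group run one after another, in list order.
            foreach (T item in group)
                action(item);
        });
    }
}
```

Order is guaranteed only inside a group; the groups themselves are scheduled like the items in the first case.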
It's unclear to me what exactly you are doing where “elements are chosen randomly”. But if you use Parallel.ForEach(), then it tries to be efficient, and so it partitions the input sequence in some way. If the input sequence is an IList<T>, it will use range partitioning; otherwise, it will use chunk partitioning (see Chunk partitioning vs range partitioning in PLINQ).
If you want to process the items in order, you could configure Parallel.ForEach() by using a custom partitioner, which would partition the collection into chunks of size 1.
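As a rough sketch of the chunks-of-size-1 idea: on .NET 4.5 and later (an assumption here, since the question targets 4.0, where you would have to write the partitioner yourself), Partitioner.Create accepts EnumerablePartitionerOptions.NoBuffering, which makes each worker take one item at a time. UnbufferedForEach and Run are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Assumes .NET 4.5+: EnumerablePartitionerOptions does not exist in .NET 4.0.
public static class UnbufferedForEach
{
    public static void Run<T>(IEnumerable<T> source, Action<T> action, int degreeOfParallelism)
    {
        // NoBuffering: workers pull items one by one instead of in chunks,
        // so items are handed out in the order of the source sequence.
        var partitioner = Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering);
        var po = new ParallelOptions { MaxDegreeOfParallelism = degreeOfParallelism };
        Parallel.ForEach(partitioner, po, action);
    }
}
```

Items are started in source order, but they can still finish out of order when some take longer than others.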
But since you don't really need Parallel.ForEach() here, possibly a simpler solution would be just to create 4 tasks that process the items one by one. For synchronization, you could use BlockingCollection. Something like:
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public static class ParallelOrdered
    {
        public static void ForEach<T>(IEnumerable<T> collection, Action<T> action, int degreeOfParallelism)
        {
            // Queue all items up front; the default backing store is a FIFO queue.
            var blockingCollection = new BlockingCollection<T>();
            foreach (var item in collection)
                blockingCollection.Add(item);
            blockingCollection.CompleteAdding();

            // Each task takes the next item as soon as it finishes the previous one.
            var tasks = new Task[degreeOfParallelism];
            for (int i = 0; i < degreeOfParallelism; i++)
            {
                tasks[i] = Task.Factory.StartNew(
                    () =>
                    {
                        foreach (var item in blockingCollection.GetConsumingEnumerable())
                            action(item);
                    });
            }

            Task.WaitAll(tasks);
        }
    }
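The approach above depends on BlockingCollection handing items out in insertion order, which holds because it wraps a ConcurrentQueue when no other backing collection is supplied. The following small check (FifoCheck and Drain are hypothetical names) illustrates that with a single consumer:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical check: adds items, then drains them with one consumer
// to show that the default BlockingCollection is first-in, first-out.
public static class FifoCheck
{
    public static List<int> Drain(IEnumerable<int> items)
    {
        var blockingCollection = new BlockingCollection<int>();
        foreach (var item in items)
            blockingCollection.Add(item);
        blockingCollection.CompleteAdding();

        // GetConsumingEnumerable removes items as it yields them and
        // ends once the collection is empty and marked complete.
        return blockingCollection.GetConsumingEnumerable().ToList();
    }
}
```

With several consumers, items are still taken in this order, but each consumer proceeds at its own pace, so completion order can differ.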
This is how I achieved this:
    public delegate void ProcessFinished(IParallelProcess process);

    public interface IParallelProcess
    {
        void Start();
        event ProcessFinished ProcessFinished;
    }

    public class ParallelProcessBasket : ConcurrentQueue<IParallelProcess>
    {
        public void Put(IParallelProcess process)
        {
            base.Enqueue(process);
        }

        public IParallelProcess Get()
        {
            IParallelProcess process = null;
            base.TryDequeue(out process);
            return process;
        }
    }

    public class ParallelProcessor<T> where T : class
    {
        private ParallelProcessBasket basket;
        private readonly int MAX_DEGREE_OF_PARALLELISM;
        private Action<T> action;

        public ParallelProcessor(int degreeOfParallelism, IEnumerable<IParallelProcess> processes, Action<T> action)
        {
            basket = new ParallelProcessBasket();
            this.action = action;
            processes.ToList().ForEach(
                (p) =>
                {
                    basket.Enqueue(p);
                    p.ProcessFinished += new ProcessFinished(p_ProcessFinished);
                });
            MAX_DEGREE_OF_PARALLELISM = degreeOfParallelism;
        }

        private void p_ProcessFinished(IParallelProcess process)
        {
            if (!basket.IsEmpty)
            {
                T element = basket.Get() as T;
                if (element != null)
                {
                    Task.Factory.StartNew(() => action(element));
                }
            }
        }

        public void StartProcessing()
        {
            // take first level of iteration
            for (int cnt = 0; cnt < MAX_DEGREE_OF_PARALLELISM; cnt++)
            {
                if (!basket.IsEmpty)
                {
                    T element = basket.Get() as T;
                    if (element != null)
                    {
                        Task.Factory.StartNew(() => action(element));
                    }
                }
            }
        }
    }

    static void Main(string[] args)
    {
        ParallelProcessor<ParallelTask> pr = new ParallelProcessor<ParallelTask>(Environment.ProcessorCount, collection, (e) => e.Method1());
        pr.StartProcessing();
    }
Thanks..