Parallel.For items in a List by using ConcurrentQu

Posted 2019-08-04 12:16

I have a List<TaskClass> TaskList items that we can iterate over using a Parallel loop.

The items in the list are sorted in a particular order because TaskClass implements IComparable with its own CompareTo(object obj) method, so we need the items to be acted upon in that order.

Note that they do NOT have to complete in sequential order; they only have to start in sequential order.

Thus TaskList[0] should be started first; then TaskList[1], TaskList[2], ... However, we don't care if TaskList[2] completes first, or TaskList[0].

This is the quick code I've come up with to try to achieve this:

//Construct a ConcurrentQueue and populate it with our SORTED list
//of TaskClass items so when we go through a parallel loop
//they are acted upon in sorted order. A parallel loop does not
//guarantee ordering, which we need to make sure tasks with a higher
//number are done first.
ConcurrentQueue<TaskClass> cq = new ConcurrentQueue<TaskClass>();
for (int x = 0; x < TaskList.Count; x++)
    cq.Enqueue(TaskList[x]);

Parallel.For(
    0,
    cq.Count,
    new ParallelOptions { MaxDegreeOfParallelism = DISystem.MaxConcurrentThreads },
    x =>
    {
        TaskClass tc = null;
        if (cq.TryDequeue(out tc))
        {
            TaskTypeManager ttm = new TaskTypeManager();
            tc.Working = true;
            tc.Started = DateTime.Now;
            ttm.ProcessTaskItem(tc);
        }
    }
);

Now the issue I believe is when the Parallel.For loop completes, the original List<TaskClass> TaskList will not have been updated with the latest values.

What is the best way to accomplish this?

With modified code like the following? (lines marked with "//new")

ConcurrentQueue<TaskClass> cq = new ConcurrentQueue<TaskClass>();
for (int x = 0; x < TaskList.Count; x++)
    cq.Enqueue(TaskList[x]);

List<TaskClass> NewTaskList = new List<TaskClass>(); //new
object lockObject = new Object(); //new

Parallel.For(
    0,
    cq.Count,
    new ParallelOptions { MaxDegreeOfParallelism = DISystem.MaxConcurrentThreads },
    x =>
    {
        TaskClass tc = null;
        if (cq.TryDequeue(out tc))
        {
            TaskTypeManager ttm = new TaskTypeManager();
            tc.Working = true;
            tc.Started = DateTime.Now;
            ttm.ProcessTaskItem(tc);
            lock (lockObject) //new
            {
                NewTaskList.Add(tc);
            }
        }
    }
);

NewTaskList.Sort(); //new
TaskList.Clear(); //new
TaskList = NewTaskList.ToList(); //new
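
Whether the copy-back above is needed depends on whether TaskClass is a reference type. The line `TaskClass tc = null;` suggests that it is. Under that assumption, the queue and the list hold references to the same objects, so changes made to a dequeued item should already be visible through the original list. The following is a minimal sketch of that behaviour; DemoItem and ReferenceDemo are hypothetical stand-ins, not types from the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for TaskClass, assumed to be a class (reference type).
public class DemoItem
{
    public bool Working;
}

public static class ReferenceDemo
{
    // Returns the Working flag as seen through the original list
    // after the item was modified via the queue.
    public static bool Run()
    {
        var list = new List<DemoItem> { new DemoItem() };

        // The queue stores references to the same objects, not copies.
        var cq = new ConcurrentQueue<DemoItem>(list);

        DemoItem item;
        if (cq.TryDequeue(out item))
        {
            item.Working = true;
        }

        return list[0].Working; // true: the list sees the change
    }
}
```

If TaskClass were a struct instead, the queue would hold copies and a copy-back step like the one above would be required.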

Or does anyone have any other ideas/suggestions/improvements?

Thanks!

1 answer
叼着烟拽天下
#2 · 2019-08-04 13:08

Will this work? No. It may work most of the time, but not if you really need the ordering to be guaranteed.

There is an inherent problem with the statement "they do have to start in order". What do you mean by "start"? You probably have a race condition there. Consider this modification:

x =>
{
    TaskClass tc = null;
    if (cq.TryDequeue(out tc))
    {
        Thread.Sleep(random.Next(0, 1000));
        TaskTypeManager ttm = new TaskTypeManager();
        ...

As you can see, the only thing that happens in order is the dequeuing of your items. After that, parallelism kicks in and no order is guaranteed. You need some kind of synchronization inside ProcessTaskItem, up to the point where you consider the task to be actually "started".
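
One possible reading of that advice, offered as a minimal sketch and not a definitive fix: dequeue the item and mark it as started inside a single lock, so the "start" step happens strictly in queue order, then do the real work outside the lock so it still runs in parallel. DemoTask, StartSequence, and OrderedStartDemo are hypothetical names for illustration, and the point at which a task counts as "started" is an assumption here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's TaskClass.
public class DemoTask
{
    public int Priority;         // position in the sorted list
    public long StartSequence;   // order in which the task was marked "started"
    public DateTime Started;
}

public static class OrderedStartDemo
{
    public static List<DemoTask> Run(int count, int maxThreads)
    {
        var tasks = Enumerable.Range(0, count)
                              .Select(i => new DemoTask { Priority = i })
                              .ToList();

        // A plain Queue is enough because every access happens under startLock.
        var queue = new Queue<DemoTask>(tasks);
        var startLock = new object();
        var random = new Random();   // Random is not thread-safe; only used under the lock
        long sequence = 0;

        Parallel.For(
            0,
            count,
            new ParallelOptions { MaxDegreeOfParallelism = maxThreads },
            i =>
            {
                DemoTask tc;
                int delay;

                // Dequeue and "start" are one atomic step, so starts follow queue order.
                lock (startLock)
                {
                    tc = queue.Dequeue();
                    tc.StartSequence = sequence++;
                    tc.Started = DateTime.Now;
                    delay = random.Next(0, 5);
                }

                // Simulated work runs outside the lock and may finish in any order.
                Thread.Sleep(delay);
            });

        return tasks;
    }
}
```

Calling `OrderedStartDemo.Run(50, 4)` should leave every task with StartSequence equal to its Priority, while completion order remains unconstrained. The lock is held only for the short start step, so the cost to parallelism stays small as long as that step is cheap.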
