How to properly parallelize worker tasks?

Posted 2019-05-11 13:06

Question:

Consider the following code snippet and notice the difference in total runtime between setting numberTasksToSpinOff to 1 and then to 3, 4, or more (depending on the thread resources of your machine). I see much longer run times when spinning off more tasks.

I deliberately passed the same data collection into each worker instance, and each worker task reads from it at the same time. My assumption was that tasks can access a shared data structure without blocking as long as those operations are only reads or enumerations.

My goal is to spin off multiple tasks that iterate over the same shared data structure via read operations and all complete at roughly the same time, regardless of the number of tasks spun off.

Edit: Please see the second code snippet, where I use Parallel.ForEach() and give each worker its own dataset, so no two tasks/threads access the same data structure. Yet I still see an unacceptable amount of overhead.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        Console.WriteLine($"Entry Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        //run
        var task = Task.Run(async () =>
        {
            Console.WriteLine($"Entry RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            await RunMe();

            Console.WriteLine($"Exit RunMe Task Thread Id: {Thread.CurrentThread.ManagedThreadId}");
        });

        task.Wait();

        Console.WriteLine($"Exit Main Function Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static async Task RunMe()
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 6;
        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks);
        var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var tasks = new List<Task>();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i, dataPoints));
        }

        //start timer
        watch.Restart();

        //spin off tasks
        foreach (var worker in workers)
        {
            tasks.Add(Task.Run(() =>
            {
                Console.WriteLine($"Entry WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                worker.DoSomeWork();
                Console.WriteLine($"Exit WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");
            }));

        }

        //completion tasks
        await Task.WhenAll(tasks);

        //stop timer
        watch.Stop();

        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    private List<double> _data;

    public Worker(int workerId, List<double> data)
    {
        WorkerId = workerId;
        _data = data;
    }

    public void DoSomeWork()
    {
        var indexPos = 0;

        //for every element, copy the remaining tail of the list into a new
        //list -> O(n^2) time and allocations, which is what makes this heavy
        foreach (var dp in _data)
        {
            var subSet = _data.Skip(indexPos).Take(_data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

Second Code Snippet:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        var watch = new Stopwatch();
        var numberTasksToSpinOff = 1;
        var numberItems = 20000;
        //var random = new Random((int)DateTime.Now.Ticks);
        //var dataPoints = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
        var workers = new List<Worker>();

        //structure workers
        for (int i = 1; i <= numberTasksToSpinOff; i++)
        {
            workers.Add(new Worker(i));
        }

        //start timer
        watch.Restart();

        //parallel work

        if (workers.Any())
        {
            var processorCount = Environment.ProcessorCount;
            var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = processorCount };
            Parallel.ForEach(workers, parallelOptions, DoSomeWork);
        }

        //stop timer
        watch.Stop();
        Console.WriteLine($"Time it took to complete in Milliseconds: {watch.ElapsedMilliseconds}");

        Console.WriteLine("Press key to quit");
        Console.ReadLine();
    }

    private static void DoSomeWork(Worker worker)
    {
        Console.WriteLine($"WorkerId: {worker.WorkerId} -> New Tasks spun off with in Thread Id: {Thread.CurrentThread.ManagedThreadId}");

        var indexPos = 0;

        //same O(n^2) tail-copying workload as in the first snippet
        foreach (var dp in worker.Data)
        {
            var subSet = worker.Data.Skip(indexPos).Take(worker.Data.Count - indexPos).ToList();
            indexPos++;
        }
    }
}

public class Worker
{
    public int WorkerId { get; set; }
    public List<double> Data { get; set; }

    public Worker(int workerId)
    {
        WorkerId = workerId;

        var numberItems = 20000;
        var random = new Random((int)DateTime.Now.Ticks); //see the seeding caveat after this snippet
        Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();

    }
}
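
One caveat about the constructor above (my observation, not part of the original post): DateTime.Now has coarse resolution, so workers constructed in a tight loop can all receive the same seed and therefore identical datasets. A minimal sketch of one way around it, where the caller passes in distinct seeds drawn from a single Random (the seed parameter is my addition):

public Worker(int workerId, int seed)
{
    WorkerId = workerId;

    var numberItems = 20000;
    var random = new Random(seed); //distinct seed per worker
    Data = Enumerable.Range(1, numberItems).Select(x => random.NextDouble()).ToList();
}

//usage: var seedSource = new Random();
//       workers.Add(new Worker(i, seedSource.Next()));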

Answer 1:

NOTE: The following answer is based on testing and observation, not definitive knowledge.

The more tasks you spin off, the more overhead you generate, so the total execution time rises. But if you look at it from another viewpoint, you will see that the number of "data points" actually processed per second increases with the number of tasks you spin up (up until you reach the limit of available hardware threads):

The following values were generated on my machine (4C/8T) with 10000 points per list:

  • 1 worker -> 1891 ms -> 5288 p/s
  • 2 worker -> 1921 ms -> 10411 p/s
  • 4 worker -> 2670 ms -> 14981 p/s
  • 8 worker -> 4871 ms -> 16423 p/s
  • 12 worker -> 7449 ms -> 16109 p/s

As you can see, up to my core limit (4 workers) the throughput increases significantly; up to my thread limit (8 workers) it still increases noticeably; beyond that it decreases again, because the extra overhead gets no additional hardware resources to work with.
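
For reference, here is a minimal sketch of how such figures could be reproduced, assuming the Worker class and the shared dataPoints list from the question's first snippet (the loop over worker counts, the Math.Max guard, and the p/s arithmetic are my additions, not the original benchmark code):

private static async Task MeasureThroughput(List<double> dataPoints)
{
    foreach (var workerCount in new[] { 1, 2, 4, 8, 12 })
    {
        //one worker per task, all sharing the same list
        var workers = Enumerable.Range(1, workerCount)
                                .Select(i => new Worker(i, dataPoints))
                                .ToList();

        var watch = Stopwatch.StartNew();
        var tasks = workers.Select(w => Task.Run(() => w.DoSomeWork())).ToList();
        await Task.WhenAll(tasks);
        watch.Stop();

        //total points touched across all workers, divided by elapsed seconds
        var pointsPerSecond = workerCount * dataPoints.Count * 1000L / Math.Max(1, watch.ElapsedMilliseconds);
        Console.WriteLine($"{workerCount} worker -> {watch.ElapsedMilliseconds} ms -> {pointsPerSecond} p/s");
    }
}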



Answer 2:

Have you had a look at the Task Parallel Library? You could then do something like this, e.g.:

if (workers.Any())
{
    var parallelOptions = new ParallelOptions {MaxDegreeOfParallelism = Environment.ProcessorCount};
    Parallel.ForEach(workers, parallelOptions, DoSomeWork);
}

private static void DoSomeWork(Worker worker)
{
    //per-worker processing goes here
}
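
One detail worth adding (not in the original answer): unlike fire-and-forget Task.Run calls, Parallel.ForEach blocks the calling thread until every element has been processed, so a stopwatch stopped right after the call measures the whole parallel run. A minimal sketch, reusing the workers list and the DoSomeWork method shown above:

var parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

var watch = Stopwatch.StartNew();
Parallel.ForEach(workers, parallelOptions, DoSomeWork); //returns only after all workers finish
watch.Stop();

Console.WriteLine($"Completed in {watch.ElapsedMilliseconds} ms");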