Using thousands of Tasks with a timeout efficiently

Posted 2019-08-12 04:47

Question:

I am implementing a Library L that communicates via Sockets with another application A.

The basic workflow is as follows:

  1. L connects to A.
  2. L sends ~50,000 pieces of information I to A and creates a Task T for every I that is sent out.
  3. L listens for incoming results from A and, once results arrive, uses a TaskCompletionSource to set the result of each Task T.
  4. For every T, L creates a Task T2 with a timeout (Task.WhenAny(T, Task.Delay(xx))).
  5. L uses Task.WhenAll on all T2 to wait for a timeout or result on every piece of information sent (see the sketch after this list).
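
For illustration, the per-item pattern in steps 2-5 looks roughly like this (a minimal, self-contained sketch; the Result type, item count, and timeout value are illustrative stand-ins, not the library's actual code):

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PerItemTimeoutSketch
{
    // Stand-in for whatever result A sends back.
    private sealed class Result { }

    public static async Task Main()
    {
        const int itemCount = 50000;
        const int timeoutMs = 5000;
        var completionSources = new List<TaskCompletionSource<Result>>(itemCount);
        var timeoutTasks = new List<Task>(itemCount);

        // Steps 2-4: one TaskCompletionSource (T) plus one Task.WhenAny/Task.Delay pair (T2) per item.
        for (var i = 0; i < itemCount; i++)
        {
            var tcs = new TaskCompletionSource<Result>();
            completionSources.Add(tcs); // completed later by the listener when A answers
            timeoutTasks.Add(Task.WhenAny(tcs.Task, Task.Delay(timeoutMs)));
        }

        // Step 5: wait until every item has either a result or a timeout.
        await Task.WhenAll(timeoutTasks);
    }
}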

Managing the underlying data structure is no problem at all. The main problem is that assembling the "main" Task.WhenAll(T2) takes around 5-6 seconds on my machine for roughly 50,000 entries (creating 50,000*2+1 tasks).

I can't think of a more lightweight way to accomplish the same thing, however. It should use all available cores, be non-blocking, and support timeouts as well.

Is there a way to accomplish this with the Parallel or ThreadPool classes that improves performance?

EDIT: Code showing the basic setup: https://dotnetfiddle.net/gIq2DP

Answer 1:

Start a total of n long-running tasks, where n is the number of cores on your machine, so that each task can run on its own core. It would be wasteful to create 50K new tasks, one for every I that you want to send. Instead, design the tasks to accept a piece of information I together with the socket information describing where it is to be sent.

Create a BlockingCollection<Tuple<I, SocketInfo>>. Start one task to populate this blocking collection. The n long-running tasks created earlier keep taking tuples of information and the address to send it to, and perform the work in a loop that ends when the blocking collection is marked complete and drained.

Timeouts can be handled inside the long-running tasks themselves.

This setup keeps your CPU busy with useful work instead of the needless "job" of creating 50K tasks.

Since operations that go beyond main memory (like this network operation) are very slow from the CPU's point of view, feel free to set n not just to the number of cores in your machine but to as much as three times that value. In the code demonstration below I have set it equal to the number of cores only.

With the code at the provided link, this is one way...

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;

namespace TestConsoleApplication
{
    public static class Test
    {
        public static void Main()
        {
            TaskRunningTest();
        }

        private static void TaskRunningTest()
        {
            var s = new Stopwatch();
            const int totalInformationChunks = 50000;
            var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            // Single TaskCompletionSource used here only as a stand-in for per-item result signalling.
            var tcs = new TaskCompletionSource<int>();

            var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);

            s.Start();
            //Start a new task to populate the "itemsToProcess"
            taskFactory.StartNew(() =>
            {
                // Add Tuples of the Information and the Address to which it is to be sent.
                Console.WriteLine("Done initializing all the jobs...");
                // Finally signal that you are done by saying..
                itemsToProcess.CompleteAdding();
            });

            //Initializing the base tasks
            for (var index = 0; index < baseProcessorTaskArray.Length; index++)
            {
                var thisIndex = index;
                baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
                {
                    // GetConsumingEnumerable blocks while the producer is still adding and
                    // completes once CompleteAdding has been called and the collection is
                    // drained, so workers neither spin nor exit early.
                    foreach (var item in itemsToProcess.GetConsumingEnumerable())
                    {
                        //Process the item
                        tcs.TrySetResult(thisIndex);
                    }
                });
            }

            // New timeout logic is needed here; depending upon what you are trying
            // to achieve with the timeout, you can devise the appropriate approach
            // (one possibility is sketched after this code).

            // Wait for the base tasks to completely drain the collection OR
            // time out, and then stop the stopwatch.
            Task.WaitAll(baseProcessorTaskArray);
            s.Stop();
            Console.WriteLine(s.ElapsedMilliseconds);
        }

        private class Address
        {
            //This class should have the socket information
        }

        private class Information
        {
            //This class will have the Information to send
        }
    }
}
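
One possible way to fill in the timeout placeholder above (my own assumption, not part of the original answer) is to bound the overall wait on the worker tasks instead of giving every item its own timer, e.g. by replacing the Task.WaitAll call:

// Possible replacement for the Task.WaitAll(...) call above (illustrative only):
// this overload returns false if the timeout elapses before all worker tasks finish.
var finishedInTime = Task.WaitAll(baseProcessorTaskArray, TimeSpan.FromSeconds(30));
s.Stop();
Console.WriteLine(finishedInTime
    ? "All items processed in " + s.ElapsedMilliseconds + " ms"
    : "Timed out before all items were processed");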


Answer 2:

Profiling shows that most time (90%?) is spent in timer setup, expiration and disposal. This seems plausible to me.

Maybe you can create your own super cheap timeout mechanism. Enqueue timeouts into a priority queue ordered by expiration time. Then, run a single timer every 100ms and make that timer expire everything in the priority queue that is due.

The cost of doing this would be one TaskCompletionSource per timeout and some small further processing.

You can even cancel timeouts by removing them from the queue and just dropping the TaskCompletionSource.
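
A minimal sketch of that idea, assuming .NET 6's PriorityQueue<TElement, TPriority> (the CheapTimeoutQueue class below and its members are illustrative, not an existing API):

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class CheapTimeoutQueue<T>
{
    private readonly object _gate = new object();
    private readonly PriorityQueue<TaskCompletionSource<T>, DateTime> _pending = new();
    private readonly Timer _timer;

    public CheapTimeoutQueue()
    {
        // One shared timer fires every 100 ms and expires everything that is due.
        _timer = new Timer(_ => ExpireDueEntries(), null, 100, 100);
    }

    // Register a pending operation; the caller completes the returned source when the
    // real result arrives, otherwise it is faulted with a TimeoutException here.
    public TaskCompletionSource<T> Register(TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);
        lock (_gate)
        {
            _pending.Enqueue(tcs, DateTime.UtcNow + timeout);
        }
        return tcs;
    }

    private void ExpireDueEntries()
    {
        var now = DateTime.UtcNow;
        lock (_gate)
        {
            while (_pending.TryPeek(out var tcs, out var due) && due <= now)
            {
                _pending.Dequeue();
                // TrySetException is a no-op if the real result already arrived,
                // so completed entries are simply dropped when their turn comes.
                tcs.TrySetException(new TimeoutException());
            }
        }
    }
}

Cancelling a timeout then amounts to completing or abandoning the TaskCompletionSource; the stale entry is discarded the next time the timer scans the queue.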