
Executing N threads in parallel and in a sequential manner

Posted 2019-06-16 06:55

Question:

I have an application where I have one large file split into 1000+ small parts.

I have to upload a maximum of 16 parts at a time.

I used the Task Parallel Library (TPL) of .NET.

I used Parallel.For to divide the work into multiple parts, assigned one method that should be executed for each part, and set MaxDegreeOfParallelism to 16.

I need to execute one method with the checksum values generated by the different part uploads, so I have to set up some mechanism to wait for all part uploads, say all 1000, to complete. One issue I am facing with the TPL is that it randomly executes any of the 16 threads out of the 1000.

I want some mechanism by which I can run the first 16 threads initially, and as soon as the 1st, or 2nd, or any of the 16 threads completes its task, the 17th part should be started.

How can I achieve this?

Answer 1:

Here is the manual way of doing this.

You need a queue. The queue is the sequence of pending tasks. You dequeue items and put them into a list of working tasks. Whenever a task is done, remove it from the list of working tasks and take another from the queue. The main thread controls this process. Here is a sample of how to do it.

For the test I used a List of integers, but it should work for other types because it uses generics.

private static void Main()
{
    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    ParallelQueue(items, DoWork);
}

private static void ParallelQueue<T>(List<T> items, Action<T> action)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    {
        if (pending.Count != 0 && working.Count < 16)  // Maximum tasks
        {
            var item = pending.Dequeue(); // get item from queue
            working.Add(Task.Run(() => action(item))); // run task
        }
        else
        {
            Task.WaitAny(working.ToArray());
            working.RemoveAll(x => x.IsCompleted); // remove finished tasks
        }
    }
}

private static void DoWork(int i) // do your work here.
{
    // this is just an example
    Task.Delay(i).Wait(); 
    Console.WriteLine(i);
}

Please let me know if you run into problems implementing DoWork for yourself, because if you change the method signature you may need to make some changes.
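If each part also needs to hand back a checksum, as the question requires, one possible variation is to swap Action<T> for Func<T, TResult> and keep the completed tasks around so their results can be read at the end. A sketch of that idea (not tested against your upload code):

private static List<TResult> ParallelQueue<T, TResult>(List<T> items, Func<T, TResult> func)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task<TResult>> working = new List<Task<TResult>>();
    List<Task<TResult>> finished = new List<Task<TResult>>();

    while (pending.Count + working.Count != 0)
    {
        if (pending.Count != 0 && working.Count < 16)
        {
            var item = pending.Dequeue();
            working.Add(Task.Run(() => func(item)));
        }
        else
        {
            Task.WaitAny(working.ToArray());                      // block until any task finishes
            finished.AddRange(working.Where(x => x.IsCompleted)); // keep results
            working.RemoveAll(x => x.IsCompleted);
        }
    }

    return finished.Select(t => t.Result).ToList(); // safe: all tasks have completed here
}

Note that the results come back in completion order, not input order, so carry the part index inside TResult if the final checksum calculation needs ordering.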

Update

You can also do this with async/await, without blocking the main thread.

private static void Main()
{
    Random r = new Random();
    var items = Enumerable.Range(0, 100).Select(x => r.Next(100, 200)).ToList();

    Task t = ParallelQueue(items, DoWork);

    // able to do other things.

    t.Wait();
}

private static async Task ParallelQueue<T>(List<T> items, Func<T, Task> func)
{
    Queue<T> pending = new Queue<T>(items);
    List<Task> working = new List<Task>();

    while (pending.Count + working.Count != 0)
    {
        if (working.Count < 16 && pending.Count != 0)
        {
            var item = pending.Dequeue();
            working.Add(Task.Run(() => func(item))); // run task
        }
        else
        {
            await Task.WhenAny(working);
            working.RemoveAll(x => x.IsCompleted);
        }
    }
}

private static async Task DoWork(int i)
{
    await Task.Delay(i);
}
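
A variation on the update: if you would rather not manage the queue and the working list yourself, a SemaphoreSlim with 16 slots gives the same throttling. A minimal sketch; ThrottledForEachAsync is a made-up name, not a TPL method:

private static async Task ThrottledForEachAsync<T>(IEnumerable<T> items, Func<T, Task> func)
{
    using (var throttle = new SemaphoreSlim(16)) // at most 16 tasks in flight
    {
        var tasks = items.Select(async item =>
        {
            await throttle.WaitAsync();          // item #17 waits here for a free slot
            try
            {
                await func(item);
            }
            finally
            {
                throttle.Release();              // free the slot for the next item
            }
        }).ToList();

        await Task.WhenAll(tasks);               // completes when every item is done
    }
}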


Answer 2:

One possible candidate for this is TPL Dataflow. Here is a demonstration which takes in a stream of integers and prints them to the console. You set MaxDegreeOfParallelism to however many threads you wish to run in parallel:

void Main()
{
    var actionBlock = new ActionBlock<int>(
            i => Console.WriteLine(i),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

    foreach (var i in Enumerable.Range(0, 200))
    {
        actionBlock.Post(i);
    }

    actionBlock.Complete();         // signal that no more items will be posted
    actionBlock.Completion.Wait();  // block until all posted items have been processed
}

This can also scale well if you want to have multiple producers/consumers.
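
For example, if you also need each part's checksum back (as in the question), a TransformBlock can sit in front of a collecting ActionBlock. A sketch; Chunk, UploadChunkAsync and ComputeFinalChecksum are hypothetical placeholders for your own types and methods:

async Task UploadAllAsync(IEnumerable<Chunk> chunks)
{
    var uploader = new TransformBlock<Chunk, string>(
        chunk => UploadChunkAsync(chunk),   // hypothetical: uploads one part, returns its checksum
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 16 });

    var checksums = new List<string>();
    var collector = new ActionBlock<string>(sum => checksums.Add(sum)); // single-threaded by default

    uploader.LinkTo(collector, new DataflowLinkOptions { PropagateCompletion = true });

    foreach (var chunk in chunks)
        uploader.Post(chunk);

    uploader.Complete();                    // no more parts will be posted
    await collector.Completion;             // every part uploaded and its checksum recorded
    ComputeFinalChecksum(checksums);        // hypothetical final step from the question
}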



Answer 3:

var workitems = ... /* e.g. Enumerable.Range(0, 1000000) */;
SingleItemPartitioner.Create(workitems)
    .AsParallel()
    .AsOrdered()
    .WithDegreeOfParallelism(16)
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .ForAll(i => { Thread.Sleep(1000); Console.WriteLine(i); });

This should be all you need. I forgot exactly how the methods are named... look at the documentation.

You can test this by printing to the console after sleeping for 1 sec (which is what this sample code does).
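
Note that SingleItemPartitioner comes from Microsoft's ParallelExtensionsExtras samples, not the base framework. If you would rather stay within the BCL, a similar one-item-at-a-time pipeline can be sketched with Partitioner.Create and NoBuffering (.NET 4.5+); ForAll does not preserve ordering, so AsOrdered is dropped here:

var workitems = Enumerable.Range(0, 1000);

Partitioner.Create(workitems, EnumerablePartitionerOptions.NoBuffering)
    .AsParallel()
    .WithDegreeOfParallelism(16)
    .ForAll(i =>
    {
        Thread.Sleep(1000);       // stand-in for the real upload work
        Console.WriteLine(i);
    });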



Answer 4:

Another option would be to use a BlockingCollection<T> as a queue between your file reader thread and your 16 uploader threads. Each uploader thread would just loop around consuming the blocking collection until it is complete.

And, if you want to limit memory consumption in the queue you can set an upper limit on the blocking collection such that the file-reader thread will pause when the buffer has reached capacity. This is particularly useful in a server environment where you may need to limit memory used per user/API call.

// Create a buffer of 4 chunks between the file reader and the senders
BlockingCollection<Chunk> queue = new BlockingCollection<Chunk>(4);

// Create a cancellation token source so you can stop this gracefully
CancellationTokenSource cts = ...

File reader thread

...
queue.Add(chunk, cts.Token);
...
queue.CompleteAdding();

Sending threads

for (int i = 0; i < 16; i++)
{
    Task.Run(() =>
    {
        foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
        {
            // ... do the upload
        }
    });
}
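
Put together, an end-to-end sketch of this answer; Chunk, ReadChunks and UploadChunk are placeholders for your real types and methods:

static void UploadAll()
{
    var queue = new BlockingCollection<Chunk>(4);   // buffer at most 4 chunks
    var cts = new CancellationTokenSource();

    // Producer: the file-reader thread; Add blocks while the buffer is full.
    var reader = Task.Run(() =>
    {
        foreach (var chunk in ReadChunks())         // hypothetical chunk source
            queue.Add(chunk, cts.Token);
        queue.CompleteAdding();                     // lets the consumers' loops end
    });

    // Consumers: 16 uploader tasks draining the collection.
    var uploaders = Enumerable.Range(0, 16)
        .Select(_ => Task.Run(() =>
        {
            foreach (var chunk in queue.GetConsumingEnumerable(cts.Token))
                UploadChunk(chunk);                 // hypothetical upload call
        }))
        .ToArray();

    reader.Wait();
    Task.WaitAll(uploaders);    // returns once every chunk has been uploaded
}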