C# TPL calling tasks in a parallel manner and asyn

Posted 2019-07-13 10:33

Question:

I am trying to learn TPL. I write to files in a parallel manner like this:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    foreach (var str in list)
    {
        string fullPath = path + @"\" + count.ToString() + "_element.txt";
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(str);
        }
        count++;

        Log("Saved in thread: {0} to {1}", 
           Environment.CurrentManagedThreadId,
           fullPath);

        if (ct.IsCancellationRequested)
            ct.ThrowIfCancellationRequested();
    }
}

And call it like this:

var tasks = new List<Task>();

try
{
    tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
    Log("Failed to save: " + ex.Message);
    throw;
}

tasks.Add(MySecondFuncAsync(cts.Token));
//...
tasks.Add(MyLastFuncAsync(cts.Token));

try
{
    //Or should I call await Task.WhenAll(tasks) ? What should I call here?
    Task.WaitAll(tasks.ToArray()); 
}
catch (AggregateException ex)
{
    foreach (var v in ex.InnerExceptions)
       Error(ex.Message + " " + v.Message);
}
finally
{
   cts.Dispose();
} 

foreach (var task in tasks)
{
    // Now, how to print results from the tasks?
    // Considering that all tasks return bool value,
    // I need to do something like this:
    if (task.Status != TaskStatus.Faulted)
        Console.WriteLine(task.Result);
    else
        Log("Error...");
}

My goal is to run all of these functions (SaveToFilesAsync, MySecondFuncAsync, and so on) at the same time, in parallel, using all cores on the machine to save time. But when I look at the logs from SaveToFilesAsync, I see that the files are always saved on the same thread, not in parallel. What am I doing wrong?

Second question: how can I get Task.Result from each task in the task list at the end of the code? If the second function returns Task<bool>, how can I get the bool value in my code?

Also, all comments about my code are very welcome, since I am new to TPL.

Answer 1:

You need to replace the foreach loop, which runs sequentially from the first item to the last, with a Parallel.ForEach() loop, which can be configured for parallelism, or with Parallel.For(), which gives you the index of the item currently being processed. Since you need a counter for the file names, you will either have to change the list parameter so that each entry carries its file number (populated when you create the list) or use the index provided by Parallel.For(). Another option is a shared counter variable that you advance with Interlocked.Increment when building the file name, but I'm not sure that would be optimal; I haven't tried it.

Here's what it would look like.

Wrap the code that invokes SaveFilesAsync in a try/catch to handle an operation canceled via the CancellationTokenSource.

var cts = new CancellationTokenSource();

try
{
    Task.WaitAll(SaveFilesAsync(@"C:\Some\Path", files, cts.Token));
}
catch (Exception)
{
    Debug.Print("SaveFilesAsync Exception");
}
finally
{
    cts.Dispose();
}

Then do your parallelism in that method.

public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
    int counter = 0;

    var options = new ParallelOptions
                      {
                          CancellationToken = token,
                          MaxDegreeOfParallelism = Environment.ProcessorCount,
                          TaskScheduler = TaskScheduler.Default
                      };

    await Task.Run(
        () =>
            {
                try
                {
                    Parallel.ForEach(
                        list,
                        options,
                        (item, state) =>
                            {
                                // if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
                                options.CancellationToken.ThrowIfCancellationRequested();

                                // safely increment and get your next file number
                                // (numbers start at 1 and follow the order in which iterations run, not list order)
                                int index = Interlocked.Increment(ref counter);
                                string fullPath = string.Format(@"{0}\{1}_element.txt", path, index);

                                using (var sw = File.CreateText(fullPath))
                                {
                                    sw.WriteLine(item);
                                }

                                Debug.Print(
                                    "Saved in thread: {0} to {1}",
                                    Thread.CurrentThread.ManagedThreadId,
                                    fullPath);
                            });
                }
                catch (OperationCanceledException)
                {
                    Debug.Print("Operation Canceled");
                }
            });
}

The rest of your code doesn't change; simply adapt the place where you build the list of file contents.

Edit: The try/catch around the call to SaveFilesAsync actually does nothing for cancellation, because the OperationCanceledException is already caught inside SaveFilesAsync.
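
For the Parallel.For() variant mentioned at the top of this answer, here is a minimal sketch rather than a tested drop-in. The names IndexedSaver and SaveFilesIndexed are made up for illustration, and it uses File.WriteAllText and Path.Combine instead of a StreamWriter and a hand-built path to keep it short. Because Parallel.For passes each iteration its own index, file i always holds list[i] and no shared counter is needed.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class IndexedSaver
{
    // Hypothetical helper: writes list[i] to "<path>/<i>_element.txt".
    // If the token is canceled, Parallel.For throws OperationCanceledException,
    // which this sketch deliberately lets propagate to the caller.
    public static void SaveFilesIndexed(string path, IReadOnlyList<string> list, CancellationToken token)
    {
        var options = new ParallelOptions
        {
            CancellationToken = token,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        // Each iteration receives its own index i, so the file number
        // always matches the item's position in the list.
        Parallel.For(0, list.Count, options, i =>
        {
            string fullPath = Path.Combine(path, i + "_element.txt");
            File.WriteAllText(fullPath, list[i] + Environment.NewLine);
        });
    }
}
```

Since the method is synchronous, a caller that must not block can wrap it as in the answer above, for example `await Task.Run(() => IndexedSaver.SaveFilesIndexed(path, list, token));`.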



Answer 2:

Try this:

// Requires: using System.IO; using System.Linq; using System.Threading.Tasks;
public async Task SaveToFileAsync(string fullPath, string line)
{
    using (var sw = File.CreateText(fullPath))
    {
        await sw.WriteLineAsync(line);
    }

    Log("Saved in thread: {0} to {1}", 
       Environment.CurrentManagedThreadId,
       fullPath);
}

public async Task SaveToFilesAsync(string path, List<string> list)
{
    // Start one write per line, then wait for all of them to finish
    await Task.WhenAll(
        list
            .Select((line, i) =>
                SaveToFileAsync(
                    string.Format(
                        @"{0}\{1}_element.txt",
                        path,
                        i),
                    line)));
}

Since you're writing only one line per file and you want to parallelize all of it, I don't think it's cancellable.
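
On the second question (reading each task's bool result), here is a minimal sketch under some assumptions: the tasks are kept in a List<Task<bool>> rather than a List<Task>, and WorkAsync is a hypothetical stand-in for functions such as MySecondFuncAsync. Awaiting Task.WhenAll rethrows only the first failure, so the sketch swallows it and then inspects every task individually once they have all completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ResultDemo
{
    // Hypothetical stand-in for a function that returns Task<bool>.
    public static async Task<bool> WorkAsync(bool succeed)
    {
        await Task.Delay(10);
        if (!succeed) throw new InvalidOperationException("failed");
        return true;
    }

    public static async Task<List<string>> RunAllAsync()
    {
        // Task<bool> (not plain Task) is what makes .Result available.
        var tasks = new List<Task<bool>> { WorkAsync(true), WorkAsync(false), WorkAsync(true) };

        try
        {
            await Task.WhenAll(tasks);
        }
        catch (Exception)
        {
            // Only the first exception is rethrown here; each task is checked below.
        }

        var report = new List<string>();
        foreach (var task in tasks)
        {
            // Every task has completed by now, so reading Result does not block.
            report.Add(task.Status == TaskStatus.RanToCompletion
                ? "Result: " + task.Result
                : "Error: " + task.Exception?.InnerException?.Message);
        }
        return report;
    }
}
```

If none of the tasks can fail, `bool[] results = await Task.WhenAll(tasks);` returns all the values directly, in the same order as the tasks.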