Simpler solution than TPL Dataflow for parallel as

2019-08-30 02:08发布

问题:

I'm implementing a worker role on Azure which needs to delete blobs from Azure storage. Let's assume my list of blobs has about 10K items.

The simplest synchronous approach would probably be:

Parallel.ForEach(list, x => ((CloudBlob) x).Delete());

Requirements:

  • I want to implement the same thing asynchronously (on a single thread).

  • I want to limit the number of concurrent connections to 50 - so I'll do my 10K deletions when only 50 async ones are being performed at the same time. If one deletion completes, a new one can be started.

Solution?

So far, after reading this question and this one, it seems that TPL Dataflow is the way to go.

This is such a simple problem and dataflow seems like an overkill. Is there any simpler alternative?

If not, how would this be implemented using dataflow? As I understand, I need a single action block which performs the async delete (do I need await?). When creating my block I should set MaxDegreeOfParallelism to 50. Then I need to post my 10K blobs from the list to the block and then execute with block.Completion.Wait(). Is this correct?

回答1:

For something this simple, a SemaphoreSlim should suffice. TPL Dataflow is great, especially if you're looking to limit work in one part of a larger pipeline. However, in your scenario it sounds more like you really do just have one action that you need to throttle.

Doing it asynchronously is quite simple:

var semaphore = new SemaphoreSlim(50);
var tasks = list.Cast<CloudBlob>().Select(async x =>
{
    using (await semaphore.TakeAsync())
        await x.DeleteAsync();
});
await Task.WhenAll(tasks);

where TakeAsync is defined as:

private sealed class SemaphoreSlimKey : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    public SemaphoreSlimKey(SemaphoreSlim semaphore) { _semaphore = semaphore; }
    void IDisposable.Dispose() { _semaphore.Release(); }
}

public static async Task<IDisposable> TakeAsync(this SemaphoreSlim semaphore)
{
    await semaphore.WaitAsync().ConfigureAwait(false);
    return new SemaphoreSlimKey(semaphore);
}


回答2:

You may consider using a Task Scheduler that limits parallelism for you : http://msdn.microsoft.com/en-us/library/ee789351.aspx. Unbounded parallelism can lead to throttling since you may DOS the server with too many simultaneous requests instantly, and is therefore considered best practice to limit this at the application layer.

Note the 2.1 storage client supports Task with preemptive cancellation etc, to make this easier. The code you had written above would require 50 threads as it is calling a synchronous method. You can use the new DeleteAsync methods to do this completely Async on a single thread. If you were to leverage Async await, maintaining 50 concurrent requests would be pretty straightforward as you could do an await any in a loop an simply add an additional work item etc.

This talk covers some best practices and may be helpful to you : http://channel9.msdn.com/Events/TechEd/NorthAmerica/2013/WAD-B406