Parallel.ForEach using Thread.Sleep equivalent

Posted 2019-02-21 22:02

Question:

So here's the situation: I need to make a call to a web site that starts a search. This search continues for an unknown amount of time, and the only way I know if the search has finished is by periodically querying the website to see if there's a "Download Data" link somewhere on it (it uses some strange ajax call on a javascript timer to check the backend and update the page, I think).

So here's the trick: I have hundreds of items I need to search for, one at a time. So I have some code that looks a little bit like this:

var items = getItems();
Parallel.ForEach(items, item =>
{
   startSearch(item);
   var finished = isSearchFinished(item);
   while(finished == false)
   {
      finished = isSearchFinished(item); //<--- How do I delay this action 30 Secs?
   }
   downloadData(item);
});

Now, obviously this isn't the real code, because there could be things that cause isSearchFinished to always be false.

Obvious infinite loop danger aside, how would I correctly keep isSearchFinished() from being called over and over, and instead call it every, say, 30 seconds or 1 minute?

I know Thread.Sleep() isn't the right solution, and I think the solution might be accomplished by using Threading.Timer() but I'm not very familiar with it, and there are so many threading options that I'm just not sure which to use.

Answer 1:

It's quite easy to implement with tasks and async/await, as noted by @KevinS in the comments:

async Task<ItemData> ProcessItemAsync(Item item)
{
    while (true)
    {
        if (await isSearchFinishedAsync(item))
            break;
        await Task.Delay(30 * 1000);
    }
    return await downloadDataAsync(item);
}

// ...

var items = getItems();
var tasks = items.Select(i => ProcessItemAsync(i)).ToArray();
await Task.WhenAll(tasks);
var data = tasks.Select(t => t.Result);

This way, you don't block ThreadPool threads in vain for what is mostly a bunch of I/O-bound network operations. If you're not familiar with async/await, the async-await tag wiki might be a good place to start.

I assume you can convert your synchronous methods isSearchFinished and downloadData to asynchronous versions that return a Task<>, using something like HttpClient for non-blocking HTTP requests. If you are unable to do so, you can still wrap them with Task.Run, as await Task.Run(() => isSearchFinished(item)) and await Task.Run(() => downloadData(item)). Normally this is not recommended, but with hundreds of items it would still give you a much better level of concurrency than Parallel.ForEach in this case, because the asynchronous Task.Delay means you won't be blocking pool threads for 30 seconds between polls.
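The while (true) loop above polls forever if the search never finishes, which is the infinite-loop danger the question mentions. Here is a minimal sketch of one way to bound it, assuming a give-up timeout is acceptable. Polling and WaitUntilAsync are hypothetical names introduced for illustration, not part of the answer. Only Task.Delay and CancellationTokenSource are real framework APIs here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class Polling
{
    // Hypothetical helper: calls isFinishedAsync every `interval` until it
    // returns true (result: true) or `timeout` elapses (result: false).
    public static async Task<bool> WaitUntilAsync(
        Func<Task<bool>> isFinishedAsync, TimeSpan interval, TimeSpan timeout)
    {
        // The token is cancelled automatically once `timeout` has passed.
        using (var cts = new CancellationTokenSource(timeout))
        {
            try
            {
                while (!await isFinishedAsync())
                {
                    // Waits without blocking a thread; throws
                    // TaskCanceledException when the timeout token fires.
                    await Task.Delay(interval, cts.Token);
                }
                return true;
            }
            catch (OperationCanceledException) when (cts.IsCancellationRequested)
            {
                return false;
            }
        }
    }
}
```

Inside ProcessItemAsync, the loop could then become something like await Polling.WaitUntilAsync(() => isSearchFinishedAsync(item), TimeSpan.FromSeconds(30), TimeSpan.FromHours(1)), with the caller deciding what to do when it returns false. The one-hour limit is an arbitrary placeholder.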



Answer 2:

You can also write a generic function using TaskCompletionSource and Threading.Timer to return a Task that becomes complete once a specified retry function succeeds.

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval)
{
    return RetryAsync(retryFunc, retryInterval, CancellationToken.None);
}

public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval, CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<object>();

    cancellationToken.Register(() => tcs.TrySetCanceled());

    var timer = new Timer((state) =>
    {
        var taskCompletionSource = (TaskCompletionSource<object>) state;

        try
        {                   
            if (retryFunc())
            {
                taskCompletionSource.TrySetResult(null);
            }
        }
        catch (Exception ex)
        {
            taskCompletionSource.TrySetException(ex);
        }
    }, tcs, TimeSpan.FromMilliseconds(0), retryInterval);

    // Once the task is complete, dispose of the timer so it doesn't keep firing. Capturing the timer
    // in this closure also keeps it from being garbage-collected while the task is still pending.
    tcs.Task.ContinueWith(t => timer.Dispose(),
                          CancellationToken.None,
                          TaskContinuationOptions.ExecuteSynchronously,
                          TaskScheduler.Default);

    return tcs.Task;
}

You can then use RetryAsync like this:

var searchTasks = new List<Task>();

searchTasks.AddRange(items.Select(
        downloadItem => RetryAsync( () => isSearchFinished(downloadItem),  TimeSpan.FromSeconds(2))  // retry interval
        .ContinueWith(t => downloadData(downloadItem), 
                      CancellationToken.None, 
                      TaskContinuationOptions.OnlyOnRanToCompletion, 
                      TaskScheduler.Default)));

await Task.WhenAll(searchTasks.ToArray());

The ContinueWith part specifies what you do once the task has completed successfully. Because we specified TaskScheduler.Default, your downloadData method runs on a thread pool thread. Because we specified OnlyOnRanToCompletion, the continuation only executes if the task ran to completion, i.e. it was not canceled and no exception was thrown.
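One consequence of OnlyOnRanToCompletion is worth seeing in isolation: when the antecedent faults or is canceled, the continuation is skipped and the continuation task itself ends up canceled. The sketch below shows that behavior using the same ContinueWith arguments as above. ContinuationDemo and Chain are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationDemo
{
    // Schedules `next` after `antecedent`, but only if the antecedent
    // ran to completion (same options as the RetryAsync usage above).
    public static Task Chain(Task antecedent, Action next)
    {
        return antecedent.ContinueWith(
            t => next(),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnRanToCompletion,
            TaskScheduler.Default);
    }

    // Returns true if `next` was skipped because the antecedent faulted.
    public static bool SkippedOnFault()
    {
        bool ran = false;
        Task faulted = Task.FromException(new InvalidOperationException("search failed"));
        Task chained = Chain(faulted, () => ran = true);
        try { chained.Wait(); }
        catch (AggregateException) { /* the skipped continuation surfaces as a cancellation */ }
        return chained.IsCanceled && !ran;
    }
}
```

In the usage above, this means that if isSearchFinished throws, Task.WhenAll reports the corresponding entry as canceled rather than surfacing the original exception. If the original exception matters, one option is to await RetryAsync inside an async lambda and call downloadData after it.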