Speculative execution using the TPL

Posted 2019-02-18 02:23

Question:

I have a List<Task<bool>> that I want to enumerate in parallel, finding the first task to complete with a result of true, without waiting for, or observing exceptions on, any of the other tasks that are still pending.

var tasks = new List<Task<bool>>
{ 
    Task.Delay(2000).ContinueWith(x => false), 
    Task.Delay(0).ContinueWith(x => true), 
};

I have tried to use PLINQ to do something like:

var task = tasks.AsParallel().FirstOrDefault(t => t.Result);

This executes in parallel, but it doesn't return as soon as it finds a satisfying result, because accessing the Result property blocks. To make this work with PLINQ, I'd have to write this awful statement:

var cts = new CancellationTokenSource();
var task = tasks.AsParallel()
    .FirstOrDefault(t =>
    {
        try 
        { 
            t.Wait(cts.Token);
            if (t.Result)
            {
                cts.Cancel();
            }

            return t.Result;
        } 
        catch (OperationCanceledException) 
        { 
            return false;
        }
    } );

I've written up an extension method that yields tasks as they complete like so.

public static class Exts
{
    public static IEnumerable<Task<T>> InCompletionOrder<T>(this IEnumerable<Task<T>> source)
    {
        var tasks = source.ToList();
        while (tasks.Any())
        {
            var t = Task.WhenAny(tasks);
            yield return t.Result;
            tasks.Remove(t.Result);
        }
    }
}

// and run like so
var task = tasks.InCompletionOrder().FirstOrDefault(t => t.Result);

But it feels like this is something common enough that there is a better way. Suggestions?

Answer 1:

Maybe something like this?

var tcs = new TaskCompletionSource<Task<bool>>();

foreach (var task in tasks)
{
    task.ContinueWith((t, state) =>
    {
        if (t.Result)
        {
            ((TaskCompletionSource<Task<bool>>)state).TrySetResult(t);
        }
    },
        tcs,
        TaskContinuationOptions.OnlyOnRanToCompletion |
        TaskContinuationOptions.ExecuteSynchronously);
}

var firstTaskToComplete = tcs.Task;
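
One thing to watch for with the snippet above: if none of the tasks returns true, tcs.Task never completes. Below is a minimal sketch of one way to cover that case, by counting finished tasks and completing with null once the last one is done. The names SpeculativeTasks and FirstTrueOrNull are made up for illustration and are not part of the TPL.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SpeculativeTasks
{
    // Hypothetical helper (not a TPL API): completes with the first task whose
    // result is true, or with null once every task has finished without one.
    public static Task<Task<bool>> FirstTrueOrNull(IEnumerable<Task<bool>> source)
    {
        var tasks = source.ToList();
        var tcs = new TaskCompletionSource<Task<bool>>();
        var remaining = tasks.Count;

        if (remaining == 0)
        {
            tcs.TrySetResult(null);
            return tcs.Task;
        }

        foreach (var task in tasks)
        {
            task.ContinueWith(t =>
            {
                // Result is only read on tasks that ran to completion, so a
                // faulted or canceled task never rethrows here.
                if (t.Status == TaskStatus.RanToCompletion && t.Result)
                {
                    tcs.TrySetResult(t);
                }

                // The last continuation to run reports "no winner". If a winner
                // was already published, this TrySetResult is a no-op.
                if (Interlocked.Decrement(ref remaining) == 0)
                {
                    tcs.TrySetResult(null);
                }
            }, TaskContinuationOptions.ExecuteSynchronously);
        }

        return tcs.Task;
    }
}
```

You would use it as var winner = await SpeculativeTasks.FirstTrueOrNull(tasks); and check winner for null. The continuation runs for every final state rather than only OnlyOnRanToCompletion, because the counter has to see faulted and canceled tasks too.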


Answer 2:

Perhaps you could try the Rx.NET library. It's very good at providing what is, in effect, "LINQ to Work".

Try this snippet in LINQPad after you reference the Microsoft Rx.NET assemblies.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

void Main()
{
    var tasks = new List<Task<bool>>
    { 
        Task.Delay(2000).ContinueWith(x => false), 
        Task.Delay(0).ContinueWith(x => true), 
    };

    var observable = (from t in tasks.ToObservable()
                      //Convert task to an observable
                      let o = t.ToObservable()
                      //SelectMany
                      from x in o
                      select x);


    var foo = observable
                .SubscribeOn(Scheduler.Default) //Run the tasks on the threadpool
                .ToList()
                .First();

    Console.WriteLine(foo);
}


Answer 3:

First, I don't understand why you are trying to use PLINQ here. Enumerating a list of Tasks shouldn't take long, so I don't think you're going to gain anything from parallelizing it.

Now, to get the first Task that has already completed with true, you can use the (non-blocking) IsCompleted property:

var task = tasks.FirstOrDefault(t => t.IsCompleted && t.Result);

If you want to get a collection of Tasks ordered by their completion, have a look at Stephen Toub's article "Processing tasks as they complete". If you want to list those that return true first, you would need to modify that code. If you don't want to modify it, you can use a version of this approach from Stephen Cleary's AsyncEx library.
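
For a small number of tasks, a plain await loop over Task.WhenAny may be enough. The sketch below is not the code from the article or from AsyncEx; it is a simpler stand-in, and the names CompletionOrder and FirstTrueAsync are made up for illustration. Each WhenAny call attaches continuations to all remaining tasks, so the loop is quadratic in the number of tasks.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrder
{
    // Hypothetical helper: walks the tasks in the order they finish and returns
    // the first one whose result is true, or null if there is none.
    public static async Task<Task<bool>> FirstTrueAsync(IEnumerable<Task<bool>> source)
    {
        var pending = source.ToList();

        while (pending.Count > 0)
        {
            // WhenAny completes as soon as any pending task reaches a final
            // state; awaiting it does not block a thread.
            Task<bool> finished = await Task.WhenAny(pending);
            pending.Remove(finished);

            // Faulted and canceled tasks are skipped without touching Result,
            // so their exceptions are not rethrown here.
            if (finished.Status == TaskStatus.RanToCompletion && finished.Result)
            {
                return finished;
            }
        }

        return null;
    }
}
```

The method returns as soon as a true result shows up and leaves the remaining tasks running; it does not cancel them.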


Also, in the specific case in your question, you could “fix” your code by adding .WithMergeOptions(ParallelMergeOptions.NotBuffered) to the PLINQ query. But that still wouldn't work most of the time, and it can waste a lot of threads even when it does. That's because PLINQ uses a constant number of threads and partitions the input among them, so using Result would block those threads most of the time.
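
For reference, this is roughly where the call would go in the query from the question. It is shown only to make the placement concrete, with the caveats above still applying; the wrapper class and method names are made up for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PlinqNotBuffered
{
    // Hypothetical wrapper around the question's query. t.Result blocks the
    // PLINQ worker thread until that task finishes, and throws if it faulted.
    public static Task<bool> FirstTrue(List<Task<bool>> tasks)
    {
        return tasks.AsParallel()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .FirstOrDefault(t => t.Result);
    }
}
```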