How to run a set of functions in parallel and wait

2019-03-30 20:53发布

I have a requirement to run a set of heavy functions asynchronously at sametime and populate the results in a list. Here is the pseudo code for this :

List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();

foreach(var task in tasks)
{
    // Run Logic in question
    1. Run each task asynchronously/parallely
    2. Put the results in the results list upon each task completion
}

Console.WriteLine("All tasks completed and results populated");

I need the logic inside the foreach bock. Can you guys plz help me?

I have some constraint : The solution must be .net 3.5 compliant (not .net 4, but a .net 4 alternative solution would be appreciated for my knowledge purpose)

Thanks in advance.

6条回答
女痞
2楼-- · 2019-03-30 21:26

Another variant would be with a small future pattern implementation:

    public class Future<T>
    {
        public Future(Func<T> task)
        {
            Task = task;
            _asyncContext = task.BeginInvoke(null, null);
        }

        private IAsyncResult _asyncContext;

        public Func<T> Task { get; private set; }
        public T Result
        {
            get
            {
                return Task.EndInvoke(_asyncContext);
            }
        }

        public bool IsCompleted
        {
            get { return _asyncContext.IsCompleted; }
        }
    }

    public static IList<Future<T>> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<Future<T>> asyncContext = new List<Future<T>>();
        foreach (var task in tasks)
        {
            asyncContext.Add(new Future<T>(task));
        }
        return asyncContext;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Future<T>> futures)
    {
        foreach (var future in futures)
        {
            yield return future.Result;
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();

        var futures = RunAsync(tasks);
        var results = WaitForAll(futures);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a);
    }
查看更多
我欲成王,谁敢阻挡
3楼-- · 2019-03-30 21:26

the traditional way is to use a Sempahore. Initialise the semaphore with the number of threads you're using then kick off a thread per task and wait on the semaphore object. When each thread completes, it should increment the semaphore. When the semaphore count reaches 0, the main thread that was waiting will continue.

查看更多
虎瘦雄心在
4楼-- · 2019-03-30 21:27
List<Func<T, TResult>> tasks = PopulateTasks();
TResult[] results = new TResult[tasks.Length];
Parallel.For(0, tasks.Count, i =>
    {
        results[i] = tasks[i]();
    });

TPL for 3.5 apparently exists.

查看更多
狗以群分
5楼-- · 2019-03-30 21:30
    public static IList<IAsyncResult> RunAsync<T>(IEnumerable<Func<T>> tasks)
    {
        List<IAsyncResult> asyncContext = new List<IAsyncResult>();
        foreach (var task in tasks)
        {
            asyncContext.Add(task.BeginInvoke(null, null));
        }
        return asyncContext;
    }

    public static IEnumerable<T> WaitForAll<T>(IEnumerable<Func<T>> tasks, IEnumerable<IAsyncResult> asyncContext)
    {
        IEnumerator<IAsyncResult> iterator = asyncContext.GetEnumerator();
        foreach (var task in tasks)
        {
            iterator.MoveNext();
            yield return task.EndInvoke(iterator.Current);
        }
    }

    public static void Main()
    {
        var tasks = Enumerable.Repeat<Func<int>>(() => ComputeValue(), 10).ToList();

        var asyncContext = RunAsync(tasks);
        var results = WaitForAll(tasks, asyncContext);
        foreach (var result in results)
        {
            Console.WriteLine(result);
        }
    }

    public static int ComputeValue()
    {
        Thread.Sleep(1000);
        return Guid.NewGuid().ToByteArray().Sum(a => (int)a); 
    }
查看更多
一纸荒年 Trace。
6楼-- · 2019-03-30 21:41

Do your processing in separate worker instances, each on their own thread. Use a callback to pass back the results and signal the calling process that the thread is done. Use a Dictionary to keep track of running threads. If you have lots of threads you should load a Queue and launch new threads as old ones finish. In this example all of the threads are created before any are launched to prevent a race condition where the running thread count drops to zero before the final threads are launched.

    Dictionary<int, Thread> activeThreads = new Dictionary<int, Thread>();
    void LaunchWorkers()
    {
        foreach (var task in tasks)
        {
            Worker worker = new Worker(task, new WorkerDoneDelegate(ProcessResult));
            Thread thread = new Thread(worker.Done);
            thread.IsBackground = true;
            activeThreads.Add(thread.ManagedThreadId, thread);
        }
        lock (activeThreads)
        {
            activeThreads.Values.ToList().ForEach(n => n.Start());
        }
    }

    void ProcessResult(int threadId, TResult result)
    {
        lock (results)
        {
            results.Add(result);
        }
        lock (activeThreads)
        {
            activeThreads.Remove(threadId);
            // done when activeThreads.Count == 0
        }
    }
}

public delegate void WorkerDoneDelegate(object results);
class Worker
{
    public WorkerDoneDelegate Done;
    public void Work(Task task, WorkerDoneDelegate Done)
    {
        // process task
        Done(Thread.CurrentThread.ManagedThreadId, result);
    }
}
查看更多
姐就是有狂的资本
7楼-- · 2019-03-30 21:47

A simple 3.5 implementation could look like this

List<TResult> results = new List<TResults>();
List<Func<T, TResult>> tasks = PopulateTasks();

ManualResetEvent waitHandle = new ManualResetEvent(false);
void RunTasks()
{
    int i = 0;
    foreach(var task in tasks)
    {
        int captured = i++;
        ThreadPool.QueueUserWorkItem(state => RunTask(task, captured))
    }

    waitHandle.WaitOne();

    Console.WriteLine("All tasks completed and results populated");
}

private int counter;
private readonly object listLock = new object();
void RunTask(Func<T, TResult> task, int index)
{
    var res = task(...); //You haven't specified where the parameter comes from
    lock (listLock )
    {
       results[index] = res;
    }
    if (InterLocked.Increment(ref counter) == tasks.Count)
        waitHandle.Set();
}
查看更多
登录 后发表回答