.NET Custom Threadpool with separate instances

Posted 2019-03-25 07:54

Question:

What is the most recommended .NET custom thread pool that supports separate instances, i.e., more than one thread pool per application? I need an unlimited queue size (I am building a crawler), and I need to run a separate thread pool in parallel for each site I am crawling.

Edit: I need to mine these sites for information as fast as possible. Using a separate thread pool for each site would let me control the number of threads working on each site at any given time (no more than 2-3).

Thanks Roey

Answer 1:

I believe Smart Thread Pool can do this. Its thread pool class is instantiated rather than static, so you should be able to create and manage separate site-specific instances as you require.



Answer 2:

Ami Bar wrote an excellent Smart Thread Pool that can be instantiated.

Take a look here



Answer 3:

Ask Jon Skeet: http://www.yoda.arachsys.com/csharp/miscutil/

The Parallel Extensions for .NET (TPL) should actually work much better if you want a large number of tasks running in parallel.
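As a rough illustration of the TPL route, the sketch below caps the number of concurrent requests per site with `ParallelOptions.MaxDegreeOfParallelism` instead of using a separate pool per site. The `PerSiteCrawl` class, its `Run` method, and the `fetch` callback are hypothetical names invented for this example; only `Parallel.ForEach`, `ParallelOptions`, and `Task.Factory.StartNew` are real TPL APIs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PerSiteCrawl
{
    // Hypothetical helper: runs fetch for every URL of every site, with at
    // most maxPerSite URLs of the same site being processed at once.
    public static void Run(
        IDictionary<string, IList<string>> urlsBySite,
        int maxPerSite,
        Action<string> fetch)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxPerSite };

        // One task per site, so all sites are crawled in parallel;
        // each task runs its own bounded Parallel.ForEach loop.
        Task[] siteTasks = urlsBySite
            .Select(site => Task.Factory.StartNew(
                () => { Parallel.ForEach(site.Value, options, fetch); },
                TaskCreationOptions.LongRunning))
            .ToArray();

        // Blocks until every site has been fully processed.
        Task.WaitAll(siteTasks);
    }
}
```

This assumes the full list of URLs per site is known up front; a crawler that discovers links as it goes would need a queue-based design instead.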



Answer 4:

A BlockingCollection can be used as the work queue for the threads. Here is an implementation of that idea (updated 2018-04-23):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class WorkerPool<T> : IDisposable
{
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();
    private readonly List<Task> taskList = new List<Task>();
    private readonly CancellationTokenSource cancellationToken;
    private readonly int maxWorkers;
    private bool wasShutDown;

    public WorkerPool(CancellationTokenSource cancellationToken, int maxWorkers)
    {
        this.cancellationToken = cancellationToken;
        this.maxWorkers = maxWorkers;
    }

    // Add one unit of work to the (unbounded) queue.
    public void enqueue(T value)
    {
        queue.Add(value);
    }

    // Call to signal that no more items will be added.
    public void CompleteAdding()
    {
        queue.CompleteAdding();
    }

    // Create the workers and start them running.
    public void startWorkers(Action<T> worker)
    {
        for (int i = 0; i < maxWorkers; i++)
        {
            // LongRunning hints that each worker should get a dedicated thread
            // rather than occupying a shared thread-pool thread while it blocks.
            taskList.Add(Task.Factory.StartNew(() =>
            {
                try
                {
                    // Blocks until an item is available. The loop ends once
                    // CompleteAdding() has been called and the queue is empty,
                    // so no InvalidOperationException handling is needed.
                    foreach (var value in queue.GetConsumingEnumerable(cancellationToken.Token))
                    {
                        worker(value);
                    }
                }
                catch (OperationCanceledException)
                {
                    // Cancellation was requested: stop and leave pending items unprocessed.
                }
            }, TaskCreationOptions.LongRunning));
        }
    }

    // Wait for all workers to finish their jobs.
    // Returns only after CompleteAdding() (or cancellation) and once the queue has drained.
    public void await()
    {
        shutdown();
    }

    private void shutdown()
    {
        wasShutDown = true;
        Task.WaitAll(taskList.ToArray());
    }

    // If the pool was never awaited, stop accepting work and wait for the workers to exit.
    public void Dispose()
    {
        if (!wasShutDown)
        {
            queue.CompleteAdding();
            shutdown();
        }
        queue.Dispose();
    }
}

Then use it like this:

WorkerPool<int> workerPool = new WorkerPool<int>(new CancellationTokenSource(), 5);

workerPool.startWorkers(value =>
{
    log.Debug(value);
});
// Enqueue all the work
for (int i = 0; i < 100; i++)
{
    workerPool.enqueue(i);
}
// Signal that there is no more work
workerPool.CompleteAdding();

// Wait for all pending work to finish
workerPool.await();

You can have as many pools as you like, simply by creating new WorkerPool objects.



Answer 5:

This free NuGet library, CodeFluentRuntimeClient, has a CustomThreadPool class that you can reuse. It is very configurable: you can change the pool threads' priority, number, COM apartment state, name (for debugging), and culture.



Answer 6:

Another approach is to use a Dataflow pipeline. I am adding this later answer because I find Dataflow a much better approach for this kind of problem, where you would otherwise need several thread pools. It provides a more flexible and structured approach and can easily scale vertically.

You can break your code into one or more blocks, link them with Dataflow, and let the Dataflow engine schedule the work onto threads, with a configurable degree of parallelism per block.

I suggest splitting the work into three blocks: one to prepare the query for the site page, one to fetch the page, and one to analyse the data. This way the slow block (get) can be given more threads to compensate.

Here is how the Dataflow setup would look:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

prepareBlock.LinkTo(getBlock, linkOptions);
getBlock.LinkTo(analiseBlock, linkOptions);

Data will flow from prepareBlock to getBlock and then to analiseBlock. The messages passed between blocks can be of any type, as long as the output type of one block matches the input type of the next. See the full example on Dataflow Pipeline

Using the Dataflow would look something like this:

while ... {
    ...
    prepareBlock.Post(...); // send data into the pipeline
}
prepareBlock.Complete(); // when done
analiseBlock.Completion.Wait(cancellationTokenSource.Token); // wait for all queues to empty, or cancel
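The snippets above do not show how the three blocks are created, so here is a minimal sketch of one possible way, assuming the System.Threading.Tasks.Dataflow library is referenced. The `CrawlPipeline` class, its `Build` method, and the `prepare`, `get`, and `analyse` delegates are hypothetical and exist only for illustration; the message types (`string` and `Uri`) are likewise assumptions. Only the get block is given a higher `MaxDegreeOfParallelism`, which is how the slow stage gets more threads.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CrawlPipeline
{
    // Hypothetical factory: wires prepare -> get -> analyse and returns the
    // head block (to post into) and a task that completes when the last
    // block has processed everything.
    public static (ITargetBlock<string> Head, Task Completion) Build(
        Func<string, Uri> prepare,
        Func<Uri, string> get,
        Action<string> analyse,
        int getParallelism)
    {
        // Default options: each of these blocks processes one message at a time.
        var prepareBlock = new TransformBlock<string, Uri>(prepare);
        var analiseBlock = new ActionBlock<string>(analyse);

        // The slow stage may process up to getParallelism messages concurrently.
        var getBlock = new TransformBlock<Uri, string>(
            get,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = getParallelism });

        // PropagateCompletion makes Complete() on the head flow down the chain.
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        prepareBlock.LinkTo(getBlock, linkOptions);
        getBlock.LinkTo(analiseBlock, linkOptions);

        return (prepareBlock, analiseBlock.Completion);
    }
}
```

To keep one limit per site, as the question asks, one option would be to build one such pipeline per site, each with its own `getParallelism` of 2 or 3.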