C#生产者/消费者(C# producer/consumer)

2019-09-03 07:56发布

最近我遇到一个生产者/消费者模式的C#实现。 这是非常简单(至少对我来说)非常优雅。

它似乎已经在2006年左右提出的,所以我在想,如果这个实现
- 安全
- 仍然适用

代码如下(原代码是在引用http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375 )

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}

Answer 1:

该代码是比旧的 - 我写了一些时间才.NET 2.0就出来了。 生产者/消费者队列的概念比,虽然旧的方式 :)

是的,代码是安全的,据我所知 - 但它有一些不足之处:

  • 这是不通用。 一个现代版肯定是通用的。
  • 它没有停止队列的方式。 停止队列(让所有的消费者线程退休)的一个简单的方法是有一个“停止工作”令牌,该令牌可以被放入队列中。 然后,你有线程添加尽可能多的令牌。 或者,你有一个独立的标志,表示要停止。 (这允许其他线程完成所有的当前工作队列前停下来。)
  • 如果工作是非常小的,在耗时一个作业可能不是做最有效的事。

代码背后的想法是比代码本身更重要的,是诚实的。



Answer 2:

你可以做类似下面的代码片段。 这是通用的,有排队-ING空值(或任何标记您想使用)来告诉工作线程退出的方法。

该代码是从这里取: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}


Answer 3:

早在从上面这段代码和那天我学到Monitor.Wait /脉冲如何工作(以及许多有关一般线程) 系列文章是从。 因此,作为乔恩说,它有很多的价值,它不愧是安全和适用的。

然而,随着.NET 4中,有一个框架中的生产者-消费者队列实现 。 我才刚刚发现自己,但到这一点它我需要的一切。



Answer 4:

警告:如果你读的评论,你会明白我的答案是错的:)

有一个在你的代码可能的死锁

想象一下以下的情况下,为清楚起见,我用了一个单线程的方法,但应该很容易转换为多线程睡眠:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

请不要让我知道如果这样做没有任何意义。

如果这被证实,那么回答了你的问题是,“不,这不是安全的”;)我希望这有助于。



文章来源: C# producer/consumer