ConcurrentQueue .Net: Multithreaded consumer

Posted 2019-06-23 20:14

Question:

I have a very basic question about the concepts behind ConcurrentQueue. A queue is FIFO. When multiple threads start accessing it, how do we guarantee FIFO? Suppose I have added Apple, Oranges, Lemon, Peach and Apricot, in that order. The first TryTake should return Apple. But what happens when multiple threads start issuing their own TryTake requests? Wouldn't it be possible for one thread to return Lemon before another thread returns Apple? I am assuming the other items will also be returned until the queue is empty. But will these returns adhere to the basic principles of FIFO?

Answer 1:

The behavior of the ConcurrentQueue itself is always going to be FIFO.

When we talk about threads "returning" items from the ConcurrentQueue, we're talking about an operation that involves both dequeuing an item and performing some other operation that lets you observe what has been dequeued. Whether it's printing an output or adding that item to another list, you don't actually know which item has been taken out of the queue until you inspect it.

While the queue itself is FIFO, you can't predict the sequence in which those other events, such as inspecting the dequeued items, will occur. The items will be dequeued FIFO, but you may or may not be able to observe what comes out of the queue in that order. The different threads may not perform that inspection or output in exactly the same order in which they removed items from the queue.
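To make the gap between "dequeued" and "observed" concrete, here is a minimal sketch in which each worker holds a lock across both steps, so no other worker can slip in between them. The class and method names (`AtomicObservationSketch`, `DrainInObservedOrder`) are hypothetical and only for illustration. Note that the lock serializes the consumers, so this gives up most of the benefit of consuming concurrently.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class AtomicObservationSketch
{
    // Hypothetical helper: drains `source` with `workerCount` tasks, holding a
    // lock across both the dequeue and the step that records what was dequeued.
    public static List<string> DrainInObservedOrder(
        ConcurrentQueue<string> source, int workerCount)
    {
        var observed = new List<string>();
        var gate = new object();

        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(() =>
        {
            while (true)
            {
                lock (gate)
                {
                    string item;
                    if (!source.TryDequeue(out item))
                        return; // queue is empty, this worker is done

                    // Recorded before any other worker can dequeue the next item.
                    observed.Add(item);
                }
            }
        })).ToArray();

        Task.WaitAll(workers);
        return observed;
    }
}
```

Called with a queue holding Apple, Oranges, Lemon, Peach and Apricot, the returned list has the items in that same order regardless of which worker took each one. Remove the lock (and use a thread-safe collection for `observed`) and the recorded order is no longer guaranteed.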

In other words, it's going to happen FIFO but it may or may not always look like it. You wouldn't want to read from a ConcurrentQueue concurrently if the exact sequence in which the items are handled was critical.
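If the handling order really is critical, one common arrangement is a single dedicated consumer. The following is a sketch under that assumption, using `BlockingCollection<T>` over a `ConcurrentQueue<T>`; the names `SingleConsumerSketch` and `HandleInOrder` are made up for this example.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class SingleConsumerSketch
{
    // Hypothetical helper: one producer task adds `items`, one consumer task
    // handles them. With a single consumer, handling order is the queue order.
    public static List<int> HandleInOrder(IEnumerable<int> items)
    {
        var handled = new List<int>();

        // BlockingCollection wraps a ConcurrentQueue here, so takes are FIFO.
        using (var pipeline = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            var consumer = Task.Run(() =>
            {
                // Blocks until an item is available; ends after CompleteAdding().
                foreach (var item in pipeline.GetConsumingEnumerable())
                    handled.Add(item);
            });

            var producer = Task.Run(() =>
            {
                foreach (var item in items)
                    pipeline.Add(item);
                pipeline.CompleteAdding(); // lets the consumer's loop finish
            });

            Task.WaitAll(producer, consumer);
        }

        return handled;
    }
}
```

Because only one task ever takes from the queue, there is no second consumer to interleave with, and `handled` ends up in the order the items were added.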

If you were to test this, you'd probably find items getting processed in exact FIFO sequence most of the time, but every once in a while they wouldn't be.


Here's a console app. It's going to:

  • Insert the numbers from 1 to 5000 into a ConcurrentQueue, single-threaded.
  • Perform concurrent operations to dequeue each of those items and move them to another ConcurrentQueue. This is the "multithreaded consumer."
  • Read the items in the second queue (single-threaded again) and report any numbers that are out of sequence.

Many times when I run it, nothing is out of sequence. But about 50% of the time it reports just a few numbers out of sequence. So if you were counting on all of the numbers being processed in their original sequence, that would hold for almost all of the numbers most of the time. But then it wouldn't. That's fine if you don't care about the exact sequence, but buggy and unpredictable if you do.

The conclusion: don't depend on the exact sequence of multithreaded operations.

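using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConcurrentQueueExperiment
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputQueue = new ConcurrentQueue<int>();
            var outputQueue = new ConcurrentQueue<int>();

            // Fill the input queue single-threaded so its order is known: 1..5000.
            foreach (var number in Enumerable.Range(1, 5000))
                inputQueue.Enqueue(number);

            // Keep starting consumer tasks until the input queue is drained.
            // Each task moves at most one item; some find the queue already empty.
            var consumers = new List<Task>();
            while (!inputQueue.IsEmpty)
            {
                consumers.Add(Task.Run(() =>
                {
                    int dequeued;
                    if (inputQueue.TryDequeue(out dequeued))
                    {
                        // Another task can run between the dequeue above and this
                        // enqueue, which is where the order can get scrambled.
                        outputQueue.Enqueue(dequeued);
                    }
                }));
            }

            // Wait for every consumer so no item is still in flight when we read.
            Task.WaitAll(consumers.ToArray());

            // Read the output queue single-threaded and report any break in sequence.
            int output;
            var previous = 0;
            while (outputQueue.TryDequeue(out output))
            {
                if (output != previous + 1)
                    Console.WriteLine("Out of sequence: {0}, {1}", previous, output);
                previous = output;
            }
            Console.WriteLine("Done!");
            Console.ReadLine();
        }
    }
}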
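If you want concurrent consumers and still need the results in the original order, one option is to stop relying on completion order altogether: tag each item with a sequence number before it is queued, then sort by that tag once all workers are done. This is only a sketch of that idea, not something the experiment above does; `SequenceTagSketch` and `ProcessKeepingOrder` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class SequenceTagSketch
{
    // Hypothetical helper: processes `items` on `workerCount` tasks, then restores
    // the original order using a sequence number attached before queuing.
    public static List<TResult> ProcessKeepingOrder<T, TResult>(
        IEnumerable<T> items, Func<T, TResult> process, int workerCount)
    {
        var input = new ConcurrentQueue<KeyValuePair<int, T>>();
        var sequence = 0;
        foreach (var item in items)
            input.Enqueue(new KeyValuePair<int, T>(sequence++, item));

        // Results arrive in whatever order the workers happen to finish.
        var results = new ConcurrentBag<KeyValuePair<int, TResult>>();

        var workers = Enumerable.Range(0, workerCount).Select(_ => Task.Run(() =>
        {
            KeyValuePair<int, T> tagged;
            while (input.TryDequeue(out tagged))
                results.Add(new KeyValuePair<int, TResult>(
                    tagged.Key, process(tagged.Value)));
        })).ToArray();
        Task.WaitAll(workers);

        // Sorting by the tag puts the results back in the original order.
        return results.OrderBy(r => r.Key).Select(r => r.Value).ToList();
    }
}
```

For example, `SequenceTagSketch.ProcessKeepingOrder(Enumerable.Range(1, 5000), n => n * 2, 4)` returns the doubled numbers in their original order, even though the four workers finish individual items in an unpredictable order. The trade-off is that nothing can be consumed downstream in order until the sort has run.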