
How to use threading with ConcurrentQueue

Published 2019-01-11 07:18

Question:

I am trying to figure out the best way of working with a queue. I have a process that returns a DataTable. Each DataTable, in turn, is merged with the previous one. There is one problem: there are too many records to hold until the final BulkCopy, and I run out of memory (OutOfMemoryException).

So, I have determined that I should process each incoming DataTable immediately. I have been thinking about ConcurrentQueue<T>, but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

For instance:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

My first question is: aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously, will this be all I need? Second, is there something I'm missing about the way ConcurrentQueue<T> works, and does it need some form of trigger in order to work asynchronously with the queued objects?

Update: I have just derived a class from ConcurrentQueue<T> that exposes an event raised when an item is queued. Then:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

Any concerns about this implementation?

Answer 1:

From my understanding of the problem, you are missing a few things.

The concurrent queue is a data structure designed to accept multiple threads reading and writing to the queue without you needing to explicitly lock the data structure. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

With that in mind, it looks like the pattern you are trying to use is Producer/Consumer. First, you have some tasks producing work (and adding items to the queue). Second, you have another task consuming things from the queue (dequeuing items).

So really you want two threads: one adding items and a second removing items. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously the more contention you have on the concurrent queue the quicker that will become the bottleneck.
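A minimal sketch of that producer/consumer split (my own example, not from the answer: it uses BlockingCollection<DataTable>, which wraps a ConcurrentQueue<T> by default and lets the consumer block instead of polling; the class and method names are made up) could look like this:

using System;
using System.Collections.Concurrent;
using System.Data;
using System.Threading.Tasks;

public class QueuedTableWriter
{
    // BlockingCollection<T> is backed by a ConcurrentQueue<T> by default and lets
    // the consumer block until an item is available, so no polling is needed.
    private readonly BlockingCollection<DataTable> _tables = new BlockingCollection<DataTable>();

    public QueuedTableWriter()
    {
        // Consumer thread: take tables off the queue as they arrive and write them.
        Task.Run(() =>
        {
            foreach (DataTable table in _tables.GetConsumingEnumerable())
            {
                BulkCopy(table);    // the bulk-copy step from the question
            }
        });
    }

    // Producer side: call this for each extracted DataTable.
    public void Add(DataTable table)
    {
        _tables.Add(table);
    }

    // Call when extraction is finished so the consumer loop can end.
    public void Complete()
    {
        _tables.CompleteAdding();
    }

    private void BulkCopy(DataTable table)
    {
        // SqlBulkCopy (or whatever the question's BulkCopy does) would go here.
    }
}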



Answer 2:

I think ConcurrentQueue is useful only in very few cases. Its main advantage is that it is lock free. However, usually the producer thread(s) have to inform the consumer thread(s) somehow that there is data available to process. This signalling between threads needs locks and negates the benefit of using ConcurrentQueue. The fastest way to synchronise threads is using Monitor.Pulse(), which works only within a lock. All other synchronisation tools are even slower.

Of course, the consumer can just continuously check whether there is something in the queue, which works without locks but is a huge waste of processor resources. It is a little better if the consumer waits between checks.

Raising an event (and spinning up a thread) for every write to the queue is a very bad idea. The maybe 1 microsecond saved by using ConcurrentQueue is completely wasted by executing the event handler, which might take 1000 times longer.

If all the processing is done in an event handler or an async call anyway, the question is why a queue is still needed. It is better to pass the data directly to the handler and not use a queue at all.

Please note that the implementation of ConcurrentQueue is rather complicated in order to allow concurrency. In most cases it is better to use a normal Queue<> and lock every access to the queue. Since queue access takes only microseconds, it is extremely unlikely that two threads will access the queue in the same microsecond, and there will hardly ever be any delay because of the locking. Using a normal Queue<> with locking will often result in faster code execution than ConcurrentQueue.
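A minimal sketch of that suggestion (the class and member names are mine, not from the answer): a plain Queue<DataTable> guarded by a lock, with Monitor.Wait/Pulse so the consumer sleeps until the producer signals that data is available:

using System.Collections.Generic;
using System.Data;
using System.Threading;

public class LockedTableQueue
{
    private readonly Queue<DataTable> queue = new Queue<DataTable>();
    private readonly object sync = new object();
    private bool completed;

    // Producer: enqueue under the lock and wake one waiting consumer.
    public void Enqueue(DataTable table)
    {
        lock (sync)
        {
            queue.Enqueue(table);
            Monitor.Pulse(sync);
        }
    }

    // Producer: signal that no more tables will arrive.
    public void Complete()
    {
        lock (sync)
        {
            completed = true;
            Monitor.PulseAll(sync);
        }
    }

    // Consumer: sleep inside Monitor.Wait until the producer pulses.
    public bool TryDequeue(out DataTable table)
    {
        lock (sync)
        {
            while (queue.Count == 0 && !completed)
            {
                Monitor.Wait(sync);
            }

            if (queue.Count > 0)
            {
                table = queue.Dequeue();
                return true;
            }

            table = null;           // completed and drained
            return false;
        }
    }
}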



Answer 3:

This is the complete solution I came up with:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.IO;
using System.Text;
using System.Threading;
using System.Xml;
using System.Xml.Serialization;

public class TableTransporter
{
    private static int _indexer;

    private CustomQueue tableQueue = new CustomQueue();
    private Func<DataTable, String> RunPostProcess;
    private string filename;

    public TableTransporter()
    {
        RunPostProcess = new Func<DataTable, String>(SerializeTable);
        tableQueue.TableQueued += new EventHandler<TableQueuedEventArgs>(tableQueue_TableQueued);
    }

    void tableQueue_TableQueued(object sender, TableQueuedEventArgs e)
    {
        //  do something with the table
        //  what I can't figure out is how to pass a custom object in the 3rd parameter
        RunPostProcess.BeginInvoke(e.Table, new AsyncCallback(PostComplete), filename);
    }

    public void ExtractData()
    {
        // perform data extraction
        tableQueue.Enqueue(MakeTable());
        Console.WriteLine("Table count [{0}]", tableQueue.Count);
    }

    private DataTable MakeTable()
    { return new DataTable(String.Format("Table{0}", _indexer++)); }

    private string SerializeTable(DataTable Table)
    {
        string file = Table.TableName + ".xml";

        DataSet dataSet = new DataSet(Table.TableName);

        dataSet.Tables.Add(Table);

        Console.WriteLine("[{0}]Writing {1}", Thread.CurrentThread.ManagedThreadId, file);
        string xmlstream = String.Empty;

        using (MemoryStream memstream = new MemoryStream())
        {
            XmlSerializer xmlSerializer = new XmlSerializer(typeof(DataSet));
            XmlTextWriter xmlWriter = new XmlTextWriter(memstream, Encoding.UTF8);

            xmlSerializer.Serialize(xmlWriter, dataSet);
            xmlstream = UTF8ByteArrayToString(((MemoryStream)xmlWriter.BaseStream).ToArray());

            using (var fileStream = new FileStream(file, FileMode.Create))
            {
                // write the UTF-8 bytes of the serialized table to disk
                byte[] bytes = StringToUTF8ByteArray(xmlstream);
                fileStream.Write(bytes, 0, bytes.Length);
            }
        }
        filename = file;

        return file;
    }

    private void PostComplete(IAsyncResult iasResult)
    {
        string file = (string)iasResult.AsyncState;
        Console.WriteLine("[{0}]Completed: {1}", Thread.CurrentThread.ManagedThreadId, file);

        RunPostProcess.EndInvoke(iasResult);
    }

    public static String UTF8ByteArrayToString(Byte[] ArrBytes)
    { return new UTF8Encoding().GetString(ArrBytes); }

    public static Byte[] StringToUTF8ByteArray(String XmlString)
    { return new UTF8Encoding().GetBytes(XmlString); }
}

public sealed class CustomQueue : ConcurrentQueue<DataTable>
{
    public event EventHandler<TableQueuedEventArgs> TableQueued;

    public CustomQueue()
    { }
    public CustomQueue(IEnumerable<DataTable> TableCollection)
        : base(TableCollection)
    { }

    new public void Enqueue (DataTable Table)
    {
        base.Enqueue(Table);
        OnTableQueued(new TableQueuedEventArgs(Table));
    }

    public void OnTableQueued(TableQueuedEventArgs table)
    {
        EventHandler<TableQueuedEventArgs> handler = TableQueued;

        if (handler != null)
        {
            handler(this, table);
        }
    }
}

public class TableQueuedEventArgs : EventArgs
{
    public TableQueuedEventArgs(DataTable Table)
    {
        this.Table = Table;
    }

    public DataTable Table { get; set; }
}

As a proof of concept, it seems to work pretty well. At most I saw 4 worker threads.
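
For reference, a small driver along these lines (hypothetical, not part of the original post, and assuming .NET Framework, since Delegate.BeginInvoke is not supported on .NET Core) is enough to exercise the proof of concept:

using System;

class Program
{
    static void Main()
    {
        TableTransporter transporter = new TableTransporter();

        // Each call creates and enqueues a table; CustomQueue raises TableQueued,
        // which hands the table to SerializeTable on a thread-pool thread.
        for (int i = 0; i < 10; i++)
        {
            transporter.ExtractData();
        }

        // Keep the process alive until the asynchronous writers have finished.
        Console.ReadLine();
    }
}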