Process queue with multithreading or tasks

Posted 2019-01-06 18:01

Question:

I have a telephony message application that has a large number of messages to process. Because telephone ports are limited, the messages are processed first in, first out. Each message has an 'Acknowledge' flag that indicates whether it has been processed. It is initialized to false, of course.

I want to put all messages into a queue then process them with multiple threads or tasks.

    public class MessageQueue
    {
        public Queue MessageWorkItem { get; set; }
        public Messages Message { get; set; }
        public MessageQueue()
        {
            MessageWorkItem = new Queue();
            Message = new Messages();
        }
        public void GetMessageMetaData()
        {
            try
            {
                // It is just a test, add only one item into the queue
                Message.MessageID = Guid.NewGuid();
                Message.NumberToCall = "1111111111";
                Message.FacilityID = "3333";
                Message.NumberToDial = "2222222222";
                Message.CountryCode = "1";
                Message.Acknowledge = false;
            }
            catch (Exception ex)
            {
            }
        }

        public void AddingItemToQueue()
        {
            GetMessageMetaData();
            if (!Message.Acknowledge)
            {
                lock (MessageWorkItem)
                {
                    MessageWorkItem.Enqueue(Message);
                }
            }
        }
    }

    public class Messages
    {
        public Guid MessageID { get; set; }
        public string NumberToCall { get; set; }
        public string FacilityID { get; set; }
        public string NumberToDial { get; set; }
        public string CountryCode { get; set; }
        public bool Acknowledge { get; set; }
    }

Now my question is how to dequeue items from the queue using multiple threads. For each item in the queue, I want to run a script.

        public void RunScript(Messages item)
        {
            try
            {
                PlayMessage(item); 
                return;
            }
            catch (HangupException hex)
            {
                Log.WriteWithId("Caller Hungup!", hex.Message);
            }
            catch (Exception ex)
            {
                Log.WriteException(ex, "Unexpected exception: {0}");
            }
        }

My idea was to check if (MessageWorkItem.Count >= 1) and then do something with the item, but I need help with the code.

Answer 1:

If you can use .NET 4.5, I'd suggest looking at Dataflow from the Task Parallel Library (TPL).

The Dataflow documentation leads to a lot of example walkthroughs, such as How to: Implement a Producer-Consumer Dataflow Pattern and Walkthrough: Using Dataflow in a Windows Forms Application.

Have a look at that documentation to see if it would help you. It's quite a lot to take in, but I think it would probably be your best approach.
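
As a rough illustration of the Dataflow suggestion, here is a minimal sketch using ActionBlock<T>. The trimmed-down Message class, the DataflowSketch class, the ProcessAll method and the maxPorts parameter are all made up for this example, and the lambda body merely stands in for the question's RunScript. On .NET Framework the Dataflow types come from the System.Threading.Tasks.Dataflow NuGet package rather than the base class library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical, trimmed-down stand-in for the question's message type.
public class Message
{
    public Guid MessageID { get; set; }
    public bool Acknowledge { get; set; }
}

public static class DataflowSketch
{
    // Handles every message with at most maxPorts handlers running at once
    // and returns the number of messages that were acknowledged.
    public static int ProcessAll(Message[] messages, int maxPorts)
    {
        int acknowledged = 0;

        var block = new ActionBlock<Message>(message =>
        {
            // Assumption: the question's RunScript(message) would be called here.
            message.Acknowledge = true;
            Interlocked.Increment(ref acknowledged);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxPorts });

        foreach (var message in messages)
            block.Post(message);     // The block buffers posted items in FIFO order.

        block.Complete();            // Signal that no more input will arrive.
        block.Completion.Wait();     // Wait until every posted message has been handled.
        return acknowledged;
    }

    public static void Main()
    {
        var messages = new Message[5];
        for (int i = 0; i < messages.Length; i++)
            messages[i] = new Message { MessageID = Guid.NewGuid() };

        int done = ProcessAll(messages, 2);
        Console.WriteLine("Acknowledged {0} of {1}", done, messages.Length);
    }
}
```

Messages are picked up in the order they were posted, but with MaxDegreeOfParallelism greater than 1 they may finish in a different order.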

Alternatively, you could look into using a BlockingCollection along with its GetConsumingEnumerable() method to access items in the queue.

What you do is to split up the work into objects that you want to process in some way, and use a BlockingCollection to manage the queue.

The sample code below uses ints rather than objects as the work items to demonstrate this. It works as follows:

When a worker thread has finished with its current item, it will remove a new item from the work queue, process that item, then add it to the output queue.

A separate consumer thread removes completed items from the output queue and does something with them.

At the end we must wait for all the workers to finish (Task.WaitAll(workers)) before we can mark the output queue as completed (outputQueue.CompleteAdding()).

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];

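            // Keep a reference to the consumer task so we can wait for it at the end.
            Task consumerTask = Task.Factory.StartNew(consumer);

            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }

            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }

            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);          // All workers must finish before the output queue is closed.
            outputQueue.CompleteAdding();
            consumerTask.Wait();            // Let the consumer drain the output queue before reporting completion.
            Console.WriteLine("Done.");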

            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);

            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100);          // Simulate work.
                outputQueue.Add(workItem);  // Output completed item.
            }

            Console.WriteLine("Worker {0} is stopping.", workerId);
        }

        void consumer()
        {
            Console.WriteLine("Consumer is starting.");

            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }

            Console.WriteLine("Consumer is finished.");
        }

        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}
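
The same pattern could be adapted to the question's message type along the following lines. This is only a sketch: the trimmed-down Message class, the BlockingQueueSketch class, the ProcessAll method and the workerCount parameter are invented for illustration, and setting Acknowledge stands in for calling the question's RunScript.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical, trimmed-down stand-in for the question's message type.
public class Message
{
    public Guid MessageID { get; set; }
    public bool Acknowledge { get; set; }
}

public static class BlockingQueueSketch
{
    // Processes all messages with workerCount consumer tasks and
    // returns the number of messages that ended up acknowledged.
    public static int ProcessAll(Message[] messages, int workerCount)
    {
        // By default a BlockingCollection<T> wraps a ConcurrentQueue<T>,
        // so items are handed out first in, first out.
        var queue = new BlockingCollection<Message>();
        var workers = new Task[workerCount];

        for (int i = 0; i < workerCount; ++i)
        {
            workers[i] = Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding()
                // has been called and the queue has been drained.
                foreach (var message in queue.GetConsumingEnumerable())
                {
                    // Assumption: the question's RunScript(message) would go here.
                    message.Acknowledge = true;
                }
            }, TaskCreationOptions.LongRunning);
        }

        foreach (var message in messages)
            queue.Add(message);

        queue.CompleteAdding();   // No more messages will be added.
        Task.WaitAll(workers);    // Wait for the workers to drain the queue.

        int acknowledged = 0;
        foreach (var message in messages)
            if (message.Acknowledge) acknowledged++;
        return acknowledged;
    }

    public static void Main()
    {
        var messages = new Message[8];
        for (int i = 0; i < messages.Length; i++)
            messages[i] = new Message { MessageID = Guid.NewGuid() };

        Console.WriteLine("Acknowledged {0} of {1}", ProcessAll(messages, 4), messages.Length);
    }
}
```

Choosing workerCount to match the number of available telephone ports would keep the number of simultaneous calls within that limit.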


Answer 2:

Use Parallel.ForEach from the TPL. It is a parallel version of a foreach loop.

Sample (MessageWorkItem changed to a generic Queue<Message>):

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class MessageQueue
{
    public Queue<Message> MessageWorkItem { get; set; }

    public MessageQueue()
    {
        MessageWorkItem = new Queue<Message>();
    }

    public Message GetMessageMetaData()
    {
        try
        {
            // It is just a test, add only one item into the queue
            return new Message()
            {
                MessageID = Guid.NewGuid(),
                NumberToCall = "1111111111",
                FacilityID = "3333",
                NumberToDial = "2222222222",
                CountryCode = "1",
                Acknowledge = false
            };
        }
        catch (Exception)
        {
            // Signal failure to the caller, which must check for null.
            return null;
        }
    }

    public void AddingItemToQueue()
    {
        var message = GetMessageMetaData();
        // GetMessageMetaData returns null on failure, so guard before dereferencing.
        if (message != null && !message.Acknowledge)
        {
            lock (MessageWorkItem)
            {
                MessageWorkItem.Enqueue(message);
            }
        }
    }
}

public class Message
{
    public Guid MessageID { get; set; }
    public string NumberToCall { get; set; }
    public string FacilityID { get; set; }
    public string NumberToDial { get; set; }
    public string CountryCode { get; set; }
    public bool Acknowledge { get; set; }
}

class Program
{
    static void Main(string[] args)
    {
        MessageQueue me = new MessageQueue();
        for (int i = 0; i < 10000; i++)
            me.AddingItemToQueue();

        Console.WriteLine(me.MessageWorkItem.Count);

        // Parallel.ForEach enumerates the queue without dequeuing anything,
        // and the order in which items are processed is not guaranteed.
        Parallel.ForEach(me.MessageWorkItem, RunScript);
    }

    static void RunScript(Message item)
    {
        // todo: ...
        Console.WriteLine(item.MessageID);
        Thread.Sleep(300);
    }
}
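
Since the question mentions that telephone ports are limited, it may be worth capping how many items run at once. One way to do that is the ParallelOptions overload of Parallel.ForEach, sketched below. The BoundedParallelSketch class, the ProcessAll method, the maxPorts parameter and the trimmed-down Message class are invented for this example, and Thread.Sleep stands in for the real RunScript work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, trimmed-down version of the Message class.
public class Message
{
    public Guid MessageID { get; set; }
    public bool Acknowledge { get; set; }
}

public static class BoundedParallelSketch
{
    // Processes every message with at most maxPorts running at once.
    // Returns the highest number of messages observed in flight together.
    public static int ProcessAll(IEnumerable<Message> messages, int maxPorts)
    {
        int inFlight = 0;
        int peak = 0;
        object gate = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxPorts };

        Parallel.ForEach(messages, options, message =>
        {
            lock (gate)
            {
                inFlight++;
                if (inFlight > peak) peak = inFlight;
            }

            Thread.Sleep(10);             // Stand-in for RunScript(message).
            message.Acknowledge = true;

            lock (gate) { inFlight--; }
        });

        return peak;
    }

    public static void Main()
    {
        var queue = new Queue<Message>();
        for (int i = 0; i < 20; i++)
            queue.Enqueue(new Message { MessageID = Guid.NewGuid() });

        int peak = ProcessAll(queue, 4);
        Console.WriteLine("Peak concurrency: {0} (limit 4)", peak);
    }
}
```

MaxDegreeOfParallelism is an upper bound, so the loop may use fewer concurrent operations than the limit, but not more.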