I am new to multithreading concepts. I need to add a certain number of strings to a queue and process them with multiple threads, using ConcurrentQueue, which is thread-safe.
This is what I have tried, but not all of the items added to the concurrent queue are processed; only the first 4 items are processed.

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    class Program
    {
        ConcurrentQueue<string> iQ = new ConcurrentQueue<string>();

        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];
            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }
            for (int i = 0; i < 100; i++)
            {
                iQ.Enqueue("Item" + i);
            }
            Task.WaitAll(workers);
            Console.WriteLine("Done.");
            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);
            string op;
            if (iQ.TryDequeue(out op))
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, op);
            }
            Console.WriteLine("Worker {0} is stopping.", workerId);
        }
    }

I've actually been working with `ConcurrentQueue` quite a bit recently and thought I'd share this. I've created a custom `ConcurrentQueue` called `CQItems` that has methods to build itself with given parameters. Internally, when you tell it to build x number of y items, it makes a `Parallel.For` call to the item constructors. The benefit here is that when a method or function calls `CQItems myCQ = CQItems.Generate(x, y)`, that call comes from the base application thread, meaning that nothing can look at the queue until it is finished building. But internally to the queue class, it's building with threads, and is significantly quicker than just using a `List<>` or `Queue<>`. Mostly it's generating things out of thin air, but it also sometimes (based on params) is creating items from SQL - basically generating objects based on existing data. At any rate, these are the two methods in the `CQItems` class that can help with this:

Some useful notes:
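To make the notes below concrete, here is a minimal, hypothetical sketch of the pattern they describe: a queue class that fills itself with `Parallel.For`, calls `loopState.Stop()` when an item fails, and surfaces only the first exception from the resulting `AggregateException`. The class name `CQItemsSketch<T>` and the `factory` parameter are assumptions for illustration, not the `CQItems` code from this answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for the CQItems class described in this answer.
public class CQItemsSketch<T> : ConcurrentQueue<T>
{
    // Builds a queue of `count` items in parallel.
    // `factory` is an assumed per-index item constructor.
    public static CQItemsSketch<T> Generate(int count, Func<int, T> factory)
    {
        var queue = new CQItemsSketch<T>();
        try
        {
            Parallel.For(0, count, (i, loopState) =>
            {
                try
                {
                    queue.Enqueue(factory(i));
                }
                catch (Exception)
                {
                    loopState.Stop(); // ask the loop to stop as soon as it can
                    throw;            // Parallel.For wraps this in an AggregateException
                }
            });
        }
        catch (AggregateException ae)
        {
            // Send only the first inner exception up the stack.
            // Note: rethrowing it this way resets its stack trace.
            throw ae.InnerExceptions[0];
        }
        return queue;
    }
}

public static class CQItemsSketchDemo
{
    public static void Main()
    {
        // The caller only sees the queue once it is fully built.
        CQItemsSketch<string> items = CQItemsSketch<string>.Generate(1000, i => "Item" + i);
        Console.WriteLine("Built {0} items.", items.Count);

        try
        {
            CQItemsSketch<int>.Generate(1000, i =>
            {
                if (i == 5) throw new InvalidOperationException("boom");
                return i;
            });
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Caught a single exception: " + ex.Message);
        }
    }
}
```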
Supplying the `loopState` variable in the `Parallel.For()` allows us to set the state to stop if an exception is caught. This is nice because if your loop is asked to do 1000 things, and the 5th iteration throws an exception, it's going to keep looping. You may want it to, but in my case an exception needs to exit the threaded loop. You'll still end up with an `AggregateException` coming out of it (apparently, that's just what happens when threads throw exceptions). Parsing those out and only sending the first one can save a LOT of time and headaches trying to weed through a giant exception group where later exceptions may (or may not) have been caused by the first anyway.

As for the rethrows, I try to add a catch statement for most expected types of exceptions even if I plan to just throw them up the stack anyway. Part of this is for troubleshooting (being able to break on specific exceptions can be handy). Part of it is because sometimes I want to be able to do other things, such as stopping the loop, changing or adding to the exception message, or, in the case of breaking apart the `AggregateException`, only sending one exception back up the stack rather than the whole aggregate. Just a point of clarification for anyone who might be looking at this.

Lastly, in case it's confusing, the `Type(T)` value is coming from my `CQItems`
class itself:

Your workers take one item out of the `queue` and then finish. Just let them work until the `queue` is empty. Replace `if` in the worker function with `while`.
When you run it, you will see that nearly all items are processed by two workers. The likely reason: your CPU has two cores, both are busy, and there is no free time slot to create a new task. If you want all 4 of your tasks to process items, you could add a delay to give your processor time to create the other tasks, something like:
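A minimal sketch of that idea, not the exact code from this answer: the worker loops with `while` instead of `if` and pauses briefly after each item. The names `DelayedWorkerDemo`, `Drain` and `delayMs` are hypothetical, and the sketch assumes the queue is filled before the workers start, so an empty queue really does mean there is no more work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DelayedWorkerDemo
{
    // Drains the queue with `workerCount` tasks and returns
    // how many items each worker handled.
    public static int[] Drain(ConcurrentQueue<string> queue, int workerCount, int delayMs)
    {
        int[] processed = new int[workerCount];
        Task[] workers = new Task[workerCount];
        for (int i = 0; i < workerCount; ++i)
        {
            int workerId = i; // copy the loop variable for the lambda
            workers[i] = Task.Run(() =>
            {
                string item;
                // `while` instead of `if`: keep dequeuing until the queue is empty.
                while (queue.TryDequeue(out item))
                {
                    processed[workerId]++; // each worker writes only its own slot
                    Console.WriteLine("Worker {0} is processing item {1}", workerId, item);
                    Thread.Sleep(delayMs); // short pause so the other workers get a turn
                }
            });
        }
        Task.WaitAll(workers);
        return processed;
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<string>();
        // Assumption: all items are enqueued before any worker starts.
        for (int i = 0; i < 100; i++)
        {
            queue.Enqueue("Item" + i);
        }

        int[] counts = Drain(queue, 4, 10);
        Console.WriteLine("Items per worker: " + string.Join(", ", counts));
        Console.WriteLine("Total processed: " + counts.Sum());
    }
}
```

How the items are split between workers depends on the scheduler and the machine, so the per-worker counts will vary from run to run.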
That gives you the output you want:

There are a couple of issues with your implementation. The first and most obvious one is that the `worker` method only dequeues zero or one item and then stops:
It should be:
That, however, won't be enough to make your program work properly. If your workers are dequeueing faster than the main thread is enqueueing, they will stop while the main thread is still enqueueing. You need to signal the workers that they can stop. You can define a boolean variable that will be set to `true` once enqueueing is done:
The workers will check the value:
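One way the two pieces could fit together is sketched below, under a few assumptions: the names `SignalledQueueDemo` and `doneEnqueueing` are hypothetical, and the flag is declared `volatile` so that workers see the update. Each worker reads the flag before it tries to dequeue. If the flag was already set and the queue is then found empty, nothing more can arrive and the worker can stop safely.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class SignalledQueueDemo
{
    private readonly ConcurrentQueue<string> queue = new ConcurrentQueue<string>();
    private volatile bool doneEnqueueing; // set to true once the producer has finished
    private int processedCount;

    // Starts the workers, enqueues the items, signals completion,
    // and returns the number of items that were processed.
    public int Run(int itemCount, int workerCount)
    {
        Task[] workers = new Task[workerCount];
        for (int i = 0; i < workerCount; ++i)
        {
            int workerId = i;
            workers[i] = Task.Run(() => Worker(workerId));
        }

        for (int i = 0; i < itemCount; i++)
        {
            queue.Enqueue("Item" + i);
        }
        doneEnqueueing = true; // signal: no more items will be added

        Task.WaitAll(workers);
        return processedCount;
    }

    private void Worker(int workerId)
    {
        string item;
        while (true)
        {
            // Read the flag BEFORE dequeuing, so an empty queue seen
            // afterwards really means that all the work is finished.
            bool done = doneEnqueueing;
            if (queue.TryDequeue(out item))
            {
                Interlocked.Increment(ref processedCount);
                Console.WriteLine("Worker {0} is processing item {1}", workerId, item);
            }
            else if (done)
            {
                break; // queue is empty and the producer has finished
            }
            else
            {
                Thread.Sleep(1); // queue is temporarily empty; wait for more items
            }
        }
    }

    public static void Main()
    {
        int processed = new SignalledQueueDemo().Run(100, 4);
        Console.WriteLine("Done. Processed {0} items.", processed);
    }
}
```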