Situation
We have one message queue. We would like to process messages in parallel and limit the number of simultaneously processed messages.
Our trial code below does process messages in parallel, but it only starts a new batch of Tasks when the previous batch has finished. We would like to start a new Task as soon as one finishes.
In other words: The maximum number of Tasks should always be active as long as the message queue is not empty.
Trial code
static string queue = @".\Private$\concurrenttest";

private static void Process(CancellationToken token)
{
    Task.Factory.StartNew(async () =>
    {
        while (true)
        {
            IEnumerable<Task> consumerTasks = ConsumerTasks();

            await Task.WhenAll(consumerTasks);
            await PeekAsync(new MessageQueue(queue));
        }
    });
}

private static IEnumerable<Task> ConsumerTasks()
{
    for (int i = 0; i < 15; i++)
    {
        Command1 message;
        try
        {
            MessageQueue msMq = new MessageQueue(queue);
            msMq.Formatter = new XmlMessageFormatter(new Type[] { typeof(Command1) });
            Message msg = msMq.Receive();
            message = (Command1)msg.Body;
        }
        catch (MessageQueueException mqex)
        {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                yield break; // nothing in queue
            else throw;
        }

        yield return Task.Run(() =>
        {
            Console.WriteLine("id: " + message.id + ", name: " + message.name);
            Thread.Sleep(1000);
        });
    }
}

private static Task<Message> PeekAsync(MessageQueue msMq)
{
    return Task.Factory.FromAsync<Message>(msMq.BeginPeek(), msMq.EndPeek);
}
EDIT
I spent a lot of time thinking about reliability of the pump - specifically, if a message is received from the MessageQueue, cancellation becomes tricky - so I provided two ways to terminate the queue:

- CancellationToken stops the pipeline as quickly as possible and will likely result in dropped messages.
- MessagePump.Stop() terminates the pump but allows all messages which have already been taken from the queue to be fully processed before the MessagePump.Completion task transitions to RanToCompletion.

The solution uses TPL Dataflow (NuGet: Microsoft.Tpl.Dataflow).
Full implementation:
Usage:
Untidy but functional unit tests:
https://gist.github.com/KirillShlenskiy/7f3e2c4b28b9f940c3da
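The full implementation and usage code live in the gist above. As a rough sketch of the core idea only (not the author's exact code, and all names below are my own), an ActionBlock with bounded parallelism gives you both termination modes described:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class MessagePump<T>
{
    private readonly ActionBlock<T> _worker;

    public MessagePump(Func<T, Task> process, int maxParallelism, CancellationToken token)
    {
        _worker = new ActionBlock<T>(process, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism, // cap on concurrently processed messages
            BoundedCapacity = maxParallelism * 2,    // back-pressure on the receive loop
            CancellationToken = token                // hard stop: may drop buffered messages
        });
    }

    // Hands a received message to the worker; waits if the block is at capacity.
    public Task<bool> PostAsync(T message) => _worker.SendAsync(message);

    // Graceful stop: no new messages are accepted, but messages already taken
    // finish processing before Completion transitions to RanToCompletion.
    public void Stop() => _worker.Complete();

    public Task Completion => _worker.Completion;
}
```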
ORIGINAL ANSWER
As mentioned in my comment, there are established producer/consumer patterns in .NET, one of which is pipeline. An excellent example of such can be found in "Patterns of Parallel Programming" by Microsoft's own Stephen Toub (full text here: https://www.microsoft.com/en-au/download/details.aspx?id=19222, page 55).
The idea is simple: producers continuously throw stuff in a queue, and consumers pull it out and process (in parallel to producers and possibly one another).
Here's an example of a message pipeline where the consumer uses synchronous, blocking methods to process the items as they arrive (I've parallelised the consumer to suit your scenario):
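The original example code didn't survive formatting; as a minimal reconstruction of the pattern it describes (a BlockingCollection with one producer and several blocking consumers; counts and timings here are illustrative, not the original's):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineDemo
{
    public static int Run(int itemCount, int consumerCount)
    {
        var queue = new BlockingCollection<int>(boundedCapacity: 100);
        int processed = 0;

        // Producer: continuously throws items into the queue, then signals "no more".
        var producer = Task.Run(() =>
        {
            for (int i = 0; i < itemCount; i++) queue.Add(i);
            queue.CompleteAdding();
        });

        // Parallel consumers: each blocks on the queue and processes items as they arrive.
        var consumers = Enumerable
            .Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                foreach (int item in queue.GetConsumingEnumerable())
                {
                    Thread.Sleep(10); // simulate synchronous, blocking work
                    Interlocked.Increment(ref processed);
                }
            }))
            .ToArray();

        Task.WaitAll(consumers);
        producer.Wait();
        return processed;
    }
}
```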
The above code completes in approx 130-140 ms, which is exactly what you would expect given the parallelisation of the consumers.
Now, in your scenario you are using Tasks and async/await, so your code is better suited to TPL Dataflow (an official Microsoft-supported library tailored to parallel and asynchronous sequence processing).

Here's a little demo showing the different types of TPL Dataflow processing blocks that you would use for the job:
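The demo itself is missing above; as a hedged stand-in, here is a small example of two common block types, TransformBlock and ActionBlock, linked into a pipeline (counts and delays are my own):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowDemo
{
    public static async Task<int> RunAsync()
    {
        int handled = 0;

        // TransformBlock: projects each input, with up to 4 items in flight at once.
        var transform = new TransformBlock<int, string>(
            async n =>
            {
                await Task.Delay(10); // simulate asynchronous work
                return "item " + n;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // ActionBlock: terminal block consuming the transformed items.
        var sink = new ActionBlock<string>(_ => Interlocked.Increment(ref handled));

        // PropagateCompletion flows Complete() down the pipeline automatically.
        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 10; i++) transform.Post(i);
        transform.Complete();
        await sink.Completion;
        return handled;
    }
}
```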
It's not hard to adapt the above to use your MessageQueue and you can be sure that the end result will be free of threading issues. I'll do just that if I get a bit more time today/tomorrow.

You have one collection of things you want to process. You create another collection for things being processed (this could be your Task objects, or items of some sort that reference a Task).
You create a loop that will repeat as long as you have work to do. That is, work items are waiting to be started or you still have work items being processed.
At the start of the loop you populate your active task collection with as many tasks as you want to run concurrently and you start them as you add them.
You let the things run for a while (like Thread.Sleep(10);).
You create an inner loop that checks all your started tasks for completion. If one has completed, you remove it and report the results or do whatever seems appropriate.
That's it. On the next turn the upper part of your outer loop will add tasks to your running tasks collection until the number equals the maximum you have set, keeping your work-in-progress collection full.
You may want to do all this on a worker thread and monitor cancel requests in your loop.
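The loop described above can be sketched roughly as follows (a simplified illustration of the polling approach, not production code; names and timings are my own):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    public static int RunAll(Queue<Action> work, int maxConcurrent)
    {
        var running = new List<Task>();
        int completed = 0;

        // Outer loop: repeat while items are waiting to start or still in progress.
        while (work.Count > 0 || running.Count > 0)
        {
            // Top of the loop: top up the active collection to the concurrency limit.
            while (running.Count < maxConcurrent && work.Count > 0)
                running.Add(Task.Run(work.Dequeue()));

            Thread.Sleep(10); // let the things run for a while

            // Inner loop: check started tasks for completion and remove finished ones.
            for (int i = running.Count - 1; i >= 0; i--)
            {
                if (running[i].IsCompleted)
                {
                    running.RemoveAt(i);
                    completed++; // report results here as appropriate
                }
            }
        }
        return completed;
    }
}
```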
The task library in .NET is made to execute a number of tasks in parallel. While there are ways to limit the number of active tasks, the library itself limits the number of active tasks according to the computer's CPUs.

The first question that needs to be answered is: why do you need another limit? If the limit imposed by the task library is OK, then you can just keep creating tasks and rely on the task library to execute them with good performance.

If that is OK, then as soon as you get a message from MSMQ, just start a task to process the message, skip the waiting (the WhenAll call), start over, and wait for the next message.
You can limit the number of concurrent tasks by using a custom task scheduler. More on MSDN: https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler%28v=vs.110%29.aspx.
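Besides writing a scheduler by hand as the MSDN sample does, one ready-made option (my suggestion, not mentioned in the answer) is the BCL's ConcurrentExclusiveSchedulerPair, whose concurrent scheduler caps the number of tasks running at once:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerDemo
{
    public static int Run()
    {
        // The pair's ConcurrentScheduler runs at most 2 tasks at a time.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 2);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int current = 0, peak = 0;
        var tasks = new Task[8];
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref current);
                InterlockedMax(ref peak, now); // track the highest observed concurrency
                Thread.Sleep(20);
                Interlocked.Decrement(ref current);
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }

    private static void InterlockedMax(ref int location, int value)
    {
        int snapshot;
        while (value > (snapshot = Volatile.Read(ref location)))
            Interlocked.CompareExchange(ref location, value, snapshot);
    }
}
```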
My colleague came up with the solution below. This solution works, but I'll let this code be reviewed on Code Review.
Based on the answers given and some research of our own, we've come to a solution. We're using a SemaphoreSlim to limit the number of parallel Tasks.

You should look at using Microsoft's Reactive Framework for this.
Your code could look like this:
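The original snippet didn't survive formatting; as a hedged reconstruction of the shape only (Observable.Range stands in for the MSMQ-backed FromQueue source, and the counting is for illustration):

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class RxDemo
{
    public static int Run()
    {
        int processed = 0;
        using (var done = new ManualResetEventSlim())
        {
            // Stand-in source; the original answer builds this from MSMQ via FromQueue(queue).
            IObservable<int> source = Observable.Range(0, 10);

            IDisposable subscription = source
                // Observable.Start runs each work item on the thread pool,
                // so values are processed in parallel as they arrive.
                .SelectMany(n => Observable.Start(() =>
                {
                    Thread.Sleep(10); // simulate per-message work
                    Interlocked.Increment(ref processed);
                    return n;
                }))
                .Subscribe(_ => { }, () => done.Set());

            done.Wait();
            subscription.Dispose(); // cancelling = disposing the subscription
        }
        return processed;
    }
}
```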
This does all of the processing in parallel, and ensures that the processing is properly distributed across all cores. When one value finishes, another starts.
To cancel the subscription just call subscription.Dispose().

Just NuGet "Rx-Main" to get the bits.

The code for FromQueue is:
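The author's FromQueue isn't reproduced here. A hypothetical sketch of the same idea, with a plain blocking receive delegate standing in for the MSMQ calls (Observable.Create wrapping a background receive loop), might be:

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class QueueObservable
{
    // 'receive' blocks until a message is available; returning null means "drained".
    // (In the MSMQ version this would wrap msMq.Receive() instead.)
    public static IObservable<T> FromQueue<T>(Func<T> receive) where T : class
    {
        return Observable.Create<T>(observer =>
        {
            var cts = new CancellationTokenSource();
            Task.Run(() =>
            {
                while (!cts.IsCancellationRequested)
                {
                    T message = receive();
                    if (message == null) { observer.OnCompleted(); return; }
                    observer.OnNext(message);
                }
            });
            // Disposing the subscription cancels the receive loop.
            return new CancellationDisposable(cts);
        });
    }
}
```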
In order to limit the concurrency you can do this:
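The snippet is missing above; a hedged sketch of one way to do it is to Select each message into a lazy Observable.FromAsync and let Merge(n) cap the in-flight count (again with Observable.Range standing in for the queue source):

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LimitedRxDemo
{
    public static int Run(int maxConcurrent)
    {
        int current = 0, peak = 0;
        using (var done = new ManualResetEventSlim())
        {
            Observable.Range(0, 12)
                // FromAsync is lazy: the task starts only when Merge subscribes...
                .Select(n => Observable.FromAsync(() => Task.Run(() =>
                {
                    int now = Interlocked.Increment(ref current);
                    InterlockedMax(ref peak, now); // record highest observed concurrency
                    Thread.Sleep(20);
                    Interlocked.Decrement(ref current);
                })))
                // ...and Merge(n) subscribes to at most n inner observables at a time.
                .Merge(maxConcurrent)
                .Subscribe(_ => { }, () => done.Set());
            done.Wait();
        }
        return peak;
    }

    private static void InterlockedMax(ref int location, int value)
    {
        int snapshot;
        while (value > (snapshot = Volatile.Read(ref location)))
            Interlocked.CompareExchange(ref location, value, snapshot);
    }
}
```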