Question:
I'm writing a Windows service to consume MSMQ messages. The service will have periods of high activity (80k messages arriving very quickly) and long periods of inactivity (possibly several days without a new message).
Processing the messages is very network-bound, so I get a big benefit out of parallelism. But during periods of inactivity, I don't want to tie up a bunch of threads waiting for messages that aren't coming anytime soon.
The MSMQ interface seems to be very focused on a synchronous workflow - get one message, process it, get another, etc. How should I structure my code so that I can take advantage of parallelism during periods of high activity but not tie up a bunch of threads during periods of no activity? Bonus points for using the TPL. Pseudocode would be appreciated.
Answer 1:
I have done a lot of MSMQ work (including mobile implementations) over the years, and you are correct in characterizing it as a "synchronous workflow." It's not that you can't take the various message envelopes and process them across the different cores via the TPL ... the limiting factor is reading from / writing to the queue, which is inherently a serial operation. You can't receive 8 messages at once (on a computer with 8 cores), for example.
I had a similar need (without using the System.Messaging namespace) and solved it with some help from a book I read, "Parallel Programming with Microsoft .NET" by Campbell and Johnson.
Check out their "parallel tasks" chapter, specifically the part about a global queue that cooperates with per-thread local queues for work processing (i.e., the TPL), using a "work stealing" algorithm to perform load balancing. I modeled my solution, in part, after their example. The final version of my system showed a huge difference in performance (from 23 messages per second to over 200).
Depending on how long it takes your system to go from 0 to the 80,000, you'll want to take the same design and spread it across multiple servers (each with multiple processors and multiple cores). In theory my setup would need a little less than 7 minutes to polish off all 80K, so adding a 2nd computer would cut that down to roughly 3 minutes and 20 seconds, and so on. The trick is the work-stealing logic.
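The global-queue-plus-local-deques scheme described in that chapter can be sketched outside of .NET. Below is a toy Python illustration, not the book's or the poster's actual code; the worker count, batch size, and the doubling "work" are invented for the example. Each worker drains its own deque first, refills it in small batches from the global queue, and steals from the back of a random peer's deque when both are empty.

```python
import random
import threading
from collections import deque

def run_work_stealing(items, num_workers=4):
    """Process items via per-worker local deques plus work stealing."""
    global_q = deque(items)                      # the shared global queue
    local_qs = [deque() for _ in range(num_workers)]
    lock = threading.Lock()                      # one lock for the toy version
    results = []                                 # list.append is atomic in CPython

    def worker(i):
        while True:
            with lock:
                if local_qs[i]:
                    item = local_qs[i].popleft()           # own work first
                elif global_q:
                    # Refill the local deque with a small batch from the global queue.
                    for _ in range(min(4, len(global_q))):
                        local_qs[i].append(global_q.popleft())
                    continue
                else:
                    # Steal from the *back* of a busy peer's deque.
                    victims = [d for d in local_qs if d]
                    if not victims:
                        return                              # nothing anywhere: done
                    item = random.choice(victims).pop()
            results.append(item * 2)                        # stand-in for real work

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

A production scheduler would use lock-free deques per thread (as the TPL does internally) rather than one global lock; this sketch only shows the ownership/stealing structure.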
Food for thought …
A quick edit: BTW the computer is a Dell T7500 workstation with dual quad core Xeons @ 3GHz, 24 GB of RAM, Windows 7 Ultimate 64-bit edition.
Answer 2:
Here is a simplified version of what I wound up doing:
while (true)
{
    int msgCount = 0;
    Parallel.ForEach(Enumerable.Range(0, 20), i =>
    {
        using (var queue = new MessageQueue(_queuePath))
        {
            try
            {
                Message msg = queue.Receive(TimeSpan.Zero); // return immediately if empty
                // do work
                Interlocked.Increment(ref msgCount);
            }
            catch (MessageQueueException mqex)
            {
                if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    return; // nothing in queue
                throw;
            }
        }
    });
    if (msgCount < 20)
        Thread.Sleep(1000); // nothing more to do, take a break
}
So I try to receive messages 20 at a time, counting the ones I actually get. For those 20, I let the TPL go to town. At the end, if I've processed fewer than 20 messages, the queue is empty, and I sleep the thread for a second before trying again.
Answer 3:
NServiceBus has a great concept for this very problem. It's called the Distributor. The idea is that the distributor forwards the work to be done and distributes it across any number of running child nodes. Depending upon the type of work being done, e.g. heavy computations vs. disk writes, you could distribute it over multiple processes or even multiple machines.
Answer 4:
The solution also depends partly on how the messages are being handled.
I used a WorkflowService hosted in Windows Server AppFabric with the Net.Msmq binding and a transactional queue. The transactional net.msmq binding was needed to handle out-of-order message processing. The workflow is a .Net 4.0.1 state machine, and messages arrive on the same queue from different systems. It is possible, for example, for one system to send an update to a state machine instance before another system has sent the message that instantiates it. To enable out-of-order message processing, the workflow service host uses BufferedReceive to lock the messages and repeatedly retries getting them from the lock subqueue. BufferedReceive has its maximum pending messages set to the maximum likely batch length, because messages in the locked queue are returned to the front of the retry queue.
WF also has a number of throttling settings. My maximum likely batch length is about 20,000. I have set MaxConcurrentCalls to 64 and MaxConcurrentInstances to 20,000. This results in IIS/WAS handling 64 concurrent calls.
However, and this is the thing, because the Receives in the workflow are one-way, this does not mean that the spawned WF processes terminate as soon as the Receive completes. What happens next in my scenario is that after a message is dequeued and a WF instance invoked (one of the 64 calls), a number of subsequent steps are scheduled by the workflow engine, one of which is a database operation.
The gotcha is that 64 calls may be the maximum, but if the rate of message drain is higher than the rate of asynchronous process completion, then as the incoming batch of messages is processed there will be a higher number of executing threads (in my case, WF instances). This can cause unexpected things to happen: for example, the ADO.NET connection pool defaults to a maximum of 100 connections, which can cause processes to time out waiting for a connection from the exhausted pool. For this particular problem, you could either raise the MaxPoolSize value or use Service Broker to handle the db operations asynchronously too (which means more complexity in the workflow).
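The pool-exhaustion gotcha above can be mitigated generically by capping in-flight downstream work, independent of how fast the queue drains. Here is a hedged, language-agnostic sketch in Python (the names, sizes, and the pass-through "database" call are invented for illustration, not from the original WF setup): a semaphore sized to the pool limit bounds concurrent downstream calls even when the message handlers outrun completion.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

POOL_SIZE = 100                         # e.g. the ADO.NET default MaxPoolSize
db_slots = threading.BoundedSemaphore(POOL_SIZE)

def do_db_operation(msg):
    return msg                          # stand-in for the real database write

def handle_message(msg):
    # Cap concurrent "database" work at the pool size, so a fast queue
    # drain cannot start more downstream operations than the pool allows.
    with db_slots:
        return do_db_operation(msg)

def drain(messages, max_workers=64):
    # 64 mirrors MaxConcurrentCalls above; the semaphore still protects
    # the separately sized downstream pool if workers ever exceed it.
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        return list(ex.map(handle_message, messages))
```

The same idea applies in .NET with SemaphoreSlim around the database step.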
Hope this helps someone.
Answer 5:
Here's how I do it (I'm changing the code on the fly, so there might be mistakes):
for (int i = 0; i < numberOfSimultaneousRequests; i++)
    priorityQueue.BeginReceive(TimeSpan.FromDays(30), null, ProcessMessage);
and the callback looks something like this:
private void ProcessMessage(IAsyncResult asyncResult)
{
    try
    {
        Message msg = priorityQueue.EndReceive(asyncResult);
        // Do something with the message
    }
    finally
    {
        // start waiting for another one
        priorityQueue.BeginReceive(TimeSpan.FromDays(30), null, ProcessMessage);
    }
}
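The shape of this answer — keep N receive operations outstanding and have each completion immediately re-arm itself — is not MSMQ-specific. As a toy illustration only (names and counts invented, not the poster's code), here is the same pattern with Python's asyncio, where each of N concurrent receivers loops back to wait for the next message after handling one:

```python
import asyncio

async def pump(queue, handle, n_inflight=5, sentinel=None):
    """Run n_inflight concurrent receivers on `queue`; each one re-arms
    itself after every message, like the overlapped BeginReceive calls."""
    async def receiver():
        while True:
            msg = await queue.get()
            if msg is sentinel:        # one sentinel per receiver shuts it down
                return
            await handle(msg)          # "Do something with the message"
    # N outstanding receives, analogous to numberOfSimultaneousRequests.
    await asyncio.gather(*(receiver() for _ in range(n_inflight)))
```

During idle periods the receivers sit suspended on `queue.get()` without consuming threads, which is the property the original question asked for.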
Answer 6:
I was trying something similar. The TPL seems to throw thread-safety exceptions when it runs into physical resource issues; e.g., creating a SqlConnection outside the Parallel.ForEach and using it within the loop body threw an exception for me. Newing up a MessageQueue before entering the body and enumerating over a list of strings seemed OK; my code consistently processed 10,000 items in under 500 ms using one-way messaging on an i7-2500 with 8 GB of RAM and a local MSMQ.
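The SqlConnection symptom described here is the usual shared-non-thread-safe-resource problem, and the fix is to create the resource per task rather than sharing one across the loop. As an illustrative analogue only (not the poster's code), Python's sqlite3 connections likewise refuse cross-thread use by default, and opening one per task resolves it:

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

def process(item):
    # Per-task connection, like newing up the queue inside the loop body;
    # sqlite3 connections (like SqlConnection) must not be shared across threads.
    conn = sqlite3.connect(":memory:")
    try:
        (n,) = conn.execute("SELECT ? * 2", (item,)).fetchone()
        return n                     # stand-in for real per-item work
    finally:
        conn.close()

def run_all(items, workers=8):
    with ThreadPoolExecutor(max_workers=workers) as ex:
        return list(ex.map(process, items))
```

If per-task creation is too expensive, a thread-local or pooled resource gives the same safety with less overhead.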
Answer 7:
I found a complete solution on a blog called CodeRonin. It seems to me that this is the only complete example on the whole internet. Thank you, CodeRonin!
http://code-ronin.blogspot.de/2008/09/msmq-transactional-message-processing.html