-->

How to parallelize an azure worker role?

2019-08-10 08:56发布

问题:

I have got a Worker Role running in azure.

This worker processes a queue in which there are a large number of integers. For each integer I have to do processings quite long (from 1 second to 10 minutes according to the integer).

As this is quite time consuming, I would like to do these processings in parallel. Unfortunately, my parallelization seems to not be efficient when I test with a queue of 400 integers.

Here is my implementation :

  public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
        private readonly Manager _manager = Manager.Instance;
        private static readonly LogManager logger = LogManager.Instance;

        public override void Run() {
            logger.Info("Worker is running");

            try {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            catch (Exception e) {
                logger.Error(e, 0, "Error Run Worker: " + e);
            }
            finally {
                this.runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            bool result = base.OnStart();

            logger.Info("Worker has been started");

            return result;
        }

        public override void OnStop() {
            logger.Info("Worker is stopping");

            this.cancellationTokenSource.Cancel();
            this.runCompleteEvent.WaitOne();

            base.OnStop();

            logger.Info("Worker has stopped");
        }

        private async Task RunAsync(CancellationToken cancellationToken) {
            while (!cancellationToken.IsCancellationRequested) {
                try {
                    _manager.ProcessQueue();
                }
                catch (Exception e) {
                    logger.Error(e, 0, "Error RunAsync Worker: " + e);
                }
            }
            await Task.Delay(1000, cancellationToken);

        }
    }
}

And the implementation of the ProcessQueue:

  public void ProcessQueue() {
            try {

                _queue.FetchAttributes();

                int? cachedMessageCount = _queue.ApproximateMessageCount;

                if (cachedMessageCount != null && cachedMessageCount > 0) {

                    var listEntries = new List<CloudQueueMessage>();

                    listEntries.AddRange(_queue.GetMessages(MAX_ENTRIES));

                    Parallel.ForEach(listEntries, ProcessEntry);
                }
            }
            catch (Exception e) {
                logger.Error(e, 0, "Error ProcessQueue: " + e);
            }
}

And ProcessEntry

    private void ProcessEntry(CloudQueueMessage entry) {
        try {
            int id = Convert.ToInt32(entry.AsString);

            Service.GetData(id);

            _queue.DeleteMessage(entry);

        }
        catch (Exception e) {
            _queueError.AddMessage(entry);
            _queue.DeleteMessage(entry);
            logger.Error(e, 0, "Error ProcessEntry: " + e);
        }
    }

In the ProcessQueue function, I try with different values of MAX_ENTRIES: first =20 and then =2. It seems to be slower with MAX_ENTRIES=20, but whatever the value of MAX_ENTRIES is, it seems quite slow.

My VM is a A2 medium.

I really don't know if I do the parallelization correctly ; maybe the problem comes from the worker itself (which may be it is hard to have this in parallel).

回答1:

You haven't mentioned which Azure Messaging Queuing technology you are using, however for tasks where I want to process multiple messages in parallel I tend to use the Message Pump Pattern on Service Bus Queues and Subscriptions, leveraging the OnMessage() method available on both Service Bus Queue and Subscription Clients:

  • QueueClient OnMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.onmessage.aspx
  • SubscriptionClient OnMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.subscriptionclient.onmessage.aspx
  • An overview of how this stuff works :-) - http://fabriccontroller.net/blog/posts/introducing-the-event-driven-message-programming-model-for-the-windows-azure-service-bus/

From MSDN:

When calling OnMessage(), the client starts an internal message pump that constantly polls the queue or subscription. This message pump consists of an infinite loop that issues a Receive() call. If the call times out, it issues the next Receive() call.

This pattern allows you to use a delegate (or anonymous function in my preferred case) that handles the receipt of the Brokered Message instance on a separate thread on the WaWorkerHost process. In fact, to increase the level of throughput, you can specify the number of threads that the Message Pump should provide, thereby allowing you to receive and process 2, 4, 8 messages from the queue in parallel. You can additionally tell the Message Pump to automagically mark the message as complete when the delegate has successfully finished processing the message. Both the thread count and AutoComplete instructions are passed in the OnMessageOptions parameter on the overloaded method.

public override void Run()
{
    var onMessageOptions = new OnMessageOptions()
    {
        AutoComplete = true, // Message-Pump will call Complete on messages after the callback has completed processing.
        MaxConcurrentCalls = 2 // Max number of threads the Message-Pump can spawn to process messages.
    };

    sbQueueClient.OnMessage((brokeredMessage) =>
    {

        // Process the Brokered Message Instance here

    }, onMessageOptions);

    RunAsync(_cancellationTokenSource.Token).Wait();
}

You can still leverage the RunAsync() method to perform additional tasks on the main Worker Role thread if required.

Finally, I would also recommend that you look at scaling your Worker Role instances out to a minimum of 2 (for fault tolerance and redundancy) to increase your overall throughput. From what I have seen with multiple production deployments of this pattern, OnMessage() performs perfectly when multiple Worker Role Instances are running.



回答2:

A few things to consider here:

  1. Are your individual tasks CPU intensive? If so, parallelism may not help. However, if they are mostly waiting on data processing tasks to be processed by other resources, parallelizing is a good idea.

  2. If parallelizing is a good idea, consider not using Parallel.ForEach for queue processing. Parallel.Foreach has two issues that prevent you from being very optimal:

    • The code will wait until all kicked off threads finish processing before moving on. So, if you have 5 threads that need 10 seconds each and 1 thread that needs 10 minutes, the overall processing time for Parallel.Foreach will be 10 minutes.

    • Even though you are assuming that all of the threads will start processing at the same time, Parallel.Foreach does not work this way. It looks at number of cores on your server and other parameters and generally only kicks off number of threads it thinks it can handle, without knowing too much about what's in those threads. So, if you have a lot of non-CPU bound threads that /can/ be kicked off at the same time without causing CPU over-utilization, default behaviour will not likely run them optimally.

How to do this optimally: I am sure there are a ton of solutions out there, but for reference, the way we've architected it in CloudMonix (that must kick off hundreds of independent threads and complete them as fast as possible) is by using ThreadPool.QueueUserWorkItem and manually keeping track number of threads that are running.

Basically, we use a Thread-safe collection to keep track of running threads that are started by ThreadPool.QueueUserWorkItem. Once threads complete, remove them from that collection. The queue-monitoring loop is indendent of executing logic in that collection. Queue-monitoring logic gets messages from the queue if the processing collection is not full up to the limit that you find most optimal. If there is space in the collection, it tries to pickup more messages from the queue, adds them to the collection and kick-start them via ThreadPool.QueueUserWorkItem. When processing completes, it kicks off a delegate that cleans up thread from the collection.

Hope this helps and makes sense