-->

如何并行蔚蓝的辅助角色?(How to parallelize an azure worker ro

2019-10-24 03:09发布

我有在Azure上运行辅助角色。

该工作进程,其中有大量的整数的队列。 对于每个整数I具有(根据整数从1秒到10分钟)以做处理相当长。

由于这是非常耗时的,我想在平行于做这些处理。 不幸的是,我的并行似乎当我和400点的整数的队列进行测试,以效率不高。

下面是我的实现:

  public class WorkerRole : RoleEntryPoint {
        private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
        private readonly Manager _manager = Manager.Instance;
        private static readonly LogManager logger = LogManager.Instance;

        public override void Run() {
            logger.Info("Worker is running");

            try {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            catch (Exception e) {
                logger.Error(e, 0, "Error Run Worker: " + e);
            }
            finally {
                this.runCompleteEvent.Set();
            }
        }

        public override bool OnStart() {
            bool result = base.OnStart();

            logger.Info("Worker has been started");

            return result;
        }

        public override void OnStop() {
            logger.Info("Worker is stopping");

            this.cancellationTokenSource.Cancel();
            this.runCompleteEvent.WaitOne();

            base.OnStop();

            logger.Info("Worker has stopped");
        }

        private async Task RunAsync(CancellationToken cancellationToken) {
            while (!cancellationToken.IsCancellationRequested) {
                try {
                    _manager.ProcessQueue();
                }
                catch (Exception e) {
                    logger.Error(e, 0, "Error RunAsync Worker: " + e);
                }
            }
            await Task.Delay(1000, cancellationToken);

        }
    }
}

而ProcessQueue的实现:

  public void ProcessQueue() {
            try {

                _queue.FetchAttributes();

                int? cachedMessageCount = _queue.ApproximateMessageCount;

                if (cachedMessageCount != null && cachedMessageCount > 0) {

                    var listEntries = new List<CloudQueueMessage>();

                    listEntries.AddRange(_queue.GetMessages(MAX_ENTRIES));

                    Parallel.ForEach(listEntries, ProcessEntry);
                }
            }
            catch (Exception e) {
                logger.Error(e, 0, "Error ProcessQueue: " + e);
            }
}

和ProcessEntry

    private void ProcessEntry(CloudQueueMessage entry) {
        try {
            int id = Convert.ToInt32(entry.AsString);

            Service.GetData(id);

            _queue.DeleteMessage(entry);

        }
        catch (Exception e) {
            _queueError.AddMessage(entry);
            _queue.DeleteMessage(entry);
            logger.Error(e, 0, "Error ProcessEntry: " + e);
        }
    }

在ProcessQueue功能,我尝试用MAX_ENTRIES的不同的值:第一= 20,然后= 2。 这似乎是与MAX_ENTRIES = 20慢,但MAX_ENTRIES的任何值,它似乎很慢。

我的VM是A2中。

我真的不知道,如果我做正确的并行化; 也许问题来自于工人本身(这可能是很难有这种并行)。

Answer 1:

你没有提到你正在使用的Azure的消息队列技术,但是,对于在这里我要并行处理多个任务的消息我倾向于使用的服务总线队列和订阅消息泵模式 ,借力onMessage()方法上都可用服务总线队列和订阅的客户:

  • QueueClient的onMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.queueclient.onmessage.aspx
  • SubscriptionClient的onMessage() - https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.subscriptionclient.onmessage.aspx
  • 这个东西是如何工作:-)概述- http://fabriccontroller.net/blog/posts/introducing-the-event-driven-message-programming-model-for-the-windows-azure-service-bus/

从MSDN:

当调用的onMessage(),客户端启动内部消息泵不断轮询队列或订阅。 此消息泵由一个无限循环发出一个接收()调用的。 如果调用超时,它会发出一个接收()调用。

这种模式允许你使用一个处理的WaWorkerHost过程中一个单独的线程在收到经纪公司的消息实例中的代表(或者以我的情况下,匿名函数)。 事实上,以增加吞吐量的水平,则可以指定消息泵应该提供的线程的数目,从而允许用户从该队列中并行接收和处理2,4,8个消息。 你还可以告诉消息泵,自动将邮件标记为已完成后,委托已成功完成处理消息。 无论是线程数,并自动完成指令的重载方法的参数OnMessageOptions传递。

public override void Run()
{
    var onMessageOptions = new OnMessageOptions()
    {
        AutoComplete = true, // Message-Pump will call Complete on messages after the callback has completed processing.
        MaxConcurrentCalls = 2 // Max number of threads the Message-Pump can spawn to process messages.
    };

    sbQueueClient.OnMessage((brokeredMessage) =>
    {

        // Process the Brokered Message Instance here

    }, onMessageOptions);

    RunAsync(_cancellationTokenSource.Token).Wait();
}

您还可以利用RunAsync()方法,如果所需的主要辅助角色线程上执行其他任务。

最后,我也建议你看看你的缩放工作者角色实例出至少2(用于容错和冗余),以提高整体的吞吐量。 从我对这种模式的多个生产部署看到的onMessage()执行完全当多个工作者角色实例正在运行。



Answer 2:

有几件事情要考虑这里:

  1. 是你个人的任务CPU密集型? 如果是这样,并行可能无法帮助。 但是,如果他们大多在数据处理任务等待由其他资源进行处理,并行化是一个好主意。

  2. 如果并行化是一个好主意,考虑不使用Parallel.ForEach队列处理。 Parallel.Foreach有阻止你是非常优化的两个问题:

    • 该代码会等到所有开锣线程在移动之前完成处理。 所以,如果你有5个线程需要每10秒1个线程需要10分钟后,整个处理时间为Parallel.Foreach将是10分钟。

    • 即使你是假设所有的线程将在同一时间开始处理,Parallel.Foreach不以这种方式工作。 它着眼于服务器上的内核和其它参数的数量一般只有揭开序幕的线程数它认为它可以处理,没有太多了解什么是在那些线程。 所以,如果你有很多非CPU绑定线程/可/可同时拉开序幕,而不会导致CPU过度利用的,默认的行为不太可能最佳运行它们。

如何做到这一点最佳:我相信有一吨的解决方案在那里,但作参考,途中我们在架构它CloudMonix (必须揭开序幕,数百个独立的线程,并完成他们尽可能快地)是使用ThreadPool.QueueUserWorkItem和手动保存正在运行的线程的曲目编号。

基本上,我们使用一个线程安全集合保持运行由ThreadPool.QueueUserWorkItem启动线程的轨道。 一旦线程完成后,从该集合中删除。 队列监测循环是indendent该集合中执行的逻辑。 队列监视逻辑会从队列中的消息,如果处理收集不充分,对您找到最优化的极限。 如果集合中的空间,它试图以皮卡从队列中更多的消息,将它们添加到集合,并通过ThreadPool.QueueUserWorkItem脚踏启动它们。 当处理完成时,它揭开序幕,从收集清理线程的委托。

希望这有助于和有意义



文章来源: How to parallelize an azure worker role?