Azure的ServiceBus和异步 - 生存还是毁灭?(Azure ServiceBus & a

2019-08-18 00:02发布

我在Azure上运行服务总线,抽每秒10-100消息

最近,我切换到.NET 4.5和兴奋重构所有的代码有“异步”和每行“等待 ”至少两次,以确保它做“正确” :)

现在,我想知道是否确实是好还是坏 。 如果你可以看一下代码片段,让我知道你的想法是什么。 我如果线程上下文切换不给我比利多的悲伤,所有的异步...特别担心(看!dumpheap它绝对是一个因素)

描述的只是一点 - 我将张贴2种方法 - 一个没有在ConcurrentQueue while循环,等待新的消息,并且在同一时间发送一个消息,另一种方法。 我还使用完全一样Azure的博士规定的瞬态故障处理块。

发送回路(起步之初,等待新的消息):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送一条消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码是从发送消息1 /秒的“发送方”类。 我在任何给定的时间运行约50-100情况下,所以它可能是一个相当数量的线程。

顺便说一句不用担心EnsureMessageSender,RecreateMessageFactory,EnsureTopicExists太多,他们不叫经常。

我会不只是有一个后台线程通过消息队列工作,并同步发送消息的更好,提供了所有我需要的是在同一时间发送一个消息,不用担心异步的东西,避免开销与它的到来。

需要注意的是,通常是几毫秒内以一个消息发送到Azure的服务总线,它不是真的很贵。 (除了在时候它的速度慢,超时或没有与服务总线后端的一个问题,它可以被挂了一段时间尝试发送的东西)。

感谢,并为长期职位对不起,

斯特沃

建议的解决方案

就这个例子是我的情况的解决方案?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }

Answer 1:

你说:

上面的代码是从发送消息1 /秒的“发送方”类。 我在任何给定的时间运行约50-100情况下,所以它可能是一个相当数量的线程。

这是异步一个很好的案例。 在这里,您节省大量的线程。 异步,因为它不是基于线程的, 减少上下文切换。 它没有上下文切换中,需要等待一些情况。 取而代之的是,接下来的工作项目被同一个线程上处理(如果有的话)。

出于这个原因,你的异步解决方案一定规模比一个同步更好。 无论它实际上是在您的工作流程50-100实例使用较少的CPU需要测量。 在多个实例也有异步正在变快的概率越高。

现在,有一个问题与落实:您正在使用ConcurrentQueue这不是异步就绪。 所以,你实际上做甚至在你的异步版本中使用50-100线程。 他们将阻止(您想避免),或忙碌等待燃烧的100%的CPU(这似乎是在你的执行的情况下!)。 你需要摆脱这个问题,使排队异步了。 也许SemaphoreSlim是帮助在这里,因为它可以异步侍候。



Answer 2:

首先,请记住, Task != Thread 。 任务(和async方法延续)被安排线程池,其中微软已经投入吨只要你的任务是相当短的是创造奇迹的优化。

回顾你的代码,一行提出了一个标志: semaphore.WaitOne 。 我假设你正在使用这个作为一种信号,有在队列中的数据。 这是不好的,因为它是一个内部的阻塞等待async方法。 通过使用阻塞等待,代码从一个轻量级的延续变成更重的线程池线程。

所以,我会关注@ USR的建议,更换队列(和信号)与async就绪的队列。 TPL数据流的BufferBlock<T>是一个async可用-ready生产者/消费者队列通过的NuGet 。 我首先推荐这一个,因为它听起来像你的项目可以从使用数据流比更广泛就像一个队列中受益(但队列是一个很好的地方开始)。

其它async -ready数据结构存在; 我AsyncEx库有几个人。 这也不难给自己建立一个简单的一个; 我有一个关于这个问题的博客文章 。 但我建议T​​PL数据流在你的情况。



文章来源: Azure ServiceBus & async - To be, or not to be?