我在Azure上运行服务总线,抽每秒约10-100消息 。
最近,我切换到.NET 4.5和兴奋重构所有的代码有“异步”和每行“等待 ”至少两次,以确保它做“正确” :)
现在,我想知道是否确实是好还是坏 。 如果你可以看一下代码片段,让我知道你的想法是什么。 我如果线程上下文切换不给我比利多的悲伤,所有的异步...特别担心(看!dumpheap它绝对是一个因素)
描述的只是一点 - 我将张贴2种方法 - 一个没有在ConcurrentQueue while循环,等待新的消息,并且在同一时间发送一个消息,另一种方法。 我还使用完全一样Azure的博士规定的瞬态故障处理块。
发送回路(起步之初,等待新的消息):
private async void SendingLoop()
{
try
{
await this.RecreateMessageFactory();
this.loopSemaphore.Reset();
Buffer<SendMessage> message = null;
while (true)
{
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
this.semaphore.WaitOne();
if (this.cancel.Token.IsCancellationRequested)
{
break;
}
while (this.queue.TryDequeue(out message))
{
try
{
using (message)
{
//only take send the latest message
if (!this.queue.IsEmpty)
{
this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
continue;
}
else
{
if (this.Topic == null || this.Topic.Path != message.Value.Topic)
await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);
if (this.cancel.Token.IsCancellationRequested)
break;
await this.SendMessage(message, this.cancel.Token);
}
}
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
ex.LogError();
}
}
}
}
catch (OperationCanceledException)
{ }
catch (Exception ex)
{
ex.LogError();
}
finally
{
if (this.loopSemaphore != null)
this.loopSemaphore.Set();
}
}
发送一条消息:
private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
{
//this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
bool entityNotFound = false;
if (this.MessageSender.IsClosed)
{
//this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
await this.EnsureMessageSender(cancellationToken);
}
try
{
await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
{
message.Value.Body.Seek(0, SeekOrigin.Begin);
using (var msg = new BrokeredMessage(message.Value.Body, false))
{
await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
}
}, cancellationToken);
}
catch (MessagingEntityNotFoundException)
{
entityNotFound = true;
}
catch (OperationCanceledException)
{ }
catch (ObjectDisposedException)
{ }
catch (Exception ex)
{
ex.LogError();
}
if (entityNotFound)
{
if (!cancellationToken.IsCancellationRequested)
{
await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
}
}
}
上面的代码是从发送消息1 /秒的“发送方”类。 我在任何给定的时间运行约50-100情况下,所以它可能是一个相当数量的线程。
顺便说一句不用担心EnsureMessageSender,RecreateMessageFactory,EnsureTopicExists太多,他们不叫经常。
我会不只是有一个后台线程通过消息队列工作,并同步发送消息的更好,提供了所有我需要的是在同一时间发送一个消息,不用担心异步的东西,避免开销与它的到来。
需要注意的是,通常是几毫秒内以一个消息发送到Azure的服务总线,它不是真的很贵。 (除了在时候它的速度慢,超时或没有与服务总线后端的一个问题,它可以被挂了一段时间尝试发送的东西)。
感谢,并为长期职位对不起,
斯特沃
建议的解决方案
就这个例子是我的情况的解决方案?
static void Main(string[] args)
{
var broadcaster = new BufferBlock<int>(); //queue
var cancel = new CancellationTokenSource();
var run = Task.Run(async () =>
{
try
{
while (true)
{
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
//async wait until a value is available
var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
int next = 0;
//greedy - eat up and ignore all the values but last
while (broadcaster.TryReceive(out next))
{
Console.WriteLine("Skipping " + val);
val = next;
}
//check if we are not finished
if (cancel.IsCancellationRequested)
break;
Console.WriteLine("Sending " + val);
//simulate sending delay
await Task.Delay(1000).ConfigureAwait(false);
Console.WriteLine("Value sent " + val);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}, cancel.Token);
//simulate sending messages. One every 200mls
for (int i = 0; i < 20; i++)
{
Console.WriteLine("Broadcasting " + i);
broadcaster.Post(i);
Thread.Sleep(200);
}
cancel.Cancel();
run.Wait();
}