我有在Azure上运行辅助角色。
该工作进程,其中有大量的整数的队列。 对于每个整数I具有(根据整数从1秒到10分钟)以做处理相当长。
由于这是非常耗时的,我想在平行于做这些处理。 不幸的是,我的并行似乎当我和400点的整数的队列进行测试,以效率不高。
下面是我的实现:
public class WorkerRole : RoleEntryPoint {
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
private readonly Manager _manager = Manager.Instance;
private static readonly LogManager logger = LogManager.Instance;
public override void Run() {
logger.Info("Worker is running");
try {
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
catch (Exception e) {
logger.Error(e, 0, "Error Run Worker: " + e);
}
finally {
this.runCompleteEvent.Set();
}
}
public override bool OnStart() {
bool result = base.OnStart();
logger.Info("Worker has been started");
return result;
}
public override void OnStop() {
logger.Info("Worker is stopping");
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
logger.Info("Worker has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken) {
while (!cancellationToken.IsCancellationRequested) {
try {
_manager.ProcessQueue();
}
catch (Exception e) {
logger.Error(e, 0, "Error RunAsync Worker: " + e);
}
}
await Task.Delay(1000, cancellationToken);
}
}
}
而ProcessQueue的实现:
public void ProcessQueue() {
try {
_queue.FetchAttributes();
int? cachedMessageCount = _queue.ApproximateMessageCount;
if (cachedMessageCount != null && cachedMessageCount > 0) {
var listEntries = new List<CloudQueueMessage>();
listEntries.AddRange(_queue.GetMessages(MAX_ENTRIES));
Parallel.ForEach(listEntries, ProcessEntry);
}
}
catch (Exception e) {
logger.Error(e, 0, "Error ProcessQueue: " + e);
}
}
和ProcessEntry
private void ProcessEntry(CloudQueueMessage entry) {
try {
int id = Convert.ToInt32(entry.AsString);
Service.GetData(id);
_queue.DeleteMessage(entry);
}
catch (Exception e) {
_queueError.AddMessage(entry);
_queue.DeleteMessage(entry);
logger.Error(e, 0, "Error ProcessEntry: " + e);
}
}
在ProcessQueue功能,我尝试用MAX_ENTRIES的不同的值:第一= 20,然后= 2。 这似乎是与MAX_ENTRIES = 20慢,但MAX_ENTRIES的任何值,它似乎很慢。
我的VM是A2中。
我真的不知道,如果我做正确的并行化; 也许问题来自于工人本身(这可能是很难有这种并行)。