LINQ calling Task.Run accessing wrong member, defe

Posted 2019-08-08 17:32

I'm having a weird issue with a LINQ query using the wrong values. My code looks like this:

await Task.WhenAll((from item in itemsToProcess
    let taskCount = count++
    // Wait() blocks like .Result does; a bare ".Result;" is not a valid C# statement
    select Task.Run(() => { process(item).Wait(); }))
    .AsParallel().ToArray());

Basically, I have a list of 50k items, each of which is passed to a method that makes a web call. They are completely unrelated, can run in any order, and don't access anything shared. But once in a while, seemingly at random, the wrong item gets passed to the process method, like you would see in a foreach loop if you didn't copy the loop variable to a local variable.

If I change my code to this:

await Task.WhenAll((from item in itemsToProcess
    let taskCount = count++
    let itemCopy = item
    // Wait() blocks like .Result does; a bare ".Result;" is not a valid C# statement
    select Task.Run(() => { process(itemCopy).Wait(); }))
    .AsParallel().ToArray());

Then I don't seem to have this issue. So my question is: am I missing something, or is this expected behavior? I thought the from clause in LINQ was supposed to copy each item to a local variable, but is that not the case? I'm having a hard time finding anything that addresses this directly, yet I see plenty of examples of calling async methods inside a LINQ expression without the extra let.

I've also tried making the lambda async and awaiting the method, but then I run into situations where no threads are available. Maybe there is a much better way of doing this? I would be happy to know about it. In a nutshell, all I'm doing is iterating over a list and calling a method in parallel because the work is I/O-bound, not CPU-bound. The other possibility is that there are already tons of posts about this and I'm just searching with the wrong terminology. If so, I would be happy to know that as well.

1 Answer
欢心
#2 · 2019-08-08 18:11

Parallel and asynchronous code should rarely be used together. Parallel code is ideally just for CPU-bound work.

Why can't you just do this:

await Task.WhenAll(itemsToProcess.Select(item => process(item)));
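As a rough illustration of the one-liner above, here is a minimal self-contained sketch. ProcessAsync and the int items are hypothetical stand-ins for the original process method and item type, and Task.Delay merely simulates the web call.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for process(item); Task.Delay simulates the web call.
    public static async Task<int> ProcessAsync(int item)
    {
        await Task.Delay(10);
        return item * 2;
    }

    public static async Task<int[]> ProcessAllAsync(IEnumerable<int> itemsToProcess)
    {
        // Select starts one task per item as WhenAll enumerates the sequence.
        // WhenAll completes when every task has finished and returns the
        // results in the same order as the input items.
        return await Task.WhenAll(itemsToProcess.Select(item => ProcessAsync(item)));
    }
}
```

Calling await WhenAllSketch.ProcessAllAsync(new[] { 1, 2, 3 }) should yield the doubled values in input order. Note there is no Task.Run and no AsParallel here: the lambda parameter item is a fresh variable for every element, so no extra copy is needed.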

Edit based on comments:

Asynchronous throttling is (somewhat) easily done by using SemaphoreSlim:

static SemaphoreSlim throttle = new SemaphoreSlim(50);
static async Task ProcessAsync(Item item)
{
  await throttle.WaitAsync();
  try
  {
    ... // Original process(item) code
  }
  finally
  {
    throttle.Release();
  }
}

This limits item processing to 50 items at a time. That is just a number I pulled out of the air; you should experiment with it a bit to find an appropriate value.
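To see the throttle in action, the following sketch combines the SemaphoreSlim pattern with Task.WhenAll and records the highest number of items that were inside the throttled section at once. The names ThrottleSketch and RunAsync, the int items, and the Task.Delay body are all assumptions made for illustration, not part of the original code.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Processes itemCount dummy items with at most `limit` in flight at once
    // and returns the peak concurrency that was actually observed.
    public static async Task<int> RunAsync(int itemCount, int limit)
    {
        var throttle = new SemaphoreSlim(limit);
        int current = 0; // items currently inside the throttled section
        int peak = 0;    // highest value of `current` seen so far

        async Task ProcessAsync(int item)
        {
            await throttle.WaitAsync();
            try
            {
                int now = Interlocked.Increment(ref current);
                // Raise `peak` to `now` unless another item already recorded more.
                int seen;
                while (now > (seen = Volatile.Read(ref peak)))
                    Interlocked.CompareExchange(ref peak, now, seen);

                await Task.Delay(5); // hypothetical stand-in for the web call
            }
            finally
            {
                Interlocked.Decrement(ref current);
                throttle.Release();
            }
        }

        await Task.WhenAll(Enumerable.Range(0, itemCount).Select(i => ProcessAsync(i)));
        return peak;
    }
}
```

For example, await ThrottleSketch.RunAsync(40, 5) should never report a peak above 5, however many items are queued.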

Note that the parallel processing throttling stopped working as soon as the work became asynchronous. Asynchronous work doesn't "take up" a thread while it is waiting, so it doesn't count against the parallel processing throttling (or thread pool injection rate throttling).
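One way to convince yourself of this is the sketch below, in which a single calling thread starts every operation before any of them finishes. The TaskCompletionSource gate is a hypothetical stand-in for a pending web call, and the names InFlightSketch and CountInFlightAsync are made up for the example.

```csharp
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InFlightSketch
{
    // Starts itemCount asynchronous operations and returns how many were
    // in flight at the same time, before any of them was allowed to finish.
    public static async Task<int> CountInFlightAsync(int itemCount)
    {
        // Nothing completes until the gate is opened below.
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        int inFlight = 0;

        async Task ProcessAsync(int item)
        {
            Interlocked.Increment(ref inFlight);
            await gate.Task; // pending "web call": no thread is blocked here
        }

        // Each call runs synchronously up to its first await and then returns
        // a pending task, so this one thread starts every operation.
        var tasks = Enumerable.Range(0, itemCount).Select(i => ProcessAsync(i)).ToArray();
        int observed = inFlight;

        gate.SetResult(true); // let all operations complete
        await Task.WhenAll(tasks);
        return observed;
    }
}
```

With 1000 items, all 1000 operations are pending at once without 1000 threads being involved, which is why a thread-based limit has nothing to count and an explicit throttle such as SemaphoreSlim is needed instead.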
