Parallel.ForEach not setting all values in loop

2019-08-15 06:06发布

问题:

I am querying a sql data base for some employees. When i receive these employees I and looping each one using the Parallel.ForEach.

The only reason I'm looping he employees that was retrieved from the database is so expand a few of the properties that I do not want to clutter up the data base with.

Never-the-less in this example I am attempting to set the Avatar for the current employee in the loop, but only one out of three always gets set, none of the other employees Avatar ever gets set to their correct URI. Basically, I'm taking the file-name of the avatar and building the full path to the users folder.

What am I doing wrong here to where each employees Avatar is not updated with the full path to their directory, like the only one that is being set? Parallel stack and there is in deep four

I'm sure I've got the code formatted incorrectly. I've looked at that Parallel Task and it does in deep create 4 Parallel Task on 6 Threads.

Can someone point out to me the correct way to format the code to use Parallel?

Also, one thing, if I remove the return await Task.Run()=> from the GetEmployees method I get an error of cannot finish task because some other task fished first.

The Parallel is acting as if it is only setting one of the Avatars for one of the employees.

---Caller

   public async static Task<List<uspGetEmployees_Result>> GetEmployess(int professionalID, int startIndex, int pageSize, string where, string equals)
{
    var httpCurrent = HttpContext.Current;

    return await Task.Run(() =>
        {
            List<uspGetEmployees_Result> emps = null;
            try
            {
                using (AFCCInc_ComEntities db = new AFCCInc_ComEntities())
                {
                    var tempEmps = db.uspGetEmployees(professionalID, startIndex, pageSize, where, equals);
                    if (tempEmps != null)
                    {
                        emps = tempEmps.ToList<uspGetEmployees_Result>();

                        Parallel.ForEach<uspGetEmployees_Result>(
                             emps,
                            async (e) =>
                            {
                                e.Avatar = await Task.Run(() => BuildUserFilePath(e.Avatar, e.UserId, httpCurrent, true));
                            }
                         );
                    };
                }
            }
            catch (SqlException ex)
            {
                throw ex;
            };
            return emps;
        });
}

--Callee

    static string BuildUserFilePath(object fileName, object userProviderKey, HttpContext context, bool resolveForClient = false)
{
    return string.Format("{0}/{1}/{2}",
                                   resolveForClient ?
                                   context.Request.Url.AbsoluteUri.Replace(context.Request.Url.PathAndQuery, "") : "~",
                                   _membersFolderPath + AFCCIncSecurity.Folder.GetEncryptNameForSiteMemberFolder(userProviderKey.ToString(), _cryptPassword),
                                   fileName.ToString());
}

----------------------------------------Edit------------------------------------

The final code that I'm using with everyone's help. Thanks so much!

public async static Task<List<uspGetEmployees_Result>> GetEmployess(int professionalID, int startIndex, int pageSize, string where, string equals)
    {
        var httpCurrent = HttpContext.Current;
        List<uspGetEmployees_Result> emps = null;

        using (AFCCInc_ComEntities db = new AFCCInc_ComEntities())
        {

            emps = await Task.Run(() => (db.uspGetEmployees(professionalID, startIndex, pageSize, where, equals) ?? Enumerable.Empty<uspGetEmployees_Result>()).ToList());

            if (emps.Count() == 0) { return null; }
            int skip = 0;
            while (true)
            {
                // Do parallel processing in "waves".
                var tasks = emps
                      .Take(Environment.ProcessorCount)
                      .Select(e => Task.Run(() => e.Avatar = BuildUserFilePath(e.Avatar, e.UserId, httpCurrent, true))) // No await here - we just want the tasks.
                      .Skip(skip)
                      .ToArray();

                if (tasks.Length == 0) { break; }

                skip += Environment.ProcessorCount;
                await Task.WhenAll(tasks);
            };
        }
        return emps;
    }

回答1:

  1. Your definition of BuildUserFilePath and its usage are inconsistent. The definition clearly states that it's a string-returning method, whereas its usage implies that it returns a Task<>.
  2. Parallel.ForEach and async don't mix very well - that's the reason your bug happened in the first place.
  3. Unrelated but still worth noting: your try/catch is redundant as all it does is rethrow the original SqlException (and even that it doesn't do very well because you'll end up losing the stack trace).
  4. Do you really, really want to return null?

    public async static Task<List<uspGetEmployees_Result>> GetEmployess(int professionalID, int startIndex, int pageSize, string where, string equals)
    {
        var httpCurrent = HttpContext.Current;
    
        // Most of these operations are unlikely to be time-consuming,
        // so why await the whole thing?
        using (AFCCInc_ComEntities db = new AFCCInc_ComEntities())
        {
            // I don't really know what exactly uspGetEmployees returns
            // and, if it's an IEnumerable, whether it yields its elements lazily.
            // The fact that it can be null, however, bothers me, so I'll sidestep it.
            List<uspGetEmployees_Result> emps = await Task.Run(() =>
                (db.uspGetEmployees(professionalID, startIndex, pageSize, where, equals) ?? Enumerable.Empty<uspGetEmployees_Result>()).ToList()
            );
    
            // I'm assuming that BuildUserFilePath returns string - no async.
            await Task.Run(() =>
            {
                Parallel.ForEach(emps, e =>
                {
                    // NO async/await within the ForEach delegate body.
                    e.Avatar = BuildUserFilePath(e.Avatar, e.UserId, httpCurrent, true);
                });
            });
        }
    }
    


回答2:

There seems to be over-use of async and Task.Run() in this code. For example, what are you hoping to achieve from this segment?

  Parallel.ForEach<uspGetEmployees_Result>(
                             emps,
                            async (e) =>
                            {
                                e.Avatar = await Task.Run(() => BuildUserFilePath(e.Avatar, e.UserId, httpCurrent, true));
                            }
                         );

You're already using await on the result of the entire method, and you've used a Parallel.ForEach to get parallel execution of items in your loop, so what does the additional use of await Task.Run() get you? The code would certainly be a lot easier to follow without it.

It is not clear to me what you are trying to achieve here. Can you describe what your objectives are for this method?