Please see this question for background information:
How do Tasks in the Task Parallel Library affect ActivityID?
That question asks how Tasks affect Trace.CorrelationManager.ActivityId. @Greg Samson answered his own question with a test program showing that ActivityId is reliable in the context of Tasks. The test program sets an ActivityId at the beginning of the Task delegate, sleeps to simulate work, then checks the ActivityId at the end to make sure that it is the same value (i.e. that it has not been modified by another thread). The program runs successfully.
While researching other "context" options for threading, Tasks, and Parallel operations (ultimately to provide better context for logging), I ran into a strange issue with Trace.CorrelationManager.LogicalOperationStack (it was strange to me anyway). I have copied my "answer" to his question below.
I think that it adequately describes the issue that I ran into (Trace.CorrelationManager.LogicalOperationStack apparently getting corrupted - or something - when used in the context of Parallel.For, but only if the Parallel.For itself is enclosed in a logical operation).
Here are my questions:
Should Trace.CorrelationManager.LogicalOperationStack be usable with Parallel.For? If so, should it make a difference if a logical operation is already in effect with the Parallel.For is started?
Is there a "correct" way to use LogicalOperationStack with Parallel.For? Could I code this sample program differntly so that it "works"? By "works", I mean that the LogicalOperationStack always has the expected number of entries and the entries themselves are the expected entries.
I have done some additional testing using Threads and ThreadPool threads, but I would have to go back and retry those tests to see if I ran into similar problems.
I will say that it does appear that Task/Parallel threads and ThreadPool threads DO "inherit" the Trace.CorrelationManager.ActivityId and Trace.CorrelationManager.LogicalOperationStack values from the parent thread. This is expected as these values are stored by the CorrelationManager using CallContext's LogicalSetData method (as opposed to SetData).
Again, please refer back to this question to get the original context for the "answer" that I posted below:
How do Tasks in the Task Parallel Library affect ActivityID?
See also this similar question (which so far has not been answered) on Microsoft's Parallel Extensions forum:
http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9
[BEGIN PASTE]
Please forgive my posting this as an answer as it is not really answer to your question, however, it is related to your question since it deals with CorrelationManager behavior and threads/tasks/etc. I have been looking at using the CorrelationManager's LogicalOperationStack
(and StartLogicalOperation/StopLogicalOperation
methods) to provide additional context in multithreading scenarios.
I took your example and modified it slightly to add the ability to perform work in parallel using Parallel.For. Also, I use StartLogicalOperation/StopLogicalOperation
to bracket (internally) DoLongRunningWork
. Conceptually, DoLongRunningWork
does something like this each time it is executed:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
I have found that if I add these logical operations to your code (more or less as is), all of the logical operatins remain in sync (always the expected number of operations on stack and the values of the operations on the stack are always as expected).
In some of my own testing I found that this was not always the case. The logical operation stack was getting "corrupted". The best explanation I could come up with is that the "merging" back of the CallContext information into the "parent" thread context when the "child" thread exits was causing the "old" child thread context information (logical operation) to be "inherited" by another sibling child thread.
The problem might also be related to the fact that Parallel.For apparently uses the main thread (at least in the example code, as written) as one of the "worker threads" (or whatever they should be called in the parallel domain). Whenever DoLongRunningWork is executed, a new logical operation is started (at the beginning) and stopped (at the end) (that is, pushed onto the LogicalOperationStack and popped back off of it). If the main thread already has a logical operation in effect and if DoLongRunningWork executes ON THE MAIN THREAD, then a new logical operation is started so the main thread's LogicalOperationStack now has TWO operations. Any subsequent executions of DoLongRunningWork (as long as this "iteration" of DoLongRunningWork is executing on the main thread) will (apparently) inherit the main thread's LogicalOperationStack (which now has two operations on it, rather than just the one expected operation).
It took me a long time to figure out why the behavior of the LogicalOperationStack was different in my example than in my modified version of your example. Finally I saw that in my code I had bracketed the entire program in a logical operation, whereas in my modified version of your test program I did not. The implication is that in my test program, each time my "work" was performed (analogous to DoLongRunningWork), there was already a logical operation in effect. In my modified version of your test program, I had not bracketed the entire program in a logical operation.
So, when I modified your test program to bracket the entire program in a logical operation AND if I am using Parallel.For, I ran into exactly the same problem.
Using the conceptual model above, this will run successfully:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
While this will eventually assert due to an apparently out of sync LogicalOperationStack:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
Here is my sample program. It is similar to yours in that it has a DoLongRunningWork method that manipulates the ActivityId as well as the LogicalOperationStack. I also have two flavors of kicking of DoLongRunningWork. One flavor uses Tasks one uses Parallel.For. Each flavor can also be executed such that the whole parallelized operation is enclosed in a logical operation or not. So, there are a total of 4 ways to execute the parallel operation. To try each one, simply uncomment the desired "Use..." method, recompile, and run. UseTasks
, UseTasks(true)
, and UseParallelFor
should all run to completion. UseParallelFor(true)
will assert at some point because the LogicalOperationStack does not have the expected number of entries.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
This whole issue of if LogicalOperationStack can be used with Parallel.For (and/or other threading/Task constructs) or how it can be used probably merits its own question. Maybe I will post a question. In the meantime, I wonder if you have any thoughts on this (or, I wonder if you had considered using LogicalOperationStack since ActivityId appears to be safe).
[END PASTE]
Does anyone have any thoughts on this issue?