Let's say I have a UI thread and a background thread that subscribe to a custom thread-safe ObservableCollection that I created so that whenever the collection changes it executes the callback within the appropriate context.
Now let's say I add something to the collection (from either thread, it doesn't matter which one) and it now has to marshal the callback to both threads. To execute the callback within the UI's context I can simply call Dispatcher.Invoke(...), and it executes the callback within the UI's context; great.
Now I want to execute the callback within the background thread's context (don't ask me why, it may well be that whatever it's accessing is affinitized to that specific thread or has thread-local storage it needs to access); how would I do that?
Background threads don't have a dispatcher/message pumping mechanism so I can't use a dispatcher or SynchronizationContext, so how would one interrupt a background thread and have it execute my callback within its context?
EDIT: I keep getting answers that are obviously wrong, so I must not have explained myself correctly. Forget the UI thread and UI dispatchers; they are meant to marshal calls to the UI thread, and that's it! Imagine two worker threads, A and B. If A modifies my collection, then A is in charge of marshaling the callback to itself and to B. Executing the callback within A's context is easy, since A was the one triggering it: simply call the delegate in place. Now A needs to marshal the callback to B... now what? Dispatcher and SynchronizationContext are useless in this situation.
We have a component that must always run on the same STA background thread. We've achieved this by writing our own SynchronizationContext. This article is very helpful.
To summarise: you don't want to interrupt your worker thread; you want it to sit idle, waiting for the next task it should execute. You add jobs to a queue and it processes those jobs in order. The SynchronizationContext is a convenient abstraction around that idea. The SynchronizationContext is the owner of the worker thread, and the outside world does not interact with the thread directly: callers who want to execute a task on the worker thread make the request to the context, which adds the job to the job queue. The worker is either working or polling the queue until another job is added, at which point it begins working again.
Update
Here is an example:
using System.Collections.Concurrent;
using System.Threading;

class LoadBalancedContext : SynchronizationContext
{
    readonly Thread thread1;
    readonly Thread thread2;
    readonly ConcurrentQueue<JobInfo> jobs = new ConcurrentQueue<JobInfo>();

    public LoadBalancedContext()
    {
        // Note: these are foreground threads and Poll never returns,
        // so they keep the process alive.
        this.thread1 = new Thread(this.Poll) { Name = "T1" };
        this.thread2 = new Thread(this.Poll) { Name = "T2" };
        this.thread1.Start();
        this.thread2.Start();
    }

    // Queue the callback; whichever worker dequeues it first runs it.
    public override void Post(SendOrPostCallback d, object state)
    {
        this.jobs.Enqueue(new JobInfo { Callback = d, State = state });
    }

    void Poll()
    {
        while (true)
        {
            JobInfo info;
            if (this.jobs.TryDequeue(out info))
            {
                info.Callback(info.State);
            }
            else
            {
                // Queue is empty: back off briefly before polling again.
                Thread.Sleep(100);
            }
        }
    }

    class JobInfo
    {
        public SendOrPostCallback Callback { get; set; }
        public object State { get; set; }
    }
}
Usage:
// Trace lives in System.Diagnostics.
var context = new LoadBalancedContext();
SendOrPostCallback callback = x =>
{
    Trace.WriteLine(Thread.CurrentThread.Name); // "T1" or "T2"
    Thread.Sleep(200);
};

// Queue eleven jobs; the two workers share them between themselves.
for (int i = 0; i < 11; i++)
{
    context.Post(callback, null);
}

Thread.Sleep(1000);
The Send case is slightly more involved, as the caller will need to wait on a reset event until the job has run. This is not production quality, but should give you an idea of what you need to do.
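For what it's worth, here is a rough sketch of how Send could be built on a reset event. It is not part of the example above: SingleThreadContext and WorkerThreadId are made-up names, and it assumes a single worker that blocks on a BlockingCollection instead of polling. A callback passed to Post that throws will still take the worker down, so treat it as an illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;

// Hypothetical name: one worker thread draining a blocking queue.
sealed class SingleThreadContext : SynchronizationContext, IDisposable
{
    readonly BlockingCollection<Action> jobs = new BlockingCollection<Action>();
    readonly Thread worker;

    public SingleThreadContext()
    {
        worker = new Thread(Run) { IsBackground = true, Name = "Worker" };
        worker.Start();
    }

    public int WorkerThreadId
    {
        get { return worker.ManagedThreadId; }
    }

    void Run()
    {
        SetSynchronizationContext(this);
        // Blocks while the queue is empty; ends after CompleteAdding().
        foreach (var job in jobs.GetConsumingEnumerable())
            job();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        jobs.Add(() => d(state));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        // Already on the worker: run inline, otherwise we would wait on ourselves.
        if (Thread.CurrentThread == worker)
        {
            d(state);
            return;
        }

        ExceptionDispatchInfo error = null;
        // Deliberately not disposed: Dispose must not race with Set on the worker.
        var done = new ManualResetEventSlim(false);
        jobs.Add(() =>
        {
            try { d(state); }
            catch (Exception ex) { error = ExceptionDispatchInfo.Capture(ex); }
            finally { done.Set(); }
        });

        done.Wait();                        // block the caller until the job has run
        if (error != null) error.Throw();   // surface the callback's exception to the caller
    }

    public void Dispose()
    {
        jobs.CompleteAdding();
        worker.Join();
        jobs.Dispose();
    }
}
```

With something like this, context.Send(callback, null) returns only after callback has run on the worker thread, while Post keeps its fire-and-forget behaviour.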
Hope that helps.
Another good option might be to write your own TaskScheduler. You will have to implement three methods: QueueTask, TryExecuteTaskInline and GetScheduledTasks. You can read about it here.
That way, anytime you need to run something on your dedicated thread you could just do:
Task.Factory.StartNew(() => SomeAction(), SomeCancellationToken, TaskCreationOptions.None,
    new MyTaskScheduler());
and have it execute on your thread.
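As a rough illustration of those three overrides, a minimal single-thread scheduler might look like the sketch below. SingleThreadTaskScheduler is a hypothetical name, and the design (one dedicated thread blocking on a BlockingCollection) is just one assumed way to do it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name: runs every queued task on one dedicated thread.
sealed class SingleThreadTaskScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
    readonly Thread thread;

    public SingleThreadTaskScheduler()
    {
        thread = new Thread(() =>
        {
            // Blocks while there is nothing to run; ends after CompleteAdding().
            foreach (var task in tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        }) { IsBackground = true, Name = "Dedicated" };
        thread.Start();
    }

    public override int MaximumConcurrencyLevel
    {
        get { return 1; }
    }

    protected override void QueueTask(Task task)
    {
        tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        // Only inline when we are already on the dedicated thread;
        // a waiter on any other thread must not run the task itself.
        return Thread.CurrentThread == thread && TryExecuteTask(task);
    }

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        // Used by the debugger; a snapshot is enough.
        return tasks.ToArray();
    }

    public void Dispose()
    {
        tasks.CompleteAdding();
        thread.Join();
        tasks.Dispose();
    }
}
```

You would normally create one instance, keep it around, and pass that same instance to every Task.Factory.StartNew call that must run on the dedicated thread.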
Forget dispatcher.invoke, forget the ui thread. Imagine I have 2 worker
threads and I want to dispatch my event to both worker threads; what
can I use?
I'd use two task schedulers for this (as @YuvalItzchakov's answer suggests), one for each thread. I'd also use a custom synchronization context for the worker thread, as @TheMouthofaCow's answer suggests.
That is, for a UI thread, I'd just save and use TaskScheduler.FromCurrentSynchronizationContext(). For the worker thread, I would start a thread and install a custom synchronization context on it, then use FromCurrentSynchronizationContext too.
Something like this (untested):
// UI thread
var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();
using (var worker = new ThreadWithPumpingSyncContext())
{
    // call the worker thread
    var result = await worker.Run(async () =>
    {
        // worker thread
        await Task.Delay(1000);

        // call the UI thread
        await Task.Factory.StartNew(async () =>
        {
            // UI thread
            await Task.Delay(2000);
            MessageBox.Show("UI Thread!");

            // call the worker thread
            await worker.Run(() =>
            {
                // worker thread
                Thread.Sleep(3000);
            });

            // UI thread
            await Task.Delay(4000);
        }, CancellationToken.None, TaskCreationOptions.None, uiTaskScheduler).Unwrap();

        // worker thread
        await Task.Delay(5000);
        return Type.Missing; // or implement a non-generic version of Run
    });
}
// ...
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// ThreadWithSerialSyncContext renamed to ThreadWithPumpingSyncContext
class ThreadWithPumpingSyncContext : SynchronizationContext, IDisposable
{
    public readonly TaskScheduler Scheduler; // can be used to run tasks on the pumping thread
    readonly Task _mainThreadTask; // wrap the pumping thread as Task
    readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();

    // track async void methods
    readonly object _lock = new Object();
    volatile int _pendingOps = 0; // the number of pending async void method calls
    volatile TaskCompletionSource<Empty> _pendingOpsTcs = null; // to wait for pending async void method calls

    public ThreadWithPumpingSyncContext()
    {
        var tcs = new TaskCompletionSource<TaskScheduler>();
        _mainThreadTask = Task.Factory.StartNew(() =>
        {
            try
            {
                SynchronizationContext.SetSynchronizationContext(this);
                tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext());

                // pumping loop
                foreach (var action in _actions.GetConsumingEnumerable())
                    action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(null);
            }
        },
        // always start on the default scheduler, even if constructed inside a task on a custom one
        CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        Scheduler = tcs.Task.Result;
    }

    // SynchronizationContext methods
    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public override void OperationStarted()
    {
        lock (_lock)
        {
            if (_pendingOpsTcs != null && _pendingOpsTcs.Task.IsCompleted)
                throw new InvalidOperationException("OperationStarted"); // shutdown requested
            _pendingOps++;
        }
    }

    public override void OperationCompleted()
    {
        lock (_lock)
        {
            _pendingOps--;
            if (0 == _pendingOps && null != _pendingOpsTcs)
                _pendingOpsTcs.SetResult(Empty.Value);
        }
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _actions.Add(() => d(state));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotImplementedException("Send");
    }

    // Task start helpers
    public Task Run(Action action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler);
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    // IDisposable
    public void Dispose()
    {
        var disposingAlready = false;
        lock (_lock)
        {
            disposingAlready = null != _pendingOpsTcs;
            if (!disposingAlready)
            {
                // do not allow new async void method calls
                _pendingOpsTcs = new TaskCompletionSource<Empty>();
                if (0 == _pendingOps)
                    _pendingOpsTcs.TrySetResult(Empty.Value);
            }
        }

        // outside the lock
        if (!disposingAlready)
        {
            // wait for pending async void method calls
            _pendingOpsTcs.Task.Wait();
            // request the end of the pumping loop
            _actions.CompleteAdding();
        }

        _mainThreadTask.Wait();
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}
This gives you a form of cooperative asynchronous execution between the two threads.
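To tie this back to the collection in the question, the change notification itself could capture each subscriber's context at subscription time and post to it later. The following is only a sketch under that assumption: ContextAwareEvent, Subscribe and Raise are hypothetical names, and a subscriber with no context installed is simply called on the raising thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: remembers the context each handler subscribed from.
sealed class ContextAwareEvent<T>
{
    sealed class Subscriber
    {
        public SynchronizationContext Context;
        public Action<T> Handler;
    }

    readonly object gate = new object();
    readonly List<Subscriber> subscribers = new List<Subscriber>();

    public void Subscribe(Action<T> handler)
    {
        var subscriber = new Subscriber
        {
            // null when the subscribing thread has no context installed
            Context = SynchronizationContext.Current,
            Handler = handler
        };
        lock (gate) { subscribers.Add(subscriber); }
    }

    public void Raise(T args)
    {
        Subscriber[] snapshot;
        lock (gate) { snapshot = subscribers.ToArray(); }

        foreach (var subscriber in snapshot)
        {
            var handler = subscriber.Handler;
            var context = subscriber.Context;

            if (context == null || context == SynchronizationContext.Current)
                handler(args);                           // no context, or already in it: call in place
            else
                context.Post(_ => handler(args), null);  // marshal to the subscriber's thread
        }
    }
}
```

A worker thread would call Subscribe from inside its own pumping loop, after its custom context has been installed, so that its context is the one captured.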