How would you marshal a call to a background thread?

Posted 2019-07-17 02:24

Let's say I have a UI thread and a background thread that subscribe to a custom thread-safe ObservableCollection that I created so that whenever the collection changes it executes the callback within the appropriate context.

Now let's say I add something to the collection (from either thread, it doesn't matter which) and it now has to marshal the callback to both threads. To execute the callback within the UI's context I can simply call Dispatcher.Invoke(...); great.

Now I want to execute the callback within the background thread's context (don't ask me why, it may well be that whatever it's accessing is affinitized to that specific thread or has thread-local storage it needs to access); how would I do that?

Background threads don't have a dispatcher/message pumping mechanism so I can't use a dispatcher or SynchronizationContext, so how would one interrupt a background thread and have it execute my callback within its context?

EDIT: I keep getting answers that are obviously wrong, so I must not have explained myself correctly. Forget the UI thread and UI dispatchers, guys; they were meant to marshal calls to the UI thread, that's it! Imagine two worker threads A and B. If A modifies my collection, then A is in charge of marshalling the callback to itself and to B. Executing the callback within A's context is easy since A was the one triggering it: simply call the delegate in place. Now A needs to marshal the callback to B... now what? Dispatcher and SynchronizationContext are useless in this situation.

3 answers
在下西门庆
Reply #2 · 2019-07-17 02:40

We have a component that must always run on the same STA background thread. We've achieved this by writing our own SynchronizationContext. This article is very helpful.

To summarise, you don't want to interrupt your worker thread, you want it to sit idle waiting for the next task that it should execute. You add jobs to a queue and it processes those jobs in order. The SynchronizationContext is a convenient abstraction around that idea. The SynchronizationContext is the owner of the worker thread - and the outside world does not interact with the thread directly: callers who want to execute a task on the worker thread make the request to the context which adds the job to the job queue. The worker is either working or polling the queue until another job is added, at which point it begins working again.

Update

Here is an example:

using System.Collections.Concurrent;
using System.Threading;

class LoadBalancedContext : SynchronizationContext
{
    readonly Thread thread1;

    readonly Thread thread2;

    readonly ConcurrentQueue<JobInfo> jobs = new ConcurrentQueue<JobInfo>();

    public LoadBalancedContext()
    {
        this.thread1 = new Thread(this.Poll) { Name = "T1" };
        this.thread2 = new Thread(this.Poll) { Name = "T2" };

        this.thread1.Start();
        this.thread2.Start();
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        this.jobs.Enqueue(new JobInfo { Callback = d, State = state });
    }

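    void Poll()
    {
        while (true)
        {
            JobInfo info;
            if (this.jobs.TryDequeue(out info))
            {
                // Run the job on whichever worker thread dequeued it.
                info.Callback(info.State);
            }
            else
            {
                // Back off only when the queue is empty, so pending jobs are not delayed.
                Thread.Sleep(100);
            }
        }
    }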

    class JobInfo
    {
        public SendOrPostCallback Callback { get; set; }

        public object State { get; set; }
    }
}

Usage:

// Requires: using System.Diagnostics; using System.Threading;
var context = new LoadBalancedContext();

SendOrPostCallback callback = x =>
    {
        // Shows which of the two worker threads ("T1" or "T2") picked up the job.
        Trace.WriteLine(Thread.CurrentThread.Name);
        Thread.Sleep(200);
    };

// Queue eleven jobs; the two worker threads share them.
for (int i = 0; i < 11; i++)
{
    context.Post(callback, null);
}

Thread.Sleep(1000);

The Send case is slightly more involved, as the caller needs to wait on a reset event until the worker has run the callback. This is not production quality, but it should give you an idea of what you need to do.
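As a rough illustration of the Send case mentioned above, here is a minimal sketch, assuming a single dedicated worker thread fed by a blocking queue. The class name SingleThreadContext and its members are hypothetical and are not part of the answer's LoadBalancedContext. Post queues the callback and returns; Send queues it and then blocks on a ManualResetEventSlim until the worker has run it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical single-thread context; the name and shape are illustrative only.
sealed class SingleThreadContext : SynchronizationContext, IDisposable
{
    readonly BlockingCollection<Action> jobs = new BlockingCollection<Action>();
    readonly Thread worker;

    public SingleThreadContext()
    {
        worker = new Thread(Pump) { Name = "Worker", IsBackground = true };
        worker.Start();
    }

    public int WorkerThreadId { get { return worker.ManagedThreadId; } }

    void Pump()
    {
        // Blocks while the queue is empty; the loop ends after CompleteAdding().
        foreach (var job in jobs.GetConsumingEnumerable())
            job();
    }

    // Fire and forget: queue the callback and return immediately.
    public override void Post(SendOrPostCallback d, object state)
    {
        jobs.Add(() => d(state));
    }

    // Synchronous: queue the callback, then block until the worker has run it.
    public override void Send(SendOrPostCallback d, object state)
    {
        if (Thread.CurrentThread == worker)
        {
            d(state); // already on the worker; waiting on ourselves would deadlock
            return;
        }

        Exception error = null;
        var done = new ManualResetEventSlim(false);
        jobs.Add(() =>
        {
            try { d(state); }
            catch (Exception ex) { error = ex; }
            finally { done.Set(); } // always release the waiting caller
        });
        done.Wait();

        if (error != null)
            throw new InvalidOperationException("The callback threw on the worker thread.", error);
    }

    public void Dispose()
    {
        jobs.CompleteAdding(); // let the pump loop finish the remaining jobs and exit
        worker.Join();
        jobs.Dispose();
    }
}
```

With this sketch, ctx.Send(_ => DoWork(), null) returns only after DoWork (a placeholder) has finished on the worker thread, while ctx.Post returns at once. An exception thrown by a posted callback is not handled here and would terminate the worker, so a real implementation would need a policy for that.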

Hope that helps.

欢心
Reply #3 · 2019-07-17 02:48

Another good option might be to write your own TaskScheduler. You will have to implement three methods:

QueueTask, TryExecuteTaskInline and GetScheduledTasks

you can read about it here

That way, anytime you need to run something on your dedicated thread you could just do:

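// TaskCreationOptions.None is an assumption here; the original snippet left the value out.
Task.Factory.StartNew(() => { SomeAction(); }, SomeCancellationToken, TaskCreationOptions.None,
            new MyTaskSchedular());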

and have it execute on your thread.
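For illustration, a scheduler of this kind might look roughly like the sketch below. It assumes one dedicated thread that drains a blocking queue of tasks; the name DedicatedThreadTaskScheduler and its ThreadId property are hypothetical, while QueueTask, TryExecuteTaskInline, GetScheduledTasks and TryExecuteTask are the real TaskScheduler members.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name; a sketch of a scheduler that owns one dedicated thread.
sealed class DedicatedThreadTaskScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> tasks = new BlockingCollection<Task>();
    readonly Thread thread;

    public DedicatedThreadTaskScheduler()
    {
        thread = new Thread(() =>
        {
            // Runs queued tasks one at a time until CompleteAdding() is called.
            foreach (var task in tasks.GetConsumingEnumerable())
                TryExecuteTask(task);
        }) { IsBackground = true };
        thread.Start();
    }

    public int ThreadId { get { return thread.ManagedThreadId; } }

    public override int MaximumConcurrencyLevel { get { return 1; } }

    // Called by the TPL when a task is started on this scheduler.
    protected override void QueueTask(Task task)
    {
        tasks.Add(task);
    }

    // Allow inlining only when the caller is already on the dedicated thread,
    // so that the task never runs anywhere else.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == thread && TryExecuteTask(task);
    }

    // Used by debuggers to list the pending tasks.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return tasks.ToArray();
    }

    public void Dispose()
    {
        tasks.CompleteAdding(); // finish the queued tasks, then end the thread
        thread.Join();
        tasks.Dispose();
    }
}
```

In this sketch the scheduler is created once and reused, since each instance starts its own thread; passing the same instance to every Task.Factory.StartNew call is what keeps the work on one thread.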

迷人小祖宗
Reply #4 · 2019-07-17 03:02

Forget dispatcher.invoke, forget the ui thread. Imagine I have 2 worker threads and I want to dispatch my event to both worker threads; what can I use?

I'd use two task schedulers for this (as @YuvalItzchakov's answer suggests), one for each thread. I'd also use a custom synchronization context for the worker thread, as @TheMouthofaCow's answer suggests.

That is, for a UI thread, I'd just save and use TaskScheduler.FromCurrentSynchronizationContext(). For the worker thread, I would start a thread and install a custom synchronization context on it, then use FromCurrentSynchronizationContext too.

Something like this (untested):

// UI thread
var uiTaskScheduler = TaskScheduler.FromCurrentSynchronizationContext();

using (var worker = new ThreadWithPumpingSyncContext())
{
    // call the worker thread
    var result = await worker.Run(async () => 
    {
        // worker thread
        await Task.Delay(1000);

        // call the UI thread
        await Task.Factory.StartNew(async () => 
        {
            // UI thread
            await Task.Delay(2000);
            MessageBox.Show("UI Thread!");

            // call the worker thread
            await worker.Run(() => 
            {
                // worker thread
                Thread.Sleep(3000);
            });

            // UI thread
            await Task.Delay(4000);
        }, CancellationToken.None, TaskCreationOptions.None, uiTaskScheduler).Unwrap();

        // worker thread
        await Task.Delay(5000);
        return Type.Missing; // or implement a non-generic version of Run
    });
}

// ...

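// Requires: using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks;
// ThreadWithSerialSyncContext renamed to ThreadWithPumpingSyncContext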
class ThreadWithPumpingSyncContext : SynchronizationContext, IDisposable
{
    public readonly TaskScheduler Scheduler; // can be used to run tasks on the pumping thread
    readonly Task _mainThreadTask; // wrap the pumping thread as Task
    readonly BlockingCollection<Action> _actions = new BlockingCollection<Action>();

    // track async void methods
    readonly object _lock = new Object();
    volatile int _pendingOps = 0; // the number of pending async void method calls
    volatile TaskCompletionSource<Empty> _pendingOpsTcs = null; // to wait for pending async void method calls

    public ThreadWithPumpingSyncContext()
    {
        var tcs = new TaskCompletionSource<TaskScheduler>();
        _mainThreadTask = Task.Factory.StartNew(() =>
        {
            try
            {
                SynchronizationContext.SetSynchronizationContext(this);
                tcs.SetResult(TaskScheduler.FromCurrentSynchronizationContext());

                // pumping loop
                foreach (var action in _actions.GetConsumingEnumerable())
                    action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(null);
            }
        }, TaskCreationOptions.LongRunning);

        Scheduler = tcs.Task.Result;
    }

    // SynchronizationContext methods
    public override SynchronizationContext CreateCopy()
    {
        return this;
    }

    public override void OperationStarted()
    {
        lock (_lock)
        {
            if (_pendingOpsTcs != null && _pendingOpsTcs.Task.IsCompleted)
                throw new InvalidOperationException("OperationStarted"); // shutdown requested
            _pendingOps++;
        }
    }

    public override void OperationCompleted()
    {
        lock (_lock)
        {
            _pendingOps--;
            if (0 == _pendingOps && null != _pendingOpsTcs)
                _pendingOpsTcs.SetResult(Empty.Value);
        }
    }

    public override void Post(SendOrPostCallback d, object state)
    {
        _actions.Add(() => d(state));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        throw new NotImplementedException("Send");
    }

    // Task start helpers
    public Task Run(Action action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler);
    }

    public Task Run(Func<Task> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token = default(CancellationToken))
    {
        return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this.Scheduler).Unwrap();
    }

    // IDisposable
    public void Dispose()
    {
        var disposingAlready = false;

        lock (_lock)
        {
            disposingAlready = null != _pendingOpsTcs;
            if (!disposingAlready)
            {
                // do not allow new async void method calls
                _pendingOpsTcs = new TaskCompletionSource<Empty>();
                if (0 == _pendingOps)
                    _pendingOpsTcs.TrySetResult(Empty.Value);
            }
        }

        // outside the lock
        if (!disposingAlready)
        {
            // wait for pending async void method calls
            _pendingOpsTcs.Task.Wait();

            // request the end of the pumping loop
            _actions.CompleteAdding();
        }

        _mainThreadTask.Wait();
    }

    struct Empty { public static readonly Empty Value = default(Empty); }
}

This gives you a form of cooperative asynchronous execution between the two threads.
