StaTaskScheduler and STA thread message pumping

2019-01-01 07:09发布

TL;DR: A deadlock inside a task run by StaTaskScheduler. Long version:

I'm using StaTaskScheduler from ParallelExtensionsExtras by Parallel Team, to host some legacy STA COM objects supplied by a third party. The description of the StaTaskScheduler implementation details says the following:

The good news is that TPL’s implementation is able to run on either MTA or STA threads, and takes into account relevant differences around underlying APIs like WaitHandle.WaitAll (which only supports MTA threads when the method is provided multiple wait handles).

I thought that would mean the blocking parts of TPL would use a wait API which pumps messages, like CoWaitForMultipleHandles, to avoid deadlock situations when called on an STA thread.

In my situation, I believe the following is happening: in-proc STA COM object A makes a call to out-of-proc object B, then expects a callback from B via as a part of the outgoing call.

In a simplified form:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

The problem is, a.Method(b) never returns. As far as I can tell, this happens because a blocking wait somewhere inside BlockingCollection<Task> does not pump messages, so my assumption about the quoted statement is probably wrong.

EDITED The same code works when is executed on the UI thread of the test WinForms application (that is, providing TaskScheduler.FromCurrentSynchronizationContext() instead of staTaskScheduler to Task.Factory.StartNew).

What is the right way to solve this? Should I implemented a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?

If so, will the underlying implementation of BlockingCollection be calling my SynchronizationContext.Wait method? Can I use SynchronizationContext.WaitHelper to implement SynchronizationContext.Wait?


EDITED with some code showing that a managed STA thread doesn't pump when doing a blocking wait. The code is a complete console app ready for copy/paste/run:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

This produces the output:

Testing without pumping...
The collection argument is empty and has been marked as complete with regards to additions.

Test with pumping...
The collection argument is empty and has been marked as complete with regards to additions.
Now start pumping...
WM_TEST processed
Press Enter to exit

2条回答
谁念西风独自凉
2楼-- · 2019-01-01 07:47

My understanding of your problem: you are using StaTaskScheduler only to organize the classic COM STA apartment for your legacy COM objects. You're not running a WinForms or WPF core message loop on the STA thread of StaTaskScheduler. That is, you're not using anything like Application.Run, Application.DoEvents or Dispatcher.PushFrame inside that thread. Correct me if this is a wrong assumption.

By itself, StaTaskScheduler doesn't install any synchronization context on the STA threads it creates. Thus, you're relying upon the CLR to pump messages for you. I've only found an implicit confirmation that the CLR pumps on STA threads, in Apartments and Pumping in the CLR by Chris Brumme:

I keep saying that managed blocking will perform “some pumping” when called on an STA thread. Wouldn’t it be great to know exactly what will get pumped? Unfortunately, pumping is a black art which is beyond mortal comprehension. On Win2000 and up, we simply delegate to OLE32’s CoWaitForMultipleHandles service.

This indicates the CLR uses CoWaitForMultipleHandles internally for STA threads. Further, the MSDN docs for COWAIT_DISPATCH_WINDOW_MESSAGES flag mention this:

... in STA is only a small set of special-cased messages dispatched.

I did some research on that, but could not get to pump the WM_TEST from your sample code with CoWaitForMultipleHandles, we discussed that in the comments to your question. My understanding is, the aforementioned small set of special-cased messages is really limited to some COM marshaller-specific messages, and doesn't include any regular general-purpose messages like your WM_TEST.

So, to answer your question:

... Should I implemented a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?

Yes, I believe that creating a custom synchronization context and overriding SynchronizationContext.Wait is indeed the right solution.

However, you should avoid using CoWaitForMultipleHandles, and use MsgWaitForMultipleObjectsEx instead. If MsgWaitForMultipleObjectsEx indicates there's a pending message in the queue, you should manually pump it with PeekMessage(PM_REMOVE) and DispatchMessage. Then you should continue waiting for the handles, all inside the same SynchronizationContext.Wait call.

Note there's a subtle but important difference between MsgWaitForMultipleObjectsEx and MsgWaitForMultipleObjects. The latter doesn't return and keeps blocking, if there's a message already seen in the queue (e.g., with PeekMessage(PM_NOREMOVE) or GetQueueStatus), but not removed. That's not good for pumping, because your COM objects might be using something like PeekMessage to inspect the message queue. That might later cause MsgWaitForMultipleObjects to block when not expected.

OTOH, MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE flag doesn't have such shortcoming, and would return in this case.

A while ago I created a custom version of StaTaskScheduler (available here as ThreadAffinityTaskScheduler) in attempt to solve a different problem: maintaining a pool of threads with thread affinity for subsequent await continuations. The thread affinity is vital if you use STA COM objects across multiple awaits. The original StaTaskScheduler exhibits this behavior only when its pool is limited to 1 thread.

So I went ahead and did some more experimenting with your WM_TEST case. Originally, I installed an instance of the standard SynchronizationContext class on the STA thread. The WM_TEST message didn't get pumped, which was expected.

Then I overridden SynchronizationContext.Wait to just forward it to SynchronizationContext.WaitHelper. It did get called, but still didn't pump.

Finally, I implemented a full-featured message pump loop, here's the core part of it:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

This does work, WM_TEST gets pumped. Below is an adapted version of your test:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

The output:

Initial thread #9
On STA thread #10
Post some WM_TEST messages...
Press Enter to continue...
WM_TEST processed: 1
WM_TEST processed: 2
WM_TEST processed: 3

After await, thread #10
Pending messages in the queue: False
Exiting STA thread #10
Current thread #12
Press any key to exit

Note this implementation supports both the thread affinity (it stays on the thread #10 after await) and the message pumping. The full source code contains re-usable parts (ThreadAffinityTaskScheduler and ThreadWithAffinityContext) and is available here as self-contained console app. It hasn't been thoroughly tested, so use it at your own risk.

查看更多
其实,你不懂
3楼-- · 2019-01-01 07:53

The subject of STA thread pumping is a large one with very few programmers having an enjoyable time solving deadlock. The seminal paper about it was written by Chris Brumme, a principal smart guy that worked on .NET. You'll find it in this blog post. Unfortunately it is rather short on specifics, he doesn't go beyond noting that the CLR does a bit of pumping but without any details on the exact rules.

The code he's talking about, added in .NET 2.0, is present in an internal CLR function named MsgWaitHelper(). The source code for .NET 2.0 is available through the SSCLI20 distribution. Very complete, but the source for MsgWaitHelper() is not included. Quite unusual. Decompiling it is rather a lost cause, it is very large.

The one thing to take away from his blog post is the danger of re-entrancy. Pumping in an STA thread is dangerous for its ability to dispatch Windows messages and get arbitrary code to execute when your program isn't in the correct state to allow such code to execute. Something that most any VB6 programmer knows when he used DoEvents() to get a modal loop in his code to stop freezing the UI. I wrote a post about its most typical dangers. MsgWaitHelper() does this exact kind of pumping itself, it however is very selective about exactly what kind of code it allows to run.

You can get some insight in what it does inside your test program by running the program without a debugger attached and then attaching an unmanaged debugger. You'll see it blocking on NtWaitForMultipleObjects(). I took it one step further and set a breakpoint on PeekMessageW(), to get this stack trace:

user32.dll!PeekMessageW()   Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305  C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087    C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int)   Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode)   Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *)    Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *)  Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *)  Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int)    Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *)  Unknown

Beware that I recorded this stack trace on Windows 8.1, it will look pretty different on older Windows versions. The COM modal loop has been heavily tinkered with in Windows 8, it is also a very big deal to WinRT programs. Don't know that much about it, but it appears to have another STA threading model named ASTA that does a more restrictive kind of pumping, enshrined in the added CoWaitForMultipleObjects()

ObjectNative::WaitTimeout() is where the SemaphoreSlim.Wait() inside the BlockingCollection.Take() method starts executing CLR code. You see it plowing through the levels of internal CLR code to arrive at the mythical MsgWaitHelper() function, then switching to the infamous COM modal dispatcher loop.

The bat signal sign of it doing the "wrong" kind of pumping in your program is the call to CliModalLoop::PeekRPCAndDDEMessage() method. In other words, it is only considering the kind of interop messages that get posted to a specific internal window that dispatches the COM calls that cross an apartment boundary. It will not pump the messages that are in the message queue for your own window.

This is understandable behavior, Windows can only be absolutely sure that re-entrancy won't kill your program when it can see that your UI thread is idle. It is idle when it pumps the message loop itself, a call to PeekMessage() or GetMessage() indicates that state. Problem is, you don't pump yourself. You violated the core contract of an STA thread, it must pump the message loop. Hoping that the COM modal loop will do the pumping for you is thus idle hope.

You can actually fix this, even though I don't recommend you do. The CLR will leave it to the application itself to perform the wait by a properly constructed SynchronizationContext.Current object. You can create one by deriving your own class and override the Wait() method. Call the SetWaitNotificationRequired() method to convince the CLR that it should leave it up to you. An incomplete version that demonstrates the approach:

class MySynchronizationProvider : System.Threading.SynchronizationContext {
    public MySynchronizationProvider() {
        base.SetWaitNotificationRequired();
    }
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
        for (; ; ) {
            int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
            if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
            else return result;
        }
    }
    [DllImport("user32.dll")]
    private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
        int millisecondTimeout, int mask);        
}

And install it at the start of your thread:

    System.ComponentModel.AsyncOperationManager.SynchronizationContext =
        new MySynchronizationProvider();

You'll now see your WM_TEST message getting dispatched. It the call to Application.DoEvents() that dispatched it. I could have covered it up by using PeekMessage + DispatchMessage but that would obfuscate the danger of this code, best to not stick DoEvents() under the table. You really are playing a very dangerous re-entrancy game here. Don't use this code.

Long story short, the only hope of using StaThreadScheduler correctly is when it is used in code that already implemented the STA contract and pumps like an STA thread should do. It was really meant as a band-aid for old code where you don't have to luxury to control the thread state. Like any code that started life in a VB6 program or Office add-in. Experimenting a bit with it, I don't think it actually can work. Notable too is that the need for it ought the be completely eliminated with the availability of asych/await.

查看更多
登录 后发表回答