StaTaskScheduler和STA线程消息泵

30

TL;DR: 在由StaTaskScheduler运行的任务中出现死锁。 长篇版本:

我正在使用Parallel Team的ParallelExtensionsExtras中的StaTaskScheduler,以托管第三方提供的一些遗留STA COM对象。 StaTaskScheduler实现细节的描述如下:

好消息是TPL的实现能够在MTA或STA线程上运行,并且考虑了底层API(例如WaitHandle.WaitAll,在提供多个等待句柄时仅支持MTA线程)的相关差异。

我认为这意味着TPL的阻塞部分将使用一种消息泵等待API(例如CoWaitForMultipleHandles),以避免在STA线程上调用时出现死锁情况。

在我的情况下,我认为以下情况正在发生:进程内STA COM对象A调用进程外对象B,然后期望从B作为传出调用的一部分获得回调。
简化形式如下:
var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

问题在于,a.Method(b) 从未返回。据我所知,这是因为 BlockingCollection<Task> 中的某个阻塞等待没有泵送消息,因此我对引用语句的假设可能是错误的。 编辑 当在测试 WinForms 应用程序的 UI 线程上执行相同的代码时(即向 Task.Factory.StartNew 提供 TaskScheduler.FromCurrentSynchronizationContext() 而不是 staTaskScheduler),它可以正常工作。
应该如何解决这个问题?我应该实现一个自定义同步上下文,在每个由 StaTaskScheduler 启动的 STA 线程上显式地使用 CoWaitForMultipleHandles 泵送消息并安装它吗?
如果是这样,BlockingCollection 的底层实现会调用我的 SynchronizationContext.Wait 方法吗?我可以使用 SynchronizationContext.WaitHelper 实现 SynchronizationContext.Wait 吗?
编辑:附带一些代码,展示了当执行阻塞等待时,托管STA线程不会进行消息泵。该代码是一个完整的控制台应用程序,可供复制/粘贴/运行:
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

这将产生以下输出:
在不进行泵送的情况下进行测试...
集合参数为空,并已标记为完成添加。
进行泵送测试... 集合参数为空,并已标记为完成添加。 现在开始泵送... WM_TEST 已处理 按 Enter 键退出

1
你引用的文本是指Task和线程池的实现,而不是StaTaskScheduler。托管线程确实会进行消息泵处理;请参见此处此处。所以我不知道为什么你会看到死锁。 - Stephen Cleary
@StephenCleary,我确实意识到这与TPL无关,而是涉及底层的WaitHandle.Wait。我已经编辑了问题,并附上了一些示例代码,展示了STA线程不会泵的情况。我的代码有错误吗? - avo
@Noseratio,我明白了。对象“A”可能确实在内部使用PostMessage,而CoWaitForMultipleHandlesWaitHandle.Wait中没有被调用,就像BlockingCollection正在等待的那样。我无法访问“A”的源代码以确定这一点。 - avo
1
@avo,你可能会对这个感兴趣。 - noseratio - open to work
参考:http://lostechies.com/gabrielschenker/2009/01/23/synchronizing-calls-to-the-ui-in-a-multi-threaded-application/ - LCJ
显示剩余5条评论
2个回答

38

我理解您的问题是:您仅使用StaTaskScheduler来组织经典的COM STA单元,用于您的旧COM对象。 您没有在StaTaskScheduler的STA线程上运行WinForms或WPF核心消息循环。也就是说,您没有在该线程中使用类似于Application.RunApplication.DoEventsDispatcher.PushFrame之类的东西。如果我这样做了错误的假设,请纠正我。

仅靠StaTaskScheduler无法在创建的STA线程上安装任何同步上下文。 因此,您要依赖CLR为您提供消息泵。 我只找到了一个隐含的确认,即CLR会在STA线程上执行消息泵,在Chris Brumme的《CLR中的单元和泵》文章中有所提及:

我不断地说,在STA线程上调用托管阻塞将执行“一些泵”。能否知道会被泵送的确切内容?不幸的是,泵送是一种超越人类理解的黑魔法。在Win2000及更高版本中,我们只需委托OLE32的CoWaitForMultipleHandles服务。

这表明CLR在STA线程上内部使用CoWaitForMultipleHandles。 此外,COWAIT_DISPATCH_WINDOW_MESSAGES标志的MSDN文档提到了这一点

...在STA中,只有一小组特殊的消息可以分派。

我进行了相关研究,但无法使用CoWaitForMultipleHandles从您的示例代码中泵送WM_TEST,我们在您的问题评论中进行了讨论。 我的理解是,《小组特殊的消息》仅限于某些COM Marshaller特定的消息,并不包括任何常规的通用消息,例如WM_TEST

因此,回答您的问题:

...我应该实现自定义同步上下文,该上下文将使用CoWaitForMultipleHandles明确地泵送消息,并在由StaTaskScheduler启动的每个STA线程上安装它吗?

我相信创建自定义的同步上下文并覆盖 SynchronizationContext.Wait 是正确的解决方案。

然而,应该避免使用 CoWaitForMultipleHandles,并且改用MsgWaitForMultipleObjectsEx。如果 MsgWaitForMultipleObjectsEx 指示队列中有一个挂起的消息,则应使用 PeekMessage(PM_REMOVE)DispatchMessage 手动处理它。然后,在同一个 SynchronizationContext.Wait 调用中继续等待句柄。

注意,MsgWaitForMultipleObjectsExMsgWaitForMultipleObjects 之间存在微妙但重要的区别。如果队列中已经看到了一个消息(例如使用 PeekMessage(PM_NOREMOVE)GetQueueStatus),但尚未删除,后者不会返回并一直阻塞。这对于泵送不好,因为您的 COM 对象可能正在使用类似 PeekMessage 的东西来检查消息队列。这可能会导致在不期望的情况下 MsgWaitForMultipleObjects 阻塞。

另一方面,带有 MWMO_INPUTAVAILABLE 标志的 MsgWaitForMultipleObjectsEx 没有这种缺点,并且在这种情况下会返回。

一段时间以前,我创建了 StaTaskScheduler 的自定义版本(可在此处作为ThreadAffinityTaskScheduler使用),试图解决不同的问题:维护具有线程亲和性的线程池,以便后续的 await 继续执行。如果您在多个 await 中使用 STA COM 对象,则线程亲和性非常重要。原始的 StaTaskScheduler 仅在其池被限制为 1 个线程时才表现出这种行为。

我继续对你的WM_TEST案例进行了更多实验。最初,我在STA线程上安装了标准的SynchronizationContext类的实例。预计WM_TEST消息不会被处理,结果也是如此。

然后,我重写了SynchronizationContext.Wait,只是将它转发到SynchronizationContext.WaitHelper。它确实被调用了,但仍然没有处理WM_TEST消息。

最后,我实现了一个完整的消息泵循环,以下是其中的核心部分:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

这确实可以,WM_TEST会被激活。下面是你测试的改编版本:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

输出结果:

初始线程 #9
在STA线程 #10 上
发布一些 WM_TEST 消息...
按 Enter 键继续...
处理了 WM_TEST: 1
处理了 WM_TEST: 2
处理了 WM_TEST: 3
await 后,在线程 #10 上 队列中待处理的消息: False 退出 STA 线程 #10 当前线程 #12 按任意键退出

请注意,此实现支持线程亲和性(即在await后仍然停留在线程#10)和消息泵。全源代码包含可重复使用的部分(ThreadAffinityTaskSchedulerThreadWithAffinityContext),并且可以在 此处作为独立控制台应用程序 找到。它尚未经过彻底测试,请自行承担风险。


1
它起作用了!我使用了您的ThreadWithAffinityContext,并像您的示例中所示一样未更改它。我在其中创建了我的COM对象AB。它确实解决了问题,死锁已经消失了!非常感谢您在此方面的工作,它对我非常有用,我认为其他人也会从中受益。 - avo
1
另外,你说得对,我在后台STA线程上没有使用任何.NET UI控件或表单,它仅用于传统的COM对象和一些辅助的.NET数据结构。 - avo
2
非常感谢您,Noseratio!这是我迄今为止看到的第一种可靠的管理COM互操作的方式,即第一种不容易破坏整个编程模型的方式。太棒了! - Simon Thum

17
STA线程抽取是一个大问题,很少有程序员可以愉快地解决死锁。关于此的开创性论文是由Chris Brumme撰写的,他是一位在.NET上工作的主要聪明人。您可以在这篇博客文章中找到它。不幸的是,它缺乏具体细节,他只是指出CLR会进行一些抽取,但没有任何关于确切规则的详细信息。
他所说的代码,在.NET 2.0中添加,存在于名为MsgWaitHelper()的内部CLR函数中。.NET 2.0的源代码可通过SSCLI20分发获得。非常完整,但未包含MsgWaitHelper()的源代码。相当不寻常。反编译它几乎是无望的,因为它非常庞大。
从他的博客文章中需要记住的是重新进入的危险性。在STA线程中进行抽取,由于其能够分派Windows消息并在程序不处于允许执行此类代码的正确状态时执行任意代码,因此是危险的。这是大多数VB6程序员知道的,当他使用DoEvents()停止UI中的模态循环时。我写了一篇关于它最典型危险的文章。MsgWaitHelper()本身也会进行这种类型的抽取,但是它非常选择性地允许运行哪种代码。
您可以在不附加调试器的情况下运行测试程序,然后附加一个非托管调试器,以了解它在内部执行的内容。您将看到它在NtWaitForMultipleObjects()上阻塞。我更进一步,在PeekMessageW()上设置了断点,以获得此堆栈跟踪:
user32.dll!PeekMessageW()   Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305  C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087    C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int)   Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode)   Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *)    Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *)  Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *)  Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int)    Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *)  Unknown

注意,我记录了这个堆栈跟踪是在Windows 8.1上的,在旧版本的Windows上,它看起来会非常不同。COM模态循环在Windows 8中已经被大幅修改,对于WinRT程序来说也非常重要。虽然我对此并不了解太多,但它似乎有另一个名为ASTA的STA线程模型,执行一种更严格的泵送方式,并通过添加CoWaitForMultipleObjects()方法得以实现。

ObjectNative::WaitTimeout()就是SemaphoreSlim.Wait()在BlockingCollection.Take()方法内开始执行CLR代码的地方。你可以看到它穿过内部CLR代码的层级,最终抵达神秘的MsgWaitHelper()函数,然后切换到臭名昭著的COM模态调度器循环。

它进行“错误”类型的泵送而导致在您的程序中出现的问题是调用了CliModalLoop::PeekRPCAndDDEMessage()方法。换句话说,它只考虑被发布到特定内部窗口的交互操作消息,该窗口分派跨越公寓边界的COM调用。它将不会泵送在您自己的窗口消息队列中的消息。

这是可以理解的行为,当Windows能够看到您的UI线程空闲时,它只能绝对肯定重入不会损坏程序。当它本身泵送消息循环时,调用PeekMessage()或GetMessage()函数即表明该状态。问题是,你自己没有进行泵送。你违反了STA线程的核心约定,它必须进行消息循环的泵送。希望COM模态循环为您完成泵送就是徒劳无功。

实际上,您可以解决这个问题,尽管我不建议您这样做。CLR将把等待操作留给应用程序本身通过正确构造的SynchronizationContext.Current对象来执行。您可以通过派生自己的类并覆盖Wait()方法来创建一个SynchronizationContext对象。调用SetWaitNotificationRequired()方法来说服CLR让你自己来处理。下面是演示此方法的未经完整处理的版本:

class MySynchronizationProvider : System.Threading.SynchronizationContext {
    public MySynchronizationProvider() {
        base.SetWaitNotificationRequired();
    }
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
        for (; ; ) {
            int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
            if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
            else return result;
        }
    }
    [DllImport("user32.dll")]
    private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
        int millisecondTimeout, int mask);        
}

并在您的线程开始时安装它:

    System.ComponentModel.AsyncOperationManager.SynchronizationContext =
        new MySynchronizationProvider();

现在您将看到WM_TEST消息被分派。调用Application.DoEvents()实现了此操作。我本可以使用PeekMessage+DispatchMessage来掩盖它,但这会混淆代码的危险性,最好不要隐藏DoEvents()。您正在进行非常危险的重入游戏,不要使用此代码。

长话短说,在正确使用StaThreadScheduler的唯一希望是在已经实现STA合同并像STA线程应该做的那样泵的代码中使用它。 它确实意味着为旧代码提供紧急救援,因为您无法控制线程状态。 就像在VB6程序或Office添加程序中启动的任何代码一样。 经过一些实验,我认为它实际上无法工作。值得注意的是,随着asych/await的可用性,它的需要应完全消除。


感谢你提供的出色解释。 - Just another metaprogrammer
感谢您详细解释了背后发生的情况,这让我清楚地了解了问题。您发布的“Wait”的版本确实会发送我的“WM_TEST”消息。然而,它并不能解决死锁的原始问题。我将尝试另一个答案建议的“MsgWaitForMultipleObjectsEx”与“MsgWaitForMultipleObjects”。 - avo
1
@HansPassant,我很想知道DoEvents与专门用于COM对象的后台STA线程有什么关系,该线程与主UI线程分开。顺便说一句,OP甚至没有指定他使用WPF还是WinForms。我强烈建议不要在没有创建任何WinForms组件的线程上调用DoEvents - noseratio - open to work
1
我并不是想用我的p/invoke CreateWindow示例来混淆任何人,如果我这样做了,对不起。它只是为了表明StaTaskScheduler不会泵送消息,我想不出更好的方法。在问题的第一部分中,我只使用COM对象。但是它们可能在内部使用一些Win32窗口,我没有它们的源代码。 - avo
1
@avo,是否使用DoEvents来进行消息泵处理取决于您,但请注意这个严重的错误会影响其行为。同时,请记住MsgWaitForMultipleObjectsExMsgWaitForMultipleObjects语义之间的区别,以观察队列中现有的消息。 - noseratio - open to work
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接