Task TPL的Thread.Interrupt等效方法

8
一些背景信息:我的C#代码调用了一些非托管代码(C++),该代码进行阻塞等待。然而,这个阻塞等待是可警告的(就像Thread.Sleep - 我想它在幕后使用bAlertable TRUE调用WaitForSingleObjectEx);我确定它是可警告的,因为它可以被QueueUserAPC“唤醒”。
如果我能简单地使用托管线程,我将只需调用阻塞方法,然后在需要退出时使用Thread.Interrupt来“唤醒”线程;类似于这样:
void ThreadFunc() {
    try {
           Message message;
           comObject.GetMessage(out message);
           //....
     }
     catch (ThreadInterruptedException) {
        // We need to exit
        return;
     }
}

var t - new Thread(ThreadFunc);
//....
t.Interrupt();

(注意:我不使用此代码,但据我所知,它可能适用于这种特殊情况(在我无法控制的未托管代码中等待并发出警报)。我正在寻找TPL中最佳等效项(或更好的替代方法!)。

但我必须使用TPL(任务而不是托管线程),而未托管的方法超出了我的控制(例如,我无法修改它以调用WaitForMultipleObjectEx并在我发出信号的情况下返回)事件,例如)。

我正在寻找与任务的Thread.Interrupt等效项(将在基础线程上发布APC的内容)。 据我所知,CancellationTokens要求代码“具有任务意识”,并且不使用此技术,但我不确定:我想知道,如果任务执行Thread.Sleep会发生什么情况(我知道有一个Task.Wait,但它只是为了有一个非任务等待的示例,这是可警报的),它可以被取消吗?

我的假设是错误的(我的意思是,我可以只使用CT,一切都能正常工作吗?但是如何实现?)。

如果没有这样的方法... 我愿意听取建议。我真的希望避免混合线程和任务,或使用P/Invoke,但如果没有其他方法,我仍然希望以最“干净”的方式完成它(这意味着:没有粗鲁的中止,而是一些“任务式”的东西 :))

如果你好奇的话,我已经“确认”了Thread.Interrupt在我的情况下可以工作,因为它调用了QueueUserAPC。它调用InterruptInternal,然后调用Thread::UserInterrupt,然后调用Alert,这会将APC排队。实际上非常聪明,因为它允许您在不需要使用其他同步原语的情况下休眠/等待并唤醒线程。

我只需要找到一个遵循相同流程的TPL原语即可。


你的代码示例“可能”是足够安全的,这取决于省略部分。但特别要注意的是,如果您没有一种方法来保证在调用COM调用期间调用Thread.Interrupt(),则可能会在不恰当的时间中断您的托管代码以破坏状态。另请注意,根据COM调用正在执行的操作,您可能无法在那个时刻实际上中断线程。它很容易直到COM调用完成才抛出异常。 - Peter Duniho
哦,那不是问题。Thread.Interrupt 以非常特定的方式运作;它会“等待”线程处于 WaitSleepJoin 状态,所以不用担心代码会被中断,除非它处于“休眠”状态(参见 MSDN)。 - Lorenzo Dematté
请注意,我同意盲目使用 Sleep/Interrupt 是一种危险的模式,但如果一个人知道发生了什么,并不是坏事;在我看来,它实现得非常稳健。 - Lorenzo Dematté
1
是的,没错。只是你的例子中没有足够的细节来知道什么时候会发生这种情况,以及在代码的哪些点被中断是否真的安全。这完全取决于代码正在做什么,这就是我所说的全部。 - Peter Duniho
2个回答

4
我想知道,如果一个任务执行了Thread.Sleep(我知道有Task.Wait,但这只是为了举例非任务等待可以被警告),它是否可以被取消?
不行。任务的取消由用户定义。这是协作取消,需要用户显式检查CancellationToken的状态。
请注意,有一种重载的Task.Wait接受CancellationToken:
/// <summary>
/// Waits for the task to complete, for a timeout to occur, 
/// or for cancellation to be requested.
/// The method first spins and then falls back to blocking on a new event.
/// </summary>
/// <param name="millisecondsTimeout">The timeout.</param>
/// <param name="cancellationToken">The token.</param>
/// <returns>true if the task is completed; otherwise, false.</returns>
private bool SpinThenBlockingWait(int millisecondsTimeout, 
                                  CancellationToken cancellationToken)
{
    bool infiniteWait = millisecondsTimeout == Timeout.Infinite;
    uint startTimeTicks = infiniteWait ? 0 : (uint)Environment.TickCount;
    bool returnValue = SpinWait(millisecondsTimeout);
    if (!returnValue)
    {
        var mres = new SetOnInvokeMres();
        try
        {
            AddCompletionAction(mres, addBeforeOthers: true);
            if (infiniteWait)
            {
                returnValue = mres.Wait(Timeout.Infinite,
                                        cancellationToken);
            }
            else
            {
                uint elapsedTimeTicks = ((uint)Environment.TickCount) -
                                               startTimeTicks;
                if (elapsedTimeTicks < millisecondsTimeout)
                {
                    returnValue = mres.Wait((int)(millisecondsTimeout -
                                             elapsedTimeTicks), cancellationToken);
                }
            }
        }
        finally
        {
            if (!IsCompleted) RemoveContinuation(mres);
            // Don't Dispose of the MRES, because the continuation off
            // of this task may still be running.  
            // This is ok, however, as we never access the MRES' WaitHandle,
            // and thus no finalizable resources are actually allocated.
        }
    }
    return returnValue;
}

它将尝试根据某些条件启动线程。如果这不够,它最终会调用Monitor.Wait来实际阻塞:

/*========================================================================
** Waits for notification from the object (via a Pulse/PulseAll). 
** timeout indicates how long to wait before the method returns.
** This method acquires the monitor waithandle for the object 
** If this thread holds the monitor lock for the object, it releases it. 
** On exit from the method, it obtains the monitor lock back. 
** If exitContext is true then the synchronization domain for the context 
** (if in a synchronized context) is exited before the wait and reacquired 
**
** Exceptions: ArgumentNullException if object is null.
========================================================================*/
[System.Security.SecurityCritical]  // auto-generated
[ResourceExposure(ResourceScope.None)]
[MethodImplAttribute(MethodImplOptions.InternalCall)]
private static extern bool ObjWait(bool exitContext, int millisecondsTimeout, Object obj);

这正是我所怀疑的...谢谢你确认了它。 - Lorenzo Dematté
@PeterDuniho 这就是 Task.Wait 内部调用的内容。 - Yuval Itzchakov
所以它进行了繁忙等待..听到这个我有点失望,这肯定证实了我的怀疑:这里没有“真正的等待”。 - Lorenzo Dematté
@Lorenzo - 如果旋转不足够,它最终会调用 Monitor.Wait,这是一个阻塞调用。 - Yuval Itzchakov
请注意,取消令牌并没有一直传递到 Monitor.Wait。最终只会传递一个超时时间。 - Yuval Itzchakov
显示剩余4条评论

4

目前,所有现有的生产CLR宿主实现一对一的托管到非托管线程映射。这在Windows桌面操作系统系列中尤为明显,其中运行您的传统COM对象。

基于此,您可以使用TPL的Task.Run代替经典线程API,并通过p/invoke调用QueueUserAPC以释放您的COM对象,使其从可改变的等待状态中退出,当取消令牌被触发时。

下面的代码展示了如何做到这一点。需要注意的一点是,所有ThreadPool线程(包括由Task.Run启动的线程)都隐式地运行在COM MTA公寓下。因此,COM对象需要支持MTA模型而不进行隐式的COM编组。如果不是这种情况,则可能需要使用自定义任务调度程序(例如StaTaskScheduler)来代替Task.Run

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        static int ComGetMessage()
        {
            NativeMethods.SleepEx(2000, true);
            return 42;
        }

        static int GetMessage(CancellationToken token)
        {
            var apcWasCalled = false;
            var gcHandle = default(GCHandle);
            var apcCallback = new NativeMethods.APCProc(target => 
            {
                apcWasCalled = true;
                gcHandle.Free();
            });

            var hCurThread = NativeMethods.GetCurrentThread();
            var hCurProcess = NativeMethods.GetCurrentProcess();
            IntPtr hThread;
            if (!NativeMethods.DuplicateHandle(
                hCurProcess, hCurThread, hCurProcess, out hThread,
                0, false, NativeMethods.DUPLICATE_SAME_ACCESS))
            {
                throw new System.ComponentModel.Win32Exception(Marshal.GetLastWin32Error());
            }
            try
            {
                int result;
                using (token.Register(() => 
                    {
                        gcHandle = GCHandle.Alloc(apcCallback);
                        NativeMethods.QueueUserAPC(apcCallback, hThread, UIntPtr.Zero);
                    },
                    useSynchronizationContext: false))
                {
                    result = ComGetMessage();
                }
                Trace.WriteLine(new { apcWasCalled });
                token.ThrowIfCancellationRequested();
                return result;
            }
            finally
            {
                NativeMethods.CloseHandle(hThread);
            }
        }

        static async Task TestAsync(int delay)
        {
            var cts = new CancellationTokenSource(delay);
            try
            {
                var result = await Task.Run(() => GetMessage(cts.Token));
                Console.WriteLine(new { result });
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Cancelled.");
            }
        }

        static void Main(string[] args)
        {
            TestAsync(3000).Wait();
            TestAsync(1000).Wait();
        }

        static class NativeMethods
        {
            public delegate void APCProc(UIntPtr dwParam);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint SleepEx(uint dwMilliseconds, bool bAlertable);

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern uint QueueUserAPC(APCProc pfnAPC, IntPtr hThread, UIntPtr dwData);

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentThread();

            [DllImport("kernel32.dll")]
            public static extern IntPtr GetCurrentProcess();

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool CloseHandle(IntPtr handle);

            public const uint DUPLICATE_SAME_ACCESS = 2;

            [DllImport("kernel32.dll", SetLastError = true)]
            public static extern bool DuplicateHandle(IntPtr hSourceProcessHandle,
               IntPtr hSourceHandle, IntPtr hTargetProcessHandle, out IntPtr lpTargetHandle,
               uint dwDesiredAccess, bool bInheritHandle, uint dwOptions);
        }
    }
}

2
非常好的答案,我会采用这种方法,因为似乎没有“隐式”的APC由取消令牌本身发送。只有一个观察:即使您的前提是正确的,我也会额外保险并调用BeginThreadAffinity和EndThreadAffinity。 - Lorenzo Dematté
@LorenzoDematté,这里使用Begin/EndThreadAffinity肯定不会有问题。 - noseratio - open to work

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接