在异步中调用PInvoke不会在执行后返回到主线程

4

我正在使用一个未管理的库,该库要求所有对其API的调用都在同一线程上运行。我们希望使用Reactive扩展的EventLoopScheduler来实现这一点,因为我们将在其他地方使用Observable。

我正在使用类似下面代码示例中的Run方法来在调度程序中执行代码,该方法将始终在同一线程上运行。当我使用托管代码时,这按预期工作,所有调用都在事件循环管理的线程上运行,并且异步调用之前/之后是主线程。

但是,当我调用P / Invoke(在代码示例中的函数仅用于举例,我并没有在我的代码中真正调用它,但行为是相同的)时,线程确实在事件循环线程上运行,但之后的所有操作也是如此!

我已经尝试添加ConfigureAwait(true)(和false),但没有改变任何内容。我对这种行为感到非常困惑,为什么调用P / Invoke会改变await之后继续运行的线程!!?

以下是重现代码:

[DllImport("user32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
private static extern int MessageBox(IntPtr hWnd, string lpText, string lpCaption, uint uType);

public static Task Run(Action action, IScheduler scheduler)
{
    return Observable.Start(action, scheduler).SingleAsync().ToTask();
}

public static string ThreadInfo() =>
    $"\"{Thread.CurrentThread.Name}\" ({Thread.CurrentThread.ManagedThreadId})";

private static async Task Main(string[] args)
{
    var scheduler = new EventLoopScheduler();

    Console.WriteLine($"Before managed call on thread {ThreadInfo()}");

    await Run(() => Console.WriteLine($"Managed call on thread {ThreadInfo()}"), scheduler);

    Console.WriteLine($"After managed call on thread {ThreadInfo()}");

    Console.WriteLine($"Before PInvoke on thread {ThreadInfo()}");

    await Run(() => MessageBox(IntPtr.Zero, $"Running on thread {ThreadInfo()}", "Attention", 0), scheduler);

    Console.WriteLine($"After PInvoke on thread {ThreadInfo()}");
}

执行后会返回类似以下的结果:
Before managed call on thread "" (1)
Managed call on thread "Event Loop 1" (6)
After managed call on thread "" (1)
Before PInvoke on thread "" (1)
Message box displayed with text: Running on thread "Event Loop 1" (6)
After PInvoke on thread "Event Loop 1" (6)

我原本期望

Before managed call on thread "" (1)
Managed call on thread "Event Loop 1" (6)
After managed call on thread "" (1)
Before PInvoke on thread "" (1)
Message box displayed with text: Running on thread "Event Loop 1" (6)
After PInvoke on thread "" (1)

1
@SimonMourier 是的,首先这比仅创建事件循环要多做很多工作。然后,我实际上一开始就尝试了这个方法,但我无法找到一个好的解决方案使其与我用来轮询某些API调用的RX Observables配合工作,这个解决方案更适合我正在做的其他事情。对我而言更重要的是,即使我最终采取不同的做法,我也想知道为什么这样做不起作用。它会做一些非常反直觉的事情,破坏了我对C#异步/等待的理解。 - Gimly
控制台应用程序很特殊,因为它没有设置SynchronizationContext: https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/。然而,我已经使用EventLoopScheduler进行了一些测试,但仍然无法正常工作: https://pastebin.com/raw/Kh2Yw9pn 看起来不太好。使用“常规”任务/任务调度程序可以正常工作。 - Simon Mourier
有趣。这是netfx还是netcore?我还没有使用过EventLoopScheduler,但我猜想它的想法是安装一个自定义同步上下文,将工作排队在初始线程上? - Voo
如果我有时间的话,明天我会试着使用 Rx 进行一些实验。但是我的第一个猜测是 EventLoopScheduler 并没有设置同步上下文本身,而只是在可观察对象内部使用。当你执行 await <observable> 时,当前的同步上下文被使用,这是控制台应用程序的默认上下文,与 EventLoopScheduler 没有任何关系,这意味着你会得到默认的“从线程池中选择线程”的效果。其中有一些优化可以很容易地导致你回到同步代码的同一线程。 - Voo
@Voo 在我的情况下是 .Net Core 3.1,我需要尝试使用 netfx。就我对 EventLoopScheduler 的理解,它应该创建一个线程,专门用于计划在其上运行的工作。正如您可以从我的示例中看到的那样,在 Run 方法中传递的工作确实在 EventLoopScheduler 创建的线程中运行,但 await 后的继续也在该线程上,这在异步/等待上下文中是没有意义的。 - Gimly
显示剩余2条评论
1个回答

3

任务

任务(Task)或者承诺(Promise)只是回调(callbacks)的一种抽象。而async/await只是对任务的一种语法糖。

由于它是回调抽象,await不会阻塞线程。 为什么它看起来像是在阻塞呢?那是因为await将你的代码重写成一个状态机(state-machine),当被等待的任务(Task)完成时,该状态机会进入到下一个状态。

大致上会被重写成这样:

switch (state)
{
    case 0:
        Console.WriteLine($"Before managed call on thread {ThreadInfo()}");
        Await(Run(() => Console.WriteLine($"Managed call on thread {ThreadInfo()}"), scheduler));
        return;
    case 1:

        Console.WriteLine($"After managed call on thread {ThreadInfo()}");
        Console.WriteLine($"Before PInvoke on thread {ThreadInfo()}");
        Await(Run(() => MessageBox(IntPtr.Zero, $"Running on thread {ThreadInfo()}", "Attention", 0), scheduler));
        return;
    case 2:
        Console.WriteLine($"After PInvoke on thread {ThreadInfo()}");
        return;
}

实际重写使用的是goto而不是switch,但概念相同。因此,当任务完成时,它会以相同的线程上下文中的状态+ = 1调用该状态机。只有在使用任务调度程序时才会看到任务池线程。

抽象中的泄漏

你看到这种特定行为的解释是:
After managed call on thread "" (1)

这相当复杂。它与预定的惰性计算立即完成与否有关。如果在第一个托管调用中添加 Thread.Sleep,则会注意到继续运行在事件循环线程上。

这是由于调度优化更喜欢只在当前正在运行的情况下排队。当您调用ToTask()时,您正在使用默认调度程序,即当前线程调度程序。

当前线程调度程序的工作方式如下:

空闲吗?立即运行。

忙碌吗?将工作排队。

立即运行行为是您在主线程上看到日志运行的原因。 如果您只添加

var scheduler = new EventLoopScheduler();
scheduler.Schedule(() => Thread.Sleep(1000));

从一开始,你让事件循环忙碌起来,导致所有事情都排队等待,因此你会在事件循环线程中看到所有日志记录。所以这与P/Invoke无关。

需要明确的是,这并不是指定观察者调度程序的问题,而是订阅的问题。当将Observable转换为其他抽象类型(如Tasks、Enumerables、Blocking Joins等)时,可能会出现一些内部复杂性泄漏。


这种“立即在当前线程上排队”的行为是一种不错的优化,但可能会带来很多问题。我记得曾经读过一个Stephen(我想)的文章,讲述了他们如何在某个框架代码中解决这个问题,以确保堆栈不会被耗尽(因为如果立即执行,堆栈会不断增长,与之相反,如果将其发布到线程池,则不会)。 - Voo
我想我记得。队列(queue)-如果需要的行为是微妙错误的原因。但是对于高性能是必要的。 - Asti
非常感谢您的详细解释,@Asti。那么,您会如何确保只有“Run”方法内的代码在特定线程上运行,而不是在“await”之后的继续执行? - Gimly
如果你真的想让它在主线程上运行,你必须使用Task.Wait而不是await(不推荐)。只需向ToTask方法传递另一个调度程序即可。 - Asti
@Gimly(仍然不了解RX.Net,因此可能有更好的解决方案),但我能看到的最简单的解决方法是将ToTask返回的任务包装在一个未完成的任务中,这应该意味着继续被发布到同步上下文而不是立即继续。我非常模糊地记得在上述博客文章中有一个更好的解决方案,但我的Googlefu很弱。 - Voo
1
有一个 ObserveOn 重载函数,它可以接受同步上下文参数。 - Asti

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接