强制任务在当前线程继续执行?

10

我正在为.NET移植AKKA框架(现在还不要太认真,目前只是对其中Actor部分的周末黑客马拉松)

我遇到了一些关于"Future"支持的问题。 在Java/Scala Akka中,Future需要使用Await同步等待调用。 很像.NET Task.Wait()。

我的目标是支持真正的异步等待。 它现在可以工作,但是在当前的解决方案中,延续在错误的线程上执行。

当向我的Actor之一传递包含未来等待块的消息时,就会出现以下结果。 如您所见,Actor总是在同一线程上执行,而等待块则在随机线程池线程上执行。

actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...

演员使用DataFlow BufferBlock<Message> 接收消息, 或者,我使用RX通过bufferblock订阅消息。 配置如下:
var messages = new BufferBlock<Message>()
{
        BoundedCapacity = 100,
        TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);

目前为止还不错。

然而,当我等待未来的结果时,像这样:

protected override void OnReceive(IMessage message)
{
    ....

    var result = await Ask(logger, m);
    // This is not executed on the same thread as the above code
    result.Match()  
       .With<SomeMessage>(t => {
       Console.WriteLine("await thread {0}",
          System.Threading.Thread.CurrentThread.GetHashCode());
        })
       .Default(_ => Console.WriteLine("Unknown message"));
     ...

我知道使用async await的这种行为是正常的,但是我必须确保只有一个线程可以访问我的actor。
我不希望future同步运行,我希望它像正常情况下一样异步运行,但是我希望后续处理程序/actor在与消息处理器相同的线程上运行。
我的未来支持代码如下:
public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
    TaskCompletionSource<IMessage> result = 
        new TaskCompletionSource<IMessage>();
    var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());

    // once this object gets a response, 
    // we set the result for the task completion source
    var futureActorRef = new FutureActorRef(result);            
    future.Tell(new SetRespondTo(), futureActorRef); 
    actor.Tell(message, future); 
    return result.Task;
}

有什么办法可以强制让上述代码的继续运行在启动它的同一线程上?

2
那个线程上的某些东西必须合作。你不能劫持一个线程。该线程必须以某种方式调用你的继续。也许可以使用自定义同步上下文。 - usr
1
线程池线程不会使用特定的工作项,它们会从队列中获取未指定的项。您将无法针对任何特定的线程进行操作。但是,您可以使用自定义任务计划程序来控制线程。这现在变得棘手了。也许更容易的方法是去除要求最终落到同一线程的要求?! - usr
1
我想说的是SetResult不能保证阻塞。如果你在文档中找不到这个声明,那么它就不能保证,你不能依赖它。你也无法测试它。 - usr
1
@usr:不幸的是,这并非总是如此(http://blogs.msdn.com/b/pfxteam/archive/2012/02/07/10265067.aspx)。 - Stephen Cleary
显示剩余13条评论
2个回答

6
我正在为.NET制作AKKA框架的移植版。
不错。我去参加了CodeMash '13上的一个Akka演讲,尽管从未接触过Java/Scala/Akka。我看到了在.NET库/框架中有很多潜力。微软正在开发类似的东西,希望最终能够普及(目前只有有限预览)。
我怀疑尽可能留在Dataflow/Rx世界是更容易的方法; 当您有异步操作(每个操作具有单个启动和单个结果)时,async最好,而Dataflow和Rx与流和订阅(具有单个启动和多个结果)配合得更好。因此,我的第一反应是将缓冲区块链接到具有特定调度程序的ActionBlock,或使用ObserveOn将Rx通知移动到特定调度程序,而不是尝试在async方面执行它。当然,我并不真正熟悉Akka API设计,所以需要谨慎对待。
无论如何,我的 async intro 描述了调度 await 继续的仅有两个可靠选项: SynchronizationContext.CurrentTaskScheduler.Current。如果你的 Akka 端口更像是一个框架(其中你的代码进行托管,并且最终用户代码始终由你的代码执行),那么使用 SynchronizationContext 可能是有意义的。如果你的端口更像是一个(其中最终用户代码进行托管并根据需要调用你的代码),那么使用 TaskScheduler 更合适。

自定义SynchronizationContext的示例并不多,因为这相当罕见。我在我的AsyncEx库中有一个AsyncContextThread类型,它为该线程定义了SynchronizationContextTaskScheduler。有几个自定义TaskScheduler的示例,例如Parallel Extensions Extras,其中包括一个STA scheduler和一个"current thread" scheduler


作为一个提示,Microsoft正在公开开发ActorFx库。我有点不清楚Orleans和ActorFx在未来是否应该合并,或者怎么处理。 F#用户组也讨论了将Akka移植到F#的问题,并提供了一些实现代码的指针。我想主要问题在于Fakka-F#Akka及其在扩大F#影响力方面可能发挥的作用 - Veksi

0
任务调度程序决定在新线程上或当前线程上运行任务。虽然有一个选项可以强制在新线程上运行,但没有强制在当前线程上运行的选项。 不过,有一个名为Task.RunSynchronously()的方法,可以在当前TaskScheduler上同步运行任务。 此外,如果您正在使用async/await,则已经有一个类似的问题了。

2
但是这个问题不是关于运行任务,而是关于在 await 之后执行的位置。 - svick

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接