我正在为.NET移植AKKA框架(现在还不要太认真,目前只是对其中Actor部分的周末黑客马拉松)
我遇到了一些关于"Future"支持的问题。 在Java/Scala Akka中,Future需要使用Await同步等待调用。 很像.NET Task.Wait()。
我的目标是支持真正的异步等待。 它现在可以工作,但是在当前的解决方案中,延续在错误的线程上执行。
当向我的Actor之一传递包含未来等待块的消息时,就会出现以下结果。 如您所见,Actor总是在同一线程上执行,而等待块则在随机线程池线程上执行。
actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
演员使用DataFlow
BufferBlock<Message>
接收消息,
或者,我使用RX通过bufferblock订阅消息。
配置如下:var messages = new BufferBlock<Message>()
{
BoundedCapacity = 100,
TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
目前为止还不错。
然而,当我等待未来的结果时,像这样:
protected override void OnReceive(IMessage message)
{
....
var result = await Ask(logger, m);
// This is not executed on the same thread as the above code
result.Match()
.With<SomeMessage>(t => {
Console.WriteLine("await thread {0}",
System.Threading.Thread.CurrentThread.GetHashCode());
})
.Default(_ => Console.WriteLine("Unknown message"));
...
我知道使用async await的这种行为是正常的,但是我必须确保只有一个线程可以访问我的actor。
我不希望future同步运行,我希望它像正常情况下一样异步运行,但是我希望后续处理程序/actor在与消息处理器相同的线程上运行。
我的未来支持代码如下:
public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
TaskCompletionSource<IMessage> result =
new TaskCompletionSource<IMessage>();
var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());
// once this object gets a response,
// we set the result for the task completion source
var futureActorRef = new FutureActorRef(result);
future.Tell(new SetRespondTo(), futureActorRef);
actor.Tell(message, future);
return result.Task;
}
有什么办法可以强制让上述代码的继续运行在启动它的同一线程上?