工作管理器收到新消息并启动任务。任务开始处理该消息并发送回复消息。此时,任务会等待回复的响应。
首先,我应该提到Async CTP非常擅长处理异步操作,但异步事件不太擅长。您可能需要考虑基于Rx的方法。但是,让我们继续使用Async CTP。
您有两个基本选项创建任务:
1.使用委托。例如,Task.Factory.StartNew将在线程池上运行一个委托。自定义任务工厂和调度程序为任务委托提供了更多选项(例如,指定委托必须在STA线程上运行)。
2.不使用委托。例如,TaskFactory.FromAsync包装现有的Begin/End方法对,TaskEx.FromResult返回“未来常数”,TaskCompletionSource可以用于显式地控制任务(FromAsync和FromResult都在内部使用TCS)。
如果工作处理是CPU密集型的,则将其交给Task.Factory.StartNew是有意义的。我假设工作处理是CPU密集型的。
工作管理器伪代码:
private void RespondToNewMessage(IPacketMsg message)
{
IJob job = ..;
Task.Factory.StartNew(job.RunJob(message));
}
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
var tcs = new TaskCompletionSource<IResponse>();
responseTasks.Add(replyId, tcs);
return tcs.Task;
}
private void RespondToResponse(IResponse response)
{
var tcs = responseTasks[response.ReplyId];
responseTasks.Remove(response.ReplyId);
tcs.TrySetComplete(response);
}
这个想法是让作业管理器也管理一个未完成响应的列表。为了实现这一点,我引入了一个简单的整数回复标识符,作业管理器可以使用它来确定哪个响应与哪个回复相对应。
现在作业可以像这样工作:
public override void RunJob(IPacketMsg packet)
{
var myReply = new Packet();
var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
SendReply(myReply);
await response;
}
由于我们正在将作业放在线程池线程上,因此存在一些棘手的问题:
GetResponseForReplyAsync
必须在发送回复之前调用(注册任务),然后稍后await
。这是为了避免出现这样的情况:在我们有机会注册之前,就可能发送回复并收到响应。
RespondToResponse
将在完成任务之前删除任务注册,以防完成任务会导致使用相同id发送另一个回复。
如果作业足够短,不需要将其放置在线程池线程上,则可以简化解决方案。
GetResponse().Wait()
即可。 - TejsRunJob
,它将启动SendReply
,并且您希望GetResponse
的主体等待SendReply
完成,但又不会阻塞线程? - Tejs