使用C#的新异步特性,等待网络数据包的最佳方法是什么?

7

最近我一直在尝试新的Async CTP,但是遇到了一个不确定如何处理的情况。

在我的当前代码库中,我使用“任务”和“任务管理器”的概念。任务仅存在于处理初始消息、发送响应并等待响应的目的。

我已经有基于同步套接字的现有代码,其中网络线程正在等待数据到达,然后将其传递给事件处理程序,最终传递给作业管理器。

作业管理器查找哪个作业将处理消息,并将其传递。

因此,情况是这样的:

  1. 作业管理器收到新消息并启动作业。
  2. 作业开始处理消息并发送回复消息。
  3. 此时作业将等待响应以回复。

以下是伪代码示例:

class MyJob : Job
{
    public override void RunJob( IPacketMsg packet )
    {
        // handle packet

        var myReply = new Packet();
        SendReply( myReply );

        await GetResponse();
    }
}

但我不确定如何进行第三步。作业管理器将获得响应,然后将其传递给正在运行的作业。但我不确定如何使作业等待响应。

我考虑创建一个等待的任务,它只是在WaitHandle上阻塞,但这是最好的解决方案吗?

在这种情况下还有其他事情可以做吗?

编辑 关于Async CTP,如果没有使用UI会发生什么情况。我已经阅读了Eric Lippert的异步博客,但我不认为它涉及到没有UI线程的情况下所有工作原理(它是否会分离出一个后台工作程序或...?)


所以您希望此线程在等待GetResponse时被阻止?如果是这样,那么“await”不是您需要使用的模式。您只需执行GetResponse().Wait()即可。 - Tejs
@Tejs:不,我不想让网络线程阻塞,我只想让这个作业的函数在等待响应时阻塞。其他作业需要能够在此作业等待响应时启动。本质上,我正在寻找类似协程的东西,具有在响应未到达时暂停此作业执行的能力。 - Ryan Stecker
请给我一个使用案例 - 您想执行 RunJob,它将启动 SendReply,并且您希望 GetResponse 的主体等待 SendReply 完成,但又不会阻塞线程? - Tejs
SendReply将同步完成。当它完成时,我假设我正在通信的远程方已经收到了我的回复并正在处理它。我希望GetResponse阻止(但不阻止整个网络线程),直到远程端向我发送了对我的回复的响应。 - Ryan Stecker
1
我确实没有涉及到那个问题。但是我碰巧知道十月份的MSDN杂志将会有一篇关于异步上下文背后工作原理的文章,所以请注意。 - Eric Lippert
3个回答

5
工作管理器收到新消息并启动任务。任务开始处理该消息并发送回复消息。此时,任务会等待回复的响应。
首先,我应该提到Async CTP非常擅长处理异步操作,但异步事件不太擅长。您可能需要考虑基于Rx的方法。但是,让我们继续使用Async CTP。
您有两个基本选项创建任务:
1.使用委托。例如,Task.Factory.StartNew将在线程池上运行一个委托。自定义任务工厂和调度程序为任务委托提供了更多选项(例如,指定委托必须在STA线程上运行)。
2.不使用委托。例如,TaskFactory.FromAsync包装现有的Begin/End方法对,TaskEx.FromResult返回“未来常数”,TaskCompletionSource可以用于显式地控制任务(FromAsync和FromResult都在内部使用TCS)。
如果工作处理是CPU密集型的,则将其交给Task.Factory.StartNew是有意义的。我假设工作处理是CPU密集型的。
工作管理器伪代码:
// Responds to a new message by starting a new job on the thread pool.
private void RespondToNewMessage(IPacketMsg message)
{
  IJob job = ..;
  Task.Factory.StartNew(job.RunJob(message));
}

// Holds tasks waiting for a response.
private ConcurrentDictionary<int, TaskCompletionSource<IResponse>> responseTasks = ..;

// Asynchronously gets a response for the specified reply.
public Task<IResponse> GetResponseForReplyAsync(int replyId)
{
  var tcs = new TaskCompletionSource<IResponse>();
  responseTasks.Add(replyId, tcs);
  return tcs.Task;
}

// Responds to a new response by completing and removing its task.
private void RespondToResponse(IResponse response)
{
  var tcs = responseTasks[response.ReplyId];
  responseTasks.Remove(response.ReplyId);
  tcs.TrySetComplete(response);
}

这个想法是让作业管理器也管理一个未完成响应的列表。为了实现这一点,我引入了一个简单的整数回复标识符,作业管理器可以使用它来确定哪个响应与哪个回复相对应。

现在作业可以像这样工作:

public override void RunJob(IPacketMsg packet)
{
  // handle packet
  var myReply = new Packet();
  var response = jobManager.GetResponseForReplyAsync(myReply.ReplyId);
  SendReply(myReply);

  await response;
}

由于我们正在将作业放在线程池线程上,因此存在一些棘手的问题:

  1. GetResponseForReplyAsync必须在发送回复之前调用(注册任务),然后稍后await。这是为了避免出现这样的情况:在我们有机会注册之前,就可能发送回复并收到响应。
  2. RespondToResponse将在完成任务之前删除任务注册,以防完成任务会导致使用相同id发送另一个回复。

如果作业足够短,不需要将其放置在线程池线程上,则可以简化解决方案。


感谢您的详细回复。然而,您的示例代码引发了我另一个疑问。我可以重构我的代码,使整个作业在任务中执行,但我担心当许多消息同时到达时会耗尽线程池。此外,并不是每个作业都必须等待响应,有些只需处理到达的消息,然后完成。这些类型的作业同步完成是可以接受的,因为不会发生重度处理。需要响应的作业将大部分时间花在等待响应上。 - Ryan Stecker
1
请记住,并非每个 Task 都在线程池上运行 - 实际上,有些甚至没有委托。你可以在基类中有一个 virtual Task RunJobAsync() 方法,而不是将 RunJob 传递给 StartNew。派生类可以通过调用 StartNew(使用线程池)或使用 async 方法(异步,在仅用于继续的线程池上)或调用 TaskEx.FromResult(同步)来实现它。这种最后一种方法被称为快速路径 - Stephen Cleary

3
关于Async CTP,如果用户界面没有被使用,会发生什么情况呢?我已经阅读了Eric Lippert的Async博客,但我认为它从未涉及在没有UI线程的情况下后台如何工作的问题(它是否会启动一个后台工作线程或者其他操作...?) await将返回到其同步上下文。在UI进程中,这是UI消息循环。在ASP.NET中,这是ASP.NET线程池。在其他情况下(控制台应用程序和Win32服务),没有上下文,因此延续被排队到ThreadPool。这通常不是期望的行为,因此我编写了一个AsyncContext类,可用于这些情况。
不使用BackgroundWorker。在像您这样的服务器端场景中,根本没有后台线程是很常见的。

1

您只需像这样使用await模式连接其余的事件处理程序:

 public async void RunJob(IPacketMsg msg)
 {
     // Do Stuff

     var response = await GetResponse();

     // response is "string", not "Task<string>"

     // Do More Stuff
 }

 public Task<string> GetResponse()
 {
     return Task.Factory.StartNew(() =>
        {
             _networkThingy.WaitForDataAvailable();

             return _networkThingy.ResponseString;
        });
 }

当您的获取响应任务完成时,方法的其余部分将在当前同步上下文中继续执行。然而,在此之前,您的方法执行被暂停(因此,在GetResponse中启动的任务完成之前,等待后的任何代码都不会运行)。


基本上,这归结为任务阻塞数据。在我的情况下,它将是作业中的WaitHandle成员,作业管理器将设置该成员以告诉作业数据包已到达。这是最佳选择吗?还是应该使用类似于Monitor.Wait/Pulse的东西,或者这两个本质上是相同的? - Ryan Stecker

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接