异步/等待,TAP和EAP

4

我正在尝试从使用异步套接字代码(包含一些混乱的BeginSend/EndSendBeginReceive/EndReceive以及许多内部"管理")转换到使用异步TcpClient。我对异步/等待非常新手,请多多包涵...

假设以下代码(已剥离无关代码):

public async void StartReceive()
{
    while (true)
    {
        var stream = this.MyInternalTcpClient.GetStream();
        if (stream == null) return;

        var buffer = new byte[BUFFERSIZE];
        var bytesread = await stream.ReadAsync(buffer, 0, BUFFERSIZE);
        if (bytesread == 0)
        {
            if (Closed != null)
                Closed(this, new ClosedEventArgs());
            return;
        }

        var message = this.Encoding.GetString(buffer, 0, bytesread);
        this.MyInternalStringBuilder.Append(message);
        // ... message processing here ...

        foreach (var p in parts) {
            //Raise event per message-"part"
            if (MessageReceived != null)
                MessageReceived(this, new MessageReceivedEventArgs(p));
        }
    }
}

我的类有一个内部的stringbuilder,每次接收到数据时都会进行追加(这是因为消息可以在多个接收“事件”中被分割)。然后,在满足某些条件时,处理字符串生成器(“运行缓冲区”),将消息分割成消息的“部分”。对于每个“部分”,都会触发一个事件。系统中可以运行许多此类实例。
我的问题是:
  1. 我是否正确地认为/理解了 MyInternalStringBuilder.Append 从未“无序”调用?每次 TcpListener 接收数据时,它将被“按顺序”添加到(内部)字符串构建器中?我不需要使用锁吗?
  2. 由于此 StartReceive 方法使用一个内部的(“无限”)循环并引发事件,因此我不明白使 StartReceive 方法成为 async 的意义,但是我必须这样做(显然可以使用 await)。我知道我正在混合 TAP/EAP,但出于与本问题无关的原因,我必须这样做。然而,感觉很“肮脏”,因为到目前为止我所了解的是“async 不应该是 void”。也许有更好的方法来解决这个问题(除了完全转向 TAP)?

我建议观看来自channel9的ASP.NET中的异步,它还涵盖了C#中使用AMP、EAP和TAP进行异步编程的简要历史。这可能对你有所帮助。 - Jim Wolff
我一回家就会翻译(因为我这里没有音频),但我应该指出,这段代码正在作为“系统”服务运行,而不是在IIS中运行,并且与ASP.NET无关(尽管我不确定视频是否特别涉及此问题,但...)。 - user08968902360872346
1
这个内容特别是关于ASP.NET的,但异步编程的一般概念是相同的,我相信它无论如何都会为您提供一些价值。 - Jim Wolff
4个回答

3
是的,它是按顺序进行的。因为您只发布了一个活动缓冲区,然后在发布下一个之前对其进行处理,事件顺序是确定性的(发布->接收->处理->发布->接收...)。如果您只在此循环中使用SB,则无需锁定。
但是,在MessageReceived事件中发生的情况显然存在很多“假设”。假设您做了合理的事情(例如处理并发布响应),那么就应该没问题。如果您尝试从事件中接收更多信息,那么一切都会失控(例如发送响应,然后等待响应的响应,这将是不好的)。如果您的处理是事件驱动状态机(处于“bar”状态中接收到“foo”消息,用“spam”进行响应并将状态更改为“bam”,返回到循环等待更多事件),则通常情况下应该没问题。显然,没有代码很难下结论,所有内容都基于您的声明和我对您所做声明的理解(这是可以的,您似乎知道自己在说什么)。
您所描述的处理不是最快的(因为传入字节在您处理当前缓冲区时没有空间缓冲),但实现高吞吐量将更加棘手,这正是您提到的顺序问题所在。此外,如果流量是请求-响应,则实际上并不重要。
发布的缓冲区:向网络提交缓冲区以填充其内容。在.Net中,流操作和AFD/tcp.sys之间有大约100万层抽象,但概念基本相同。

流量确实是请求->响应(->请求->响应...重复),应该是完全异步的;因此,更好的描述方式是消息->响应(->消息->响应...)或消息1->消息2->消息3->响应,其中响应可能是对消息2的响应,但发送时不重要,只要它在某个时候被发送即可(最好尽快)。我应该指出系统中可以'运行'几个(很多)这些"客户端",每个都有自己的"状态机"。 - user08968902360872346
如果发送方在期望消息2的响应之前发送消息3,则不是请求-响应。请求-响应是一种协议(更确切地说是消息交换模式),在该协议中,双方都不能发送消息,直到轮到它们为止(即发送方必须在发送消息3之前等待消息2的响应)。因此,在理论上,如果允许消息3在处理消息2并正在制定/发送响应时到来并进行处理,则可以获得更好的吞吐量。但是,除非必须这样做,否则不要这样做。潘多拉魔盒。 - Remus Rusanu
所有“发送者”做的就是大喊:“嘿,你知道Foo吗?','嘿,你知道Bar吗?','嘿,看看FooBar!'。有时“接收者”需要发送消息:“酷,告诉Foo我说了声嗨!”当“发送者”需要回应时,它会停止喊叫并静静地等待回应。因此,如果它发送消息“嘿,看看这个香蕉,告诉我你的想法”,那么它将等待服务器“一段时间”后回复“不错,它已经变质了,扔了它” ,然后“发送者”将采取行动并继续大喊。 - user08968902360872346
这是一种有点“啰嗦”的系统,两端都有状态机。在这个系统中,“发送方”报告事件,“接收方”可以选择告诉“发送方”有时要对发生的某些事情采取行动(即使这是一些消息之前的事情)。 - user08968902360872346
显然这只是一个基于香蕉广播隐喻的猜测,但你应该考虑为“呼喊”(即不同的TCP管道,甚至是UDP广播或组播)设置单独的通道。在以前的生活中,我曾因类似的问题而受到伤害,因为当所有东西都在一个管道上流动时会发生固有的优先级反转。 - Remus Rusanu
很遗憾,我无法控制“发送者”。他们只是通过一个通道/管道发送东西... - user08968902360872346

2
完成Remus的回答后,您可能希望确保只能调用一次StartReceive,否则您将面临严重问题,并且需要锁定。
关于void返回,我个人认为这是可以接受的情况之一,“顶级”异步方法以StartBegin开头,正确传达这是一个“fire-and-forget”方法的含义。也就是说,如果您有这样的方法,您可能想重新设计事物。
为此,我将使用诸如TPL Dataflow之类的库,其中不同的操作表示为异步块。在您的情况下,会有一个“从套接字读取块”→“处理块”→“发送响应块”,每个块在异步触发后依次执行,允许您在处理时继续读取。通过这样做,您将不再拥有这个大循环,void返回也不再存在。但是需要更改许多东西。

如果并发调用尝试运行循环,显然会出现混乱。重新创建任何原始发送的消息将是不可能的。我甚至没有想到会有人尝试这样做 :) - Remus Rusanu
代码中没有列出来,但是在这个方法中有一个“if _running -> throw”结构,它会阻止对其进行多次“启动”。 - user08968902360872346

2
我理解的没错吧,MyInternalStringBuilder.Append永远不会“乱序”调用?每次TcpListener接收到数据时,它都会按顺序添加到(内部)字符串构建器中?我不需要使用锁?
是的,您理解得没错。async代码虽然是异步的,但自然是顺序的。因此,即使在StartReceive的内部,它一直返回和恢复,但不会同时恢复多次。
由于这个StartReceive方法使用了一个内部的(“无限”的)循环并引发事件,所以我不明白为什么要将StartReceive方法设置为async,但我必须这样做(显然为了能够使用await)。我知道我正在混合TAP/EAP,但出于与本问题无关的原因,我必须这样做。但是,感觉有点“脏”,因为“async不应该是void”是我到目前为止收集到的信息。也许有更好的方法来解决这个问题(除了完全转换为TAP)?
我不是很喜欢使用async void。首先,我假设你在StartReceive中有一个顶级的try/catch,如果观察到任何异常,它将启动套接字关闭(如果没有,则需要一个)。个人而言,我会将其编写为一个async Task方法,并考虑将Task公开为属性(如果您已经设置好了错误处理事件,则不需要Task属性)。
还有一件事情要注意;socket新手经常编写严格的只读/只写循环时存在问题:在只读部分期间,读取器无法检测到半开放场景;在只写部分期间,尤其是对于恶意客户端,存在死锁的(微小)可能性。理想情况下,套接字应始终具有正在进行的读取操作(您已经实现了),并且还应定期发送一些内容(例如,保持活动状态)。我有一个sockets FAQ,其中详细介绍了这个问题。
为了解决这个问题,我建议您在所有的`await`后面加上`ConfigureAwait(false)`,并且在构造函数中捕获一个`SynchronizationContext`(如果没有,则使用`new SynchronizationContext()`),然后将您的EAP事件发布到该上下文中。接下来,您需要另外一个独立的循环,可以定期发送保持活动消息(如果协议允许),或者如果一段时间内没有读取任何内容,则终止套接字。

首先,我假设在StartReceive中有一个顶级的try/catch,如果观察到任何异常,它将启动套接字关闭。为了清晰起见,我省略了它,但它是循环的“包装”(例如try { internal-loop } catch {})。感谢FAQ和其他有趣的观察结果;协议不支持“保持活动状态”(或“心跳”),这确实有点糟糕,我需要跟踪某种内部计时器/超时来终止“非活动”连接... - user08968902360872346

1
如果您在多个线程之间共享StringBuilder,则绝对需要使用锁。
此外,我不理解您所说的“有序”的含义。Append将按顺序调用。它可能不是您想要的顺序,但是这段代码中没有任何内容可以确保任何特定的顺序。
我可以说,在该方法内部调用Append将按照代码执行它们的顺序调用,这应该与您从代码中期望的行为完全相同,但是如果多个线程同时调用此方法,则线程之间的顺序未确定。
至于您的第二个问题,为什么不直接在任务或线程中调用它而不使用async关键字来编写整个代码块?

这个想法是为每个“客户端”实例化我的类的一个实例。所谓“有序”,是指TcpClient接收数据的顺序(假设TcpClient处理了无序TCP数据包等)。至于在任务/线程中调用它:听起来像是个好主意……但我不会因为“await”更有效地安排不同的等待而耗尽线程,对吧? - user08968902360872346
是的,但是你仍然会保留代码原样。说实话,现在代码结构没有任何问题,但个人而言,我不喜欢那些无法等待完成或取消的“fire-and-forget”方法。 - Lasse V. Karlsen
我可以通过关闭TcpClient来完成/取消,这将触发if (bytesread == 0) -> return ,从而“打破”无限循环。 - user08968902360872346

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接