如何使UdpClient.ReceiveAsync()可取消?

16

我有一个接口INetwork,其中包含一个方法:

Task<bool> SendAsync(string messageToSend, CancellationToken ct)

接口的一种实现代码如下:

public async Task<bool> SendAsync(string messageToSend, CancellationToken ct)
{
  var udpClient = new UdpClient();
  var data = Encoding.UTF8.GetBytes (messageToSend);
  var sentBytes = await udpClient.SendAsync(data);
  return sentBytes == data.Length; 
}

不幸的是,UdpClient类的SendAsync()方法不接受CancellationToken

因此,我开始对其进行更改:

public Task<bool> SendAsync(string messageToSend, CancellationToken ct)
{
  var udpClient = new UdpClient();
  var data = Encoding.UTF8.GetBytes (messageToSend);
  var sendTask = udpClient.SendAsync(data);
  sendTask.Wait(ct);

  if(sendTask.Status == RanToCompletion)
  {
    return sendTask.Result == data.Length;
  }
}

显然这样做不起作用,因为没有返回Task。但是如果我返回任务,则签名不再匹配。SendAsync()返回一个Task<int>,但我需要一个Task<bool>

现在我很困惑。 :-)如何解决这个问题?


在默认模式下,我认为 sendTask.Result == data.Length 永远不会为false,而且只有当您使用占用多于一个字节的字符时,data.Length != messageToSend.Length 才成立。 - Scott Chamberlain
@ScottChamberlain 可能是。但这并不改变问题。如果接口需要一个布尔值来指示发送是否成功,那么如何从 Task<int> 获取 Task<bool>? - Krumelur
当您在SendAsync上等待时,只需返回任务即可,如果失败,它将抛出异常。 - Scott Chamberlain
@ScottChamberlain 我无法从SendAsync()返回Task。它是Task<int>,但接口中方法的签名需要Task<bool>。 - Krumelur
可能是异步网络操作永远不会完成的重复问题。 - i3arnon
3个回答

25

我知道有些晚了,但最近我不得不使一个UdpClient的ReceiveAsync/SendAsync能被取消。

你的第一个代码块是没有使用取消功能进行发送(顺便说一下,你的标题写的是接收...)。

你的第二个代码块绝对不是正确的方法。你调用了 *Async,然后又使用 Task.Wait 阻塞等待调用完成。这使得调用实际上是同步的,而调用 *Async 版本毫无意义。最好的解决方案是如下使用 Async:

...
var sendTask = udpClient.SendAsync(data);
var tcs = new TaskCompletionSource<bool>();
using( ct.Register( s => tcs.TrySetResult(true), null) )
{
    if( sendTask != await Task.WhenAny( task, tcs.Task) )
        // ct.Cancel() called
    else
        // sendTask completed first, so .Result will not block
}
...

在UdpClient中没有内置的方法来取消操作(这些函数都不接受CancellationToken),但您可以利用使用Task.WhenAny等待多个任务的能力。这将返回第一个完成的任务(这也是使用Task.Delay()实现超时的简单方法)。然后,我们只需要创建一个任务,该任务将在CancellationToken被取消时完成,这可以通过创建TaskCompletionSource并设置其为CancellationToken的回调来完成。

一旦取消,我们可以关闭套接字以实际“取消”底层读写操作。

这个想法最初来自于另一个处理文件句柄的SO答案,但它也适用于套接字。我通常会编写一个扩展方法来封装它:

public static class AsyncExtensions
{
    public static async Task<T> WithCancellation<T>( this Task<T> task, CancellationToken cancellationToken )
    {
        var tcs = new TaskCompletionSource<bool>();
        using( cancellationToken.Register( s => ( (TaskCompletionSource<bool>)s ).TrySetResult( true ), tcs ) )
        {
            if( task != await Task.WhenAny( task, tcs.Task ) )
            {
                throw new OperationCanceledException( cancellationToken );
            }
        }

        return task.Result;
    }
}
然后像这样使用它:
try
{
    var data = await client.ReceiveAsync().WithCancellation(cts.Token);
    await client.SendAsync(data.Buffer, data.Buffer.Length, toep).WithCancellation(cts.Token);
}
catch(OperationCanceledException)
{
    client.Close();
}

这个解决方案发布已经有几年了 - 有任何改进或变化吗? - khargoosh
我已经好几年没用C#了,所以我不知道有没有这样的。但是如果有的话我会很惊讶,因为在异步库中通常会使用这种方法(在函数外处理超时)。 - bj0
点赞;令人惊讶的是,即使发布了 System.IO.Pipelines,微软仍然没有解决这个问题;但是这个解决方法非常完美。非常感谢(如果有兴趣,我已将其转换为 F#)。 - knocte
如果你正在使用协程(异步),那么最不想做的事情就是使用阻塞调用来挂起执行,因为它会阻止其他协程运行。异步地执行让你在等待网络调用完成时可以做其他工作。 - bj0
4
这个解决方案存在一个潜在问题。即使取消标记已被触发,SendAsync 任务仍然会持续运行。如果程序在取消时设计为关闭,则可能没有问题。但是,如果程序继续运行,则孤立的 SendAsync 任务将继续运行直到完成。具有取消标记参数的 API 通常不会工作这种方式。 - Beevik
显示剩余2条评论

1
首先,如果你想返回Task<bool>,你可以简单地使用Task.FromResult()来实现。但是你可能不应该这样做,因为有一个异步方法实际上是同步的并没有太多意义。
除此之外,我认为你不应该假装方法已经被取消,即使它没有被取消。你可以在开始真正的SendAsync()之前检查令牌,但就只能这样了。
如果你真的想尽快假装方法已经被取消,你可以使用带有取消的ContinueWith()
var sentBytes = await sendTask.ContinueWith(t => t.Result, ct);

1
我不明白。SendAsync()是异步的,但不能被取消。Send()是同步的,但可以被取消...有时候微软的API看起来有点奇怪。 - Krumelur
@Krumelur 你如何取消 Send()?我没有看到任何方法可以这样做。 - svick
1
-1 是由我返回的。只有在 ReceiveAsync 完成后,才会调用 ContinueWith。如果数据永远不到达怎么办?应用程序甚至不会关闭(在我的情况下)! - uTILLIty
如果您取消 ct 令牌,那么由 ContinueWith 返回的 Task 将立即被取消。但是正如我所说,除非必须使用该代码,否则不应使用。 - svick

0

首先:你不能直接“取消”UdpClient.ReceiveAsync(),但是你可以在等待一段时间或者你想要取消的时间后简单地忽略它,但如果你对这个“无限等待任务线程”有焦虑感,你可以:

让我们分析使用场景:

  1. 不会重用 UdpClient 实例:
    只需使用它,ReceiveAsync 在被处理后就会关闭。
using (var udpClient = new UdpClient())
{
    await udpClient.SendAsync(sendData, sendData.Length, remoteEndPoint);
    var receiveAsyncTask = udpClient.ReceiveAsync();
    // wait time or use CancellationToken
    if (receiveAsyncTask.Wait(1000))
    {
        var data = receiveAsyncTask.Result;
        ProcessResult(data);
    }
}
  1. 将重复使用 UdpClient 实例:
    不用担心。当您“发送数据”时,将关闭先前的 ReceiveAsync 任务。
await udpClient.SendAsync(sendData, sendData.Length, remoteEndPoint);
var receiveAsyncTask = udpClient.ReceiveAsync();
// wait time or use CancellationToken
if (receiveAsyncTask.Wait(1000))
{
    var data = receiveAsyncTask.Result;
    ProcessResult(data);
}
  1. "我想要它立即关闭!"
    (2.) 告诉你 "发送数据" 将会关闭之前的 ReceiveAsync 任务。所以只需发送一个新的空消息。
await udpClient.SendAsync(sendData, sendData.Length, remoteEndPoint);
var receiveAsyncTask = udpClient.ReceiveAsync();
// wait time or use CancellationToken
if (receiveAsyncTask.Wait(1000))
{
    var data = receiveAsyncTask.Result;
    ProcessResult(data);
}
else
{
    udpClient.Send(new byte[0], 0, "127.0.0.1", 1);
    await receiveAsyncTask.ContinueWith(task => task.Dispose());
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接