没有接受CancellationToken的异步操作应该如何正确取消?

32

以下是正确的取消方式是什么?

var tcpListener = new TcpListener(connection);
tcpListener.Start();
var client = await tcpListener.AcceptTcpClientAsync();

仅仅调用 tcpListener.Stop() 似乎会导致一个 ObjectDisposedException 异常并且 AcceptTcpClientAsync 方法不接受一个 CancellationToken 结构。

我是不是完全忽略了某些显而易见的东西?

2个回答

25
假设您不想在TcpListener上调用Stop方法, 这里没有完美的解决方案。
如果您可以接受在一定时间内未完成操作时收到通知,但允许原始操作完成,则可以创建一个扩展方法,如下所示:
public static async Task<T> WithWaitCancellation<T>( 
    this Task<T> task, CancellationToken cancellationToken) 
{
    // The tasck completion source. 
    var tcs = new TaskCompletionSource<bool>(); 

    // Register with the cancellation token.
    using(cancellationToken.Register( s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs) ) 
    {
        // If the task waited on is the cancellation token...
        if (task != await Task.WhenAny(task, tcs.Task)) 
            throw new OperationCanceledException(cancellationToken); 
    }

    // Wait for one or the other to complete.
    return await task; 
}

上述内容来自Stephen Toub的博客文章"How do I cancel non-cancelable async operations?"

这里需要强调的是,这并不实际上取消操作,因为AcceptTcpClientAsync方法没有一个带有CancellationToken的重载,所以它无法被取消。

这意味着,如果扩展方法表明取消确实发生了,那么你正在取消对原始Task回调的等待,而不是取消操作本身。

为此,我将方法从WithCancellation重命名为WithWaitCancellation,以表明您取消的是等待而不是实际操作。
从那里开始,在您的代码中使用它很容易:
// Create the listener.
var tcpListener = new TcpListener(connection);

// Start.
tcpListener.Start();

// The CancellationToken.
var cancellationToken = ...;

// Have to wait on an OperationCanceledException
// to see if it was cancelled.
try
{
    // Wait for the client, with the ability to cancel
    // the *wait*.
    var client = await tcpListener.AcceptTcpClientAsync().
        WithWaitCancellation(cancellationToken);
}
catch (AggregateException ae)
{
    // Async exceptions are wrapped in
    // an AggregateException, so you have to
    // look here as well.
}
catch (OperationCancelledException oce)
{
    // The operation was cancelled, branch
    // code here.
}

请注意,如果等待被取消,则必须包装对客户端的调用以捕获抛出的OperationCanceledException实例。
我还添加了一个AggregateException catch,因为从异步操作中抛出异常时会进行包装(在这种情况下,您应该自行测试)。
这就留下了一个问题,即在有一个像Stop方法这样的方法(基本上是任何猛烈拆除一切的方法,无论正在发生什么),哪种方法更好,这当然取决于您的情况。
如果您不共享正在等待的资源(在本例中为 TcpListener),那么调用 abort 方法并吞噬任何来自等待操作的异常可能会更好地利用资源(当您调用 stop 并在其他等待操作的区域监视该位时,必须翻转一位)。这样会增加代码的复杂性,但如果您关心资源利用率并尽快清理资源,并且可以选择此选项,则应该采用此方法。
如果资源利用不是问题,并且您对更协作的机制感到满意,并且不共享资源,则使用 WithWaitCancellation 方法是可以的。优点在于它是更干净和更易于维护的代码。

这行代码:using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true) , tcs)) 和我写的方式有什么区别(因为我不理解前面那个):using (cancellationToken.Register(() => tcs.TrySetResult(true)))? - Alonzzo2

15
尽管casperOne的答案是正确的,但有一个更清晰的潜在实现方法可以用于WithCancellation(或WithWaitCancellation)扩展方法来实现相同的目标:
static Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
    return task.IsCompleted
        ? task
        : task.ContinueWith(
            completedTask => completedTask.GetAwaiter().GetResult(),
            cancellationToken,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
}
  • 首先,我们通过检查任务是否已经完成来进行快速路径优化。
  • 然后,我们简单地注册一个继续任务到原始任务,并传递 CancellationToken 参数。
  • 如果可能的话同步提取原始任务的结果(或异常,如果有)使用 TaskContinuationOptions.ExecuteSynchronously,否则使用 ThreadPool 线程(TaskScheduler.Default),同时观察取消标记 CancellationToken

如果原始任务在 CancellationToken 被取消之前完成,则返回的任务将存储结果。否则,任务将被取消并在等待时抛出 TaskCancelledException 异常。


为什么要使用completedTask.GetAwaiter().GetResult()而不是只用completedTask.Result - Glenn Slayden
2
@GlennSlayden,它抛出第一个(即“真正的”)异常(如果有的话),而不是 Task.Result 抛出的 AggregateException - i3arnon
好的,太棒了;谢谢。你是否也想将 cancellationToken.ThrowIfCancellationRequested(); 添加为你有用的扩展方法的第一行? - Glenn Slayden
@GlennSlayden 你可以这样做...虽然这并不是真正必要的。继续操作将立即被取消。此外,消费者可能会因为同步抛出异常而感到惊讶,而不是等待任务完成。 - i3arnon
1
@i3arnon 在 ContinueWith 方法中,cancellationToken 参数被描述为“将分配给新继续任务的 CancellationToken”。您确定这实际上会在原始任务完成之前“提前”取消吗?文档表明情况并非如此。 - Jack Ukleja
@Schneider,它并没有。正如你引用的那样,在原始任务完成之前,新的 continuation 将被取消。 - i3arnon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接