如何结合使用TaskCompletionSource和CancellationTokenSource?

40

我有这样的代码(在此处进行了简化),等待完成任务:

var task_completion_source = new TaskCompletionSource<bool>();
observable.Subscribe(b => 
   { 
      if (b) 
          task_completion_source.SetResult(true); 
   });
await task_completion_source.Task;    

这个想法是订阅并等待布尔值流中的true。这将完成“任务”,我可以在await之后继续进行。

然而,我想取消等待,而不是取消订阅。我想传递取消令牌(以某种方式)给task_completion_source,这样当我取消令牌源时,await就会继续进行。

更新CancellationTokenSource与此代码无关,我只有来自它的令牌。


task_completion_source.SetCanceled 有什么问题吗?请注意,这假定您正确处理了任务取消 :) - Luaan
@Luaan,没有什么问题,只是没有正在运行的代码可以执行它。每个人都在等待某些东西--订阅等待数据(可能没有),await等待任务完成。 - astrowalker
如果您通过取消令牌取消异步进程,它将触发一个TaskCanceledException异常,从而结束等待(您需要处理异常)。 - Steve Bird
@SteveBird,你说的“取消异步进程”是指在CancellationTokenSource中设置取消吗?如果是,那么它是函数的外部方。如果你的意思是处理已取消的令牌,那么这正是我的问题。我不明白我如何首先使用该令牌。 - astrowalker
3个回答

61

如果我理解得正确,你可以这样做:

using (cancellationToken.Register(() => {
    // this callback will be executed when token is cancelled
    task_comletion_source.TrySetCanceled();
})) {
    // ...
    await task_comletion_source.Task;
}

请注意,它将在您的等待操作中引发异常,您需要处理它。


3
该操作已经澄清,您可能需要使用CancellationToken token; token.Register(...来匹配他的代码。此外,您可能希望使用TrySetCanceled(并使用TrySetResult),这样如果任务已经完成,就不会抛出异常。 - Scott Chamberlain
非常感谢,令牌源不是问题所在,因为它没有参与(原始编辑中). - astrowalker
@ScottChamberlain,啊,谢谢你提醒我关于Try...的事情,虽然我没有意外掉进那个陷阱,但现在我也不会了,多亏了你。 - astrowalker
2
从其他答案的评论中可以看出,将Register调用的结果放入using块中以便将其正确处理掉可能是一个好主意。我认为这应该可以避免提到的资源泄漏。 - ygoe
2
@ygoe 谢谢,已完成。虽然我认为这只是一个很小的泄漏,但当然如果能避免就应该避免。 - Evk

18

我建议您不要自己构建。有许多关于取消令牌的边界情况很烦琐,很难做到完全正确。例如,如果从Register返回的注册信息没有被释放,可能会导致资源泄漏。

相反,您可以使用我的AsyncEx.Tasks中的Task.WaitAsync扩展方法:

var task_completion_source = new TaskCompletionSource<bool>();
observable.Subscribe(b => 
{ 
  if (b) 
    task_completion_source.SetResult(true); 
});
await task_completion_source.Task.WaitAsync(cancellationToken);

补充一下,我强烈建议您使用 ToTask 而不是显式地使用 TaskCompletionSource。再次强调,ToTask 会为您很好地处理边缘情况。


1
如果任务已经完成,Set* 将会抛出异常。默认情况下,Set*TrySet* 允许同步继续(在 await 之后,您的调用堆栈可能在 Subscribe 内部)。默认情况下,任务允许子任务。 - Stephen Cleary
@StephenCleary,在什么情况下会发生这样的边缘情况(Registration未被处理)? - Justin Skiles
2
@JustinSkiles:如果注册未被处理(例如,使用已接受答案中的代码),并且您拥有一个真正的(可取消)CancellationToken,它是长期存在的(即代表进程关闭),则注册将保持活动状态,TCS也将保持活动状态。如果在进程运行期间多次执行此代码,则每次运行时都会泄漏另一个注册和TCS。 - Stephen Cleary
@MgSam:不,团队成员都知道我的库。我认为他们认为它是一个不错的附加组件,所以NuGet是一个很好的选择。 - Stephen Cleary
1
有点矛盾:不要写自己的,但我自己写了一个 :)。 - Wouter
显示剩余2条评论

2

这是我自己翻译的内容。我差点犯了一个错误,没有处理注册表(感谢Stephen Cleary)。

    /// <summary>
    /// This allows a TaskCompletionSource to be await with a cancellation token and timeout.
    /// 
    /// Example usable:
    /// 
    ///     var tcs = new TaskCompletionSource<bool>();
    ///           ...
    ///     var result = await tcs.WaitAsync(timeoutTokenSource.Token);
    /// 
    /// A TaskCanceledException will be thrown if the given cancelToken is canceled before the tcs completes or errors. 
    /// </summary>
    /// <typeparam name="TResult">Result type of the TaskCompletionSource</typeparam>
    /// <param name="tcs">The task completion source to be used  </param>
    /// <param name="cancelToken">This method will throw an OperationCanceledException if the cancelToken is canceled</param>
    /// <param name="timeoutMs">This method will throw a TimeoutException if it doesn't complete within the given timeout, unless the timeout is less then or equal to 0 or Timeout.Infinite</param>
    /// <param name="updateTcs">If this is true and the given cancelToken is canceled then the underlying tcs will also be canceled.  If this is true a timeout occurs the underlying tcs will be faulted with a TimeoutException.</param>
    /// <returns>The tcs.Task</returns>
    public static async Task<TResult> WaitAsync<TResult>(this TaskCompletionSource<TResult> tcs, CancellationToken cancelToken, int timeoutMs = Timeout.Infinite, bool updateTcs = false)
    {
        // The overrideTcs is used so we can wait for either the give tcs to complete or the overrideTcs.  We do this using the Task.WhenAny method.
        // one issue with WhenAny is that it won't return when a task is canceled, it only returns when a task completes so we complete the
        // overrideTcs when either the cancelToken is canceled or the timeoutMs is reached.
        //
        var overrideTcs = new TaskCompletionSource<TResult>();
        using( var timeoutCancelTokenSource = (timeoutMs <= 0 || timeoutMs == Timeout.Infinite) ? null : new CancellationTokenSource(timeoutMs) )
        {
            var timeoutToken = timeoutCancelTokenSource?.Token ?? CancellationToken.None;
            using( var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, timeoutToken) )
            {
                // This method is called when either the linkedTokenSource is canceled.  This lets us assign a value to the overrideTcs so that
                // We can break out of the await WhenAny below.
                //
                void CancelTcs()
                {
                    if( updateTcs && !tcs.Task.IsCompleted )
                    {
                        // ReSharper disable once AccessToDisposedClosure (in this case, CancelTcs will never be called outside the using)
                        if( timeoutCancelTokenSource?.IsCancellationRequested ?? false )
                            tcs.TrySetException(new TimeoutException($"WaitAsync timed out after {timeoutMs}ms"));
                        else
                            tcs.TrySetCanceled();
                    }

                    overrideTcs.TrySetResult(default(TResult));
                }

                using( linkedTokenSource.Token.Register(CancelTcs) )
                {
                    try
                    {
                        await Task.WhenAny(tcs.Task, overrideTcs.Task);
                    }
                    catch { /* ignore */ }

                    // We always favor the result from the given tcs task if it has completed.
                    //
                    if( tcs.Task.IsCompleted )
                    {
                        // We do another await here so that if the tcs.Task has faulted or has been canceled we won't wrap those exceptions
                        // in a nested exception.  While technically accessing the tcs.Task.Result will generate the same exception the
                        // exception will be wrapped in a nested exception.  We don't want that nesting so we just await.
                        await tcs.Task;
                        return tcs.Task.Result;
                    }

                    // It wasn't the tcs.Task that got us our of the above WhenAny so go ahead and timeout or cancel the operation.
                    //
                    if( timeoutCancelTokenSource?.IsCancellationRequested ?? false )
                        throw new TimeoutException($"WaitAsync timed out after {timeoutMs}ms");

                    throw new OperationCanceledException();
                }
            }
        }
    }

如果在tcs获得结果或出错之前,cancelToken被取消,这将引发TaskCanceledException异常。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接