如何从TaskCompletionSource取消任务?

3
我正在尝试创建一个异步ProducerConsumerCollection,为此我使用了这个msdn页面(http://msdn.microsoft.com/en-us/library/hh873173.aspx(页面底部))。
现在我正在尝试添加一个超时,这是我所做的:
    public async Task<T> TakeWithTimeout(int timeout)
    {
            Task<T> takeTask = this.Take();

            if (timeout <= 0 || takeTask == await Task.WhenAny(this.tasks.Take(), Task.Delay(timeout)))
            {
                return await takeTask;
            }
            else
            {
                // Timeout
                return default(T);
            }
        }
    }

此代码存在问题,如果超时,它不会取消Take()方法创建的任务。

由于该任务已被TaskCompletionSource“创建”,因此我无法给它一个cancellationToken?

那么,如何取消它并正确实现具有超时的Take操作呢?

谢谢:)

3个回答

6
编写一个能够取消安全的、支持异步操作的生产者/消费者集合并不容易。你需要做的是将 Take 修改为接受 CancellationToken 作为参数,并注册一个处理程序,以便在取消时取消 TaskCompletionSource
我强烈建议您使用内置了取消支持的 BufferBlock<T>
如果您无法使用TPL Dataflow(例如,您正在使用PCL或目标平台不支持Dataflow),那么您可以使用我开源的AsyncEx library中的生产者/消费者集合(例如AsyncProducerConsumerQueueAsyncCollection)。这两个集合都基于AsyncLockAsyncConditionVariable,这是一个我在我的博客中简要描述的设计(未涉及取消细节)。在这种设计中支持生产者/消费者集合中的取消关键在于支持AsyncConditionVariable.WaitAsync中的取消;一旦您的条件变量类型支持取消,那么您的集合也将轻松支持它。

这只是为了训练目的,所以我想实现自己的类 :)我将取消令牌“ct”添加到方法中,并添加了以下行(如果m_collection.Count == 0):ct.Register(() => tcs.TrySetResult(default(T)));这个TrySetResult被正确调用了,但是取消的任务似乎没有被正确关闭 :( - Nisalon
1
你还需要考虑一下,当“Put”从等待列表中检索到已经被取消的TCS时会发生什么。 - Stephen Cleary

2

我只是想分享一下如何取消来自TaskCompletionSource的任务的解决方案,因为这正是我自己所需要的。

我猜这个解决方案也可以用于你的具体需求,但它并不与特定的超时功能相关,所以这是一个通用的解决方案(或者我希望是这样)。

这是一个扩展方法:

    public static async Task WaitAsync<T>(this TaskCompletionSource<T> tcs, CancellationToken ctok)
    {

        CancellationTokenSource cts = null;
        CancellationTokenSource linkedCts = null;

        try {

            cts = new CancellationTokenSource();
            linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, ctok);

            var exitTok = linkedCts.Token;

            Func<Task> listenForCancelTaskFnc = async () => {
                await Task.Delay(-1, exitTok).ConfigureAwait(false); 
            };

            var cancelTask = listenForCancelTaskFnc();

            await Task.WhenAny(new Task[] { tcs.Task, cancelTask }).ConfigureAwait(false);

            cts.Cancel();

        } finally {

            if(linkedCts != null) linkedCts.Dispose();

        }

    }

使用方法:

    async Task TestAsync(CancellationToken ctok) {

        var tcs = new TaskCompletionSource<bool>();

        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }

        await tcs.WaitAsync(ctok);

    }

这个想法是创建一个监督异步任务,它会一直等待直到被取消。我们可以使用这个任务来在TaskCompletionSource还没有满足条件但我们需要退出的情况下“提前退出”。监督任务保证在WaitAsync结束时被取消,无论它如何从WhenAny中退出。如果TaskCompletionSource已经满足了结果并且WhenAny完成了,监督睡眠任务会短暂地保持不变,直到下一行调用cts.Cancel(),或者它被exitToken取消,这是传入的ctok或内部cts.Token的组合标记。总之,希望这样说得通——如果这段代码有任何问题,请告诉我...

1
取消令牌可能很棘手,特别是在取消令牌从未被触发的情况下。当用户代码调用 TestAsync(CancellationToken.None) 时会发生什么?此外,请注意 CreateLinkedTokenSource 是为数不多的几个 API 之一,您必须处理生成的 cts,否则很容易遇到内存泄漏问题。 - Stephen Cleary
2
我在这里有一个更强大的解决方案 - 使用方法 - Stephen Cleary
@StephenCleary 我认为传入 CancellationToken.None 没有问题。这只是意味着你依赖于 TaskCompletionSource 最终被触发。我已经修复了代码以处理 CreateLinkedTokenSource 的释放。感谢您的帮助。同时也感谢您提供的替代实现。它使用了我通常不用的东西,所以比我的实现更加高级。除了我目前的代码之外,还有什么问题吗?在这一点上,我的实现有什么“不够健壮”的地方吗? - Bernie Habermeier
沉睡线程不监视 exitTok - 只有 ctok,而它永远不会被取消。 - Stephen Cleary
哎呀,那完全不是我想要的。谢谢!已在上面的代码中修复。 - Bernie Habermeier
显示剩余2条评论

0
一个可能稍微不那么复杂/更加灵活的方法是让TaskCompletionSource完成它的工作,然后在你准备返回任务的时候,可以使用类似于Bernie答案中的技术来“附加”取消操作。
扩展方法:
public static class TaskExtensions
{
    /// <summary>
    /// Apply cancellation support to an arbitrary task
    /// </summary>
    public static async Task OrCancelledBy(this Task task, CancellationToken cancellationToken) =>
        await Task.WhenAny(
            task, 
            Task.Delay(-1, cancellationToken) // Wait forever until cancellation
        );

    /// <summary>
    /// Apply cancellation support to an arbitrary task that returns TResult
    /// </summary>
    public static async Task<TResult> OrCancelledBy<TResult>(this Task<TResult> task, CancellationToken cancellationToken)
    {
        async Task<TResult> WaitUntilCancelled(CancellationToken ct)
        {
            await Task.Delay(-1, ct); // Wait forever until cancellation
            return default!;
        }

        return await await Task.WhenAny(task, WaitUntilCancelled(cancellationToken));
    }
}

使用方法:

    async Task TestAsync(CancellationToken ct) 
    {
        var tcs = new TaskCompletionSource<bool>();

        if (somethingOrTheOther) {
            tcs.TrySetResult(true);
        }

        await tcs.Task.OrCancelledBy(ct);
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接