在C#中正确使用取消标记

6

我最近接触了C#语言并致力于从cassandra中获取数据。下面的代码可以从Cassandra中获取数据并且它能正常工作。

我唯一遇到的问题是在ProcessCassQuery方法中,我将CancellationToken.None传递给了我的requestExecuter函数,这可能不是正确的做法。应该如何处理这种情况,并采取什么措施来正确处理它?

/**
 *
 * Below method does multiple async calls on each table for their corresponding id's by limiting it down using Semaphore.
 *
 */
private async Task<List<T>> ProcessCassQueries<T>(IList<int> ids, Func<CancellationToken, int, Task<T>> mapperFunc, string msg) where T : class
{
    var tasks = ids.Select(async id => 
    {
        await semaphore.WaitAsync();
        try
        {
            ProcessCassQuery(ct => mapperFunc(ct, id), msg);
        }
        finally
        {
            semaphore.Release();
        }
    });

  return (await Task.WhenAll(tasks)).Where(e => e != null).ToList();
}

// this might not be good idea to do it. how can I improve below method?
private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
{
    return requestExecuter(CancellationToken.None);
}

你的代码会因为什么而取消?用户交互吗?你需要创建一个 CancellationTokenSource,然后传递一个对你的 Func<T> 的引用 -- 你需要检查是否调用了 Cancel() 方法。我将写一个快速示例。 - Glenn Ferrie
1个回答

11
根据官方文档所述,取消标记允许传播取消信号。例如,这可以用于取消长时间运行的操作,因为某种原因不再有意义或者仅仅是因为太长时间了。 CancelationTokenSource将允许您获取自定义令牌,您可以将其传递给requestExecutor。它还提供了取消正在运行的Task的手段。
private CancellationTokenSource cts = new CancellationTokenSource();

// ...

private Task<T> ProcessCassQuery<T>(Func<CancellationToken, Task<T>> requestExecuter, string msg) where T : class
{
    return requestExecuter(cts.Token);
}

例子

让我们看一个不同的最小/虚拟示例,这样我们就可以查看其内部。

考虑以下方法GetSomethingAsync,它将每秒返回递增的整数。

调用token.ThrowIfCancellationRequested将确保在外部动作取消此进程时引发TaskCanceledException。还可以采取其他方法,例如,检查token.IsCancellationRequested是否为真,并采取相应措施。

private static async IAsyncEnumerable<int> GetSomethingAsync(CancellationToken token)
{
    Console.WriteLine("starting to get something");

    token.ThrowIfCancellationRequested();

    for (var i = 0; i < 100; i++)
    {
        await Task.Delay(1000, token);
        yield return i;
    }

    Console.WriteLine("finished getting something");
}

现在让我们构建主方法来调用上面的方法。

public static async Task Main()
{    
    var cts = new CancellationTokenSource();

    // cancel it after 3 seconds, just for demo purposes
    cts.CancelAfter(3000);
    // or: Task.Delay(3000).ContinueWith(_ => { cts.Cancel(); });

    await foreach (var i in GetSomethingAsync(cts.Token))
    {
        Console.WriteLine(i);
    }
}

如果我们运行此代码,将会得到类似于以下的输出:
starting to get something
0
1
Unhandled exception. System.Threading.Tasks.TaskCanceledException: A task was canceled.

当然,这只是一个虚拟的例子,取消可以由用户操作或某个事件触发,不一定非得是一个计时器。

我现在明白了,谢谢你的解释。在我的情况下,我想在1000毫秒后超时。在我的示例中,我该如何实现?如果可能,你能为我的场景提供一个例子吗? - dragons
一种选择是将您的requestExecutor更改为使用token.ThrowIfCancellationRequested(),然后在ProcessCassQuery中,在调用requestExecutor之前应该能够使用cts.CancelAfter(1000);。您可以捕获TaskCanceledException以优雅地处理取消。 - rph
您可能/应该更喜欢将令牌传播到Cassandra驱动程序中,并让它处理令牌,而不是自己使用ThrowIfCancellationRequested。只是现在不记得驱动程序操作是否支持取消令牌。 - rph
不认为驱动程序支持这个,所以需要在我的代码中高效地处理它。 - dragons
我明白了。看起来这个主题有一些讨论 here,其中包括一个合理的解释为什么不支持它。因此,我建议您使用上面解释的 CancellationTokenCancellationTokenSource 将一个工作版本发布到 Code Review Stack Exchange,即使它不是最有效的版本,也是一个更好的地方,在那里可以得到指导。 - rph

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接