停止Parallel.ForEachAsync

15

我对C#中的停止Parallel.ForEachAsync循环感兴趣(考虑到StopBreak之间的区别);对于Parallel.ForEach,我可以进行以下操作:

Parallel.ForEach(items, (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        state.Stop();
        return;
    }

    // some process on the item
    Process(item);
});

然而,由于我有一个需要异步执行的进程,我转而使用Parallel.ForEachAsyncForEachAsync没有Stop()方法,我可以像下面这样使用break来中断循环,但我想知道这是否是最有效的中断循环的方法(换句话说,当收到取消请求时,循环需要尽快停止)。
await Parallel.ForEachAsync(items, async (item, state) =>
{
    if (cancellationToken.IsCancellationRequested)
    {
        return;
    }

    // some async process on the item
    await ProcessAsync(item);
});

这个问题也很相关。它使用标志而不是 cancellationToken。 - Martin Wickman
2个回答

16
Parallel.ForEachAsyncbody委托有一个CancellationToken作为其第二个参数。这个令牌是由API提供的,它不是你在ParallelOptions中传递的同一个令牌。你可以将这个令牌转发给lambda内部调用的任何异步方法。如果你调用的是不可取消的方法,那么你最好在lambda内部的关键位置调用ThrowIfCancellationRequested
CancellationTokenSource cts = new();
ParallelOptions options = new() { CancellationToken = cts.Token };

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        //...
        ct.ThrowIfCancellationRequested();
        //...
        await ProcessAsync(item, ct);
        //...
        ct.ThrowIfCancellationRequested();
        //...
    });
}
catch (OperationCanceledException ex)
{
    // ...
}

在上面的示例中,作为 lambda 参数提供的令牌 ct 不仅在 ParallelOptions.CancellationToken 被取消时取消,而且在 ProcessAsync 操作失败时也会取消。这种机制可以更快地传播异常。并行循环在发生错误时不会立即完成,因为它遵循禁止发出并忘记操作的原则。循环内部启动的所有操作必须在整个循环成功完成或失败之前完成。Lambda 中的令牌使得将此延迟降到最低成为可能。


ParallelOptions.CancellationToken 和你在回调函数中得到的那个(例如你的示例中的 ct)有什么区别?如果选项中的那个被取消,你在回调函数中得到的那个也会被取消吗? - Martin Wickman
1
@MartinWickman感谢您的评论。我编辑了答案并解决了这个问题。 - Theodor Zoulias
1
以下是关于此问题的相关内容:.NET 6 Parallel.ForEachAsync中需要两个取消标记吗? - Theodor Zoulias

-3
你需要像这样的东西:
await Parallel.ForEachAsync(items, async (item, state) =>
{
    await ProcessAsync(item, cancellationToken);
});


async Task ProcessAsync(string item, CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        //Process
    }
}

1
这种方法的缺点在于它中断循环的速度非常快。例如,如果有 1e9 个项目,并且在刚开始处理第一个项目时请求取消操作,那么这种方法将在返回之前迭代所有剩余的 1e9 - 1 个项目。 - Dr. Strangelove

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接